Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
13use crate::sync::context::SyncIngestContext;
14use crate::sync::outbound::outbound_event_from_row;
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::enqueue_tool_spans_for_session;
17use anyhow::{Context, Result};
18use rusqlite::types::Value;
19use rusqlite::{
20    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
21};
22use std::cell::RefCell;
23use std::collections::{HashMap, HashSet};
24use std::path::{Path, PathBuf};
25
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size in MiB.
/// NOTE(review): presumably fed to SQLite's `mmap_size` pragma by `apply_pragmas` — confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// Column list for loading full `sessions` rows.
///
/// The statement ends at `FROM sessions` with no filter, so it can be extended with
/// `WHERE` / `ORDER BY` clauses. The column order matches the insert list used by
/// `Store::upsert_session`.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
34
/// Ordered schema statements, executed one entry at a time via `execute_batch` on every
/// read-write open (see `Store::open_with_mode`).
///
/// Every statement is written to be idempotent (`IF NOT EXISTS` / `INSERT OR IGNORE`), so
/// re-running the full list against an existing database is harmless. Some entries contain
/// several `;`-separated statements.
///
/// Several columns used by queries in this file (e.g. `sessions.start_commit`,
/// `events.ts_exact`) are not declared here.
/// NOTE(review): presumably they are added by `ensure_schema_columns` — confirm.
const MIGRATIONS: &[&str] = &[
    // --- Core local tables: sessions, events, and per-session derived facts ---
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Uniqueness of (session_id, seq) is what the event upsert's ON CONFLICT target relies on.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- Tool spans and repository binding / snapshot facts ---
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single state row; the CHECK above pins its id to 1.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- Per-session evaluation, feedback, outcome and process-sample tables ---
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // --- Secondary indexes for session- and time-window-scoped lookups ---
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    // NOTE(review): indexes the same column as `session_feedback_session` created above;
    // kept as-is because existing databases already carry both names.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
281
282/// Per-workspace activity dashboard stats.
283#[derive(Clone)]
284pub struct InsightsStats {
285    pub total_sessions: u64,
286    pub running_sessions: u64,
287    pub total_events: u64,
288    /// (day label e.g. "Mon", count) last 7 days oldest first
289    pub sessions_by_day: Vec<(String, u64)>,
290    /// Recent sessions DESC by started_at, max 3; paired with event count
291    pub recent: Vec<(SessionRecord, u64)>,
292    /// Top tools by event count, max 5
293    pub top_tools: Vec<(String, u64)>,
294    pub total_cost_usd_e6: i64,
295    pub sessions_with_cost: u64,
296}
297
/// Sync daemon / outbox status for `kaizen sync status`.
///
/// All fields are plain std types, so the snapshot derives `Debug`, `Clone`, `PartialEq`
/// and `Eq` to make it loggable and directly comparable in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Number of outbox entries still pending.
    pub pending_outbox: u64,
    /// Timestamp (ms) of the last successful sync, if any.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync error, if any.
    pub last_error: Option<String>,
    /// Count of sync failures in a row.
    pub consecutive_failures: u32,
}
305
306/// Aggregate stats across sessions + events for a workspace.
307#[derive(serde::Serialize)]
308pub struct SummaryStats {
309    pub session_count: u64,
310    pub total_cost_usd_e6: i64,
311    pub by_agent: Vec<(String, u64)>,
312    pub by_model: Vec<(String, u64)>,
313    pub top_tools: Vec<(String, u64)>,
314}
315
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serialized in lowercase (`"skill"` / `"rule"`). The derived ordering follows
/// declaration order, so `Skill` sorts before `Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// The row describes a skill.
    Skill,
    /// The row describes a rule.
    Rule,
}
323
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row is a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    /// Number of sessions attributed to this row.
    pub sessions: u64,
    /// Share of sessions for this row (presumably relative to sessions in the window — confirm).
    pub sessions_pct: f64,
    /// Summed cost; the `_e6` suffix suggests USD scaled by 1e6 — TODO confirm unit.
    pub total_cost_usd_e6: i64,
    /// Average cost per session in USD; `None` when not available.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison against the workspace average cost per session; `None` when not available.
    /// NOTE(review): whether this is a difference or a ratio is not visible here — confirm.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the skill/rule was found on disk.
    pub on_disk: bool,
}
336
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report was computed for.
    pub workspace: String,
    /// Start of the reporting window, in milliseconds.
    pub window_start_ms: u64,
    /// End of the reporting window, in milliseconds.
    pub window_end_ms: u64,
    /// Number of sessions that fall inside the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD; `None` when not available.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per observed skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
347
/// Result of [`Store::prune_sessions_started_before`].
///
/// `Default` yields zero for both counters.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of session rows removed.
    pub sessions_removed: u64,
    /// Number of event rows removed.
    pub events_removed: u64,
}
354
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// Field names mirror the `session_outcomes` table columns. Every measurement is optional;
/// only `session_id` and `measured_at_ms` are required by the schema.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session the outcome belongs to (primary key of the table).
    pub session_id: String,
    /// Count of passed tests, when measured.
    pub test_passed: Option<i64>,
    /// Count of failed tests, when measured.
    pub test_failed: Option<i64>,
    /// Count of skipped tests, when measured.
    pub test_skipped: Option<i64>,
    /// Build result, when measured (stored as an INTEGER column).
    pub build_ok: Option<bool>,
    /// Count of lint errors, when measured.
    pub lint_errors: Option<i64>,
    /// Reverted lines; the name suggests a 14-day window — TODO confirm.
    pub revert_lines_14d: Option<i64>,
    /// PR-open indicator, kept as an integer rather than a bool.
    pub pr_open: Option<i64>,
    /// CI result, when measured (stored as an INTEGER column).
    pub ci_ok: Option<bool>,
    /// When the outcome was measured, in milliseconds.
    pub measured_at_ms: u64,
    /// Error text recorded when measurement failed, if any.
    pub measure_error: Option<String>,
}
370
/// Aggregated process samples for retro (Tier D).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage among the samples.
    pub max_cpu_percent: f64,
    /// Highest resident-set size, in bytes, among the samples.
    pub max_rss_bytes: u64,
}
379
/// `sync_state` keys for agent rescan throttling and auto-prune.
///
/// Key under which the time of the last agent scan is stored.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key under which the time of the last auto-prune is stored.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key stamped with the current time when appending an event to the search
/// index fails, marking the index as out of date.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
384
/// One tool span in the shape used for sync.
///
/// Fields mirror the `tool_spans` table columns, with `paths` as a decoded list in place
/// of the `paths_json` column. All fields are std types, so the row derives `Debug`,
/// `Clone`, `PartialEq` and `Eq` for logging and test comparisons.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// Span identifier (primary key of `tool_spans`).
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Tool call identifier, when known.
    pub tool_call_id: Option<String>,
    /// Span status as stored in the table.
    pub status: String,
    /// Span start, in milliseconds, when known.
    pub started_at_ms: Option<u64>,
    /// Span end, in milliseconds, when known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, when known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, when known.
    pub tokens_in: Option<u32>,
    /// Output token count, when known.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, when known.
    pub reasoning_tokens: Option<u32>,
    /// Cost; the `_e6` suffix suggests USD scaled by 1e6 — TODO confirm unit.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
400
/// How [`Store::open_with_mode`] opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Open read-write; migrations and schema column checks run, and the projector is
    /// warmed from running sessions.
    ReadWrite,
    /// Open with SQLite's read-only and no-mutex flags; migrations and projector warm-up
    /// are skipped.
    ReadOnlyQuery,
}
406
/// Slim view of a session carrying only its id, status, and end time.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session identifier.
    pub id: String,
    /// Current session status.
    pub status: SessionStatus,
    /// End time in milliseconds, when the session has one.
    pub ended_at_ms: Option<u64>,
}
413
/// Optional criteria for narrowing a session listing.
///
/// `Default` leaves every criterion unset (`None`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Restrict to agents whose name starts with this prefix.
    /// NOTE(review): matching is presumably case-insensitive given the `lower(agent)` index — confirm.
    pub agent_prefix: Option<String>,
    /// Restrict to sessions with this status.
    pub status: Option<SessionStatus>,
    /// Lower time bound in milliseconds (presumably applied to the session start — confirm).
    pub since_ms: Option<u64>,
}
420
/// One page of a session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total number of matching sessions (presumably across all pages — confirm).
    pub total: usize,
    /// Offset to request the next page with; `None` when there is no next page.
    pub next_offset: Option<usize>,
}
427
/// Cached span tree for a single session, held in `Store::span_tree_cache`.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached tree was built for.
    session_id: String,
    /// Last event `seq` of that session when the tree was built
    /// (presumably used to detect staleness — confirm at the read site).
    last_event_seq: Option<u64>,
    /// The cached span tree nodes.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
434
/// Sync SQLite-backed store with lazily opened side stores.
///
/// Auxiliary state lives in `RefCell`s so methods taking `&self` can update it.
pub struct Store {
    /// The SQLite connection.
    conn: Connection,
    /// Parent directory of the database file (`.` when the path has no parent); the hot
    /// log, search writer, and outbox are opened relative to it.
    root: PathBuf,
    /// Hot event log, opened on first use.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search-index writer, opened on first use.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// Single-entry span tree cache; cleared whenever spans may have changed.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector that events are applied to; warmed on read-write open.
    projector: RefCell<Projector>,
}
443
444impl Store {
    /// Borrow the underlying SQLite connection (crate-internal).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
448
    /// Open the store at `path` in [`StoreOpenMode::ReadWrite`] mode.
    ///
    /// # Errors
    /// Propagates any error from [`Store::open_with_mode`].
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }
452
    /// Open the store at `path` in [`StoreOpenMode::ReadOnlyQuery`] mode.
    ///
    /// # Errors
    /// Propagates any error from [`Store::open_with_mode`].
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
456
    /// Open the store at `path` for querying; uses [`StoreOpenMode::ReadOnlyQuery`],
    /// the same mode as [`Store::open_read_only`].
    ///
    /// # Errors
    /// Propagates any error from [`Store::open_with_mode`].
    pub fn open_query(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
460
461    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
462        if let Some(parent) = path.parent() {
463            std::fs::create_dir_all(parent)?;
464        }
465        let conn = match mode {
466            StoreOpenMode::ReadWrite => Connection::open(path),
467            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
468                path,
469                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470            ),
471        }
472        .with_context(|| format!("open db: {}", path.display()))?;
473        apply_pragmas(&conn, mode)?;
474        if mode == StoreOpenMode::ReadWrite {
475            for sql in MIGRATIONS {
476                conn.execute_batch(sql)?;
477            }
478            ensure_schema_columns(&conn)?;
479        }
480        let store = Self {
481            conn,
482            root: path
483                .parent()
484                .unwrap_or_else(|| Path::new("."))
485                .to_path_buf(),
486            hot_log: RefCell::new(None),
487            search_writer: RefCell::new(None),
488            span_tree_cache: RefCell::new(None),
489            projector: RefCell::new(Projector::default()),
490        };
491        if mode == StoreOpenMode::ReadWrite {
492            store.warm_projector()?;
493        }
494        Ok(store)
495    }
496
497    fn invalidate_span_tree_cache(&self) {
498        *self.span_tree_cache.borrow_mut() = None;
499    }
500
501    fn warm_projector(&self) -> Result<()> {
502        let ids = self.running_session_ids()?;
503        let mut projector = self.projector.borrow_mut();
504        for id in ids {
505            for event in self.list_events_for_session(&id)? {
506                let _ = projector.apply(&event);
507            }
508        }
509        Ok(())
510    }
511
512    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
513        self.conn.execute(
514            "INSERT INTO sessions (
515                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
516                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
517                prompt_fingerprint, parent_session_id, agent_version, os, arch,
518                repo_file_count, repo_total_loc
519             )
520             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
521                ?16, ?17, ?18, ?19, ?20, ?21)
522             ON CONFLICT(id) DO UPDATE SET
523               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
524               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
525               status=excluded.status, trace_path=excluded.trace_path,
526               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
527               branch=excluded.branch, dirty_start=excluded.dirty_start,
528               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
529               prompt_fingerprint=excluded.prompt_fingerprint,
530               parent_session_id=excluded.parent_session_id,
531               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
532               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
533            params![
534                s.id,
535                s.agent,
536                s.model,
537                s.workspace,
538                s.started_at_ms as i64,
539                s.ended_at_ms.map(|v| v as i64),
540                format!("{:?}", s.status),
541                s.trace_path,
542                s.start_commit,
543                s.end_commit,
544                s.branch,
545                s.dirty_start.map(bool_to_i64),
546                s.dirty_end.map(bool_to_i64),
547                s.repo_binding_source.clone().unwrap_or_default(),
548                s.prompt_fingerprint.as_deref(),
549                s.parent_session_id.as_deref(),
550                s.agent_version.as_deref(),
551                s.os.as_deref(),
552                s.arch.as_deref(),
553                s.repo_file_count.map(|v| v as i64),
554                s.repo_total_loc.map(|v| v as i64),
555            ],
556        )?;
557        self.conn.execute(
558            "INSERT INTO session_repo_binding (
559                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
560             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
561             ON CONFLICT(session_id) DO UPDATE SET
562                start_commit=excluded.start_commit,
563                end_commit=excluded.end_commit,
564                branch=excluded.branch,
565                dirty_start=excluded.dirty_start,
566                dirty_end=excluded.dirty_end,
567                repo_binding_source=excluded.repo_binding_source",
568            params![
569                s.id,
570                s.start_commit,
571                s.end_commit,
572                s.branch,
573                s.dirty_start.map(bool_to_i64),
574                s.dirty_end.map(bool_to_i64),
575                s.repo_binding_source.clone().unwrap_or_default(),
576            ],
577        )?;
578        Ok(())
579    }
580
581    /// Insert a minimal session row if none exists. Used by hook ingestion when
582    /// the first observed event is not `SessionStart` (hooks installed mid-session).
583    pub fn ensure_session_stub(
584        &self,
585        id: &str,
586        agent: &str,
587        workspace: &str,
588        started_at_ms: u64,
589    ) -> Result<()> {
590        self.conn.execute(
591            "INSERT OR IGNORE INTO sessions (
592                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
593                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
594                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
595             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
596                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
597            params![id, agent, workspace, started_at_ms as i64],
598        )?;
599        Ok(())
600    }
601
602    /// Next `seq` for a new event in this session (0 when there are no events yet).
603    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
604        let n: i64 = self.conn.query_row(
605            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
606            [session_id],
607            |r| r.get(0),
608        )?;
609        Ok(n as u64)
610    }
611
    /// Append `e` without a sync context, so no outbox row is enqueued.
    ///
    /// # Errors
    /// Propagates any error from [`Store::append_event_with_sync`].
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
615
    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
    ///
    /// Steps, in order:
    /// 1. Upsert the event row keyed on `(session_id, seq)`; an existing row with the same
    ///    key is overwritten.
    /// 2. Append the event to the hot log.
    /// 3. Update derived tool-span state: a full rebuild in legacy projector mode, a
    ///    session replay when the event's `seq` is not past the last stored one, otherwise
    ///    an incremental projector update.
    /// 4. Add the event to the search index (best effort).
    /// 5. If `ctx` is present and sync is fully configured, write a redacted copy to the
    ///    outbox and enqueue the session's tool spans.
    ///
    /// # Errors
    /// Returns an error from serialization, SQLite, the hot log, the projector/span
    /// update, or the outbox. Search-index failures are logged, not returned.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Read the last stored seq BEFORE inserting, so an out-of-order or re-sent event
        // can be told apart from a plain append below. Not needed in legacy mode.
        let last_before = if projector_legacy_mode() {
            None
        } else {
            self.last_event_seq_for_session(&e.session_id)?
        };
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
             )
             ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // Kind and source are persisted as their `Debug` renderings.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // Nothing written means nothing to propagate downstream.
        // NOTE(review): with `DO UPDATE` the upsert presumably always reports a change, so
        // this guard may never trigger — confirm.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        self.append_hot_event(e)?;
        if projector_legacy_mode() {
            // Legacy path: re-derive indexes and rebuild all spans for the session.
            index_event_derived(&self.conn, e)?;
            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
            self.invalidate_span_tree_cache();
        } else if last_before.is_some_and(|last| e.seq <= last) {
            // The event landed at or before an already-stored seq, so incremental
            // projection would be wrong; replay the whole session instead.
            self.replay_projector_session(&e.session_id)?;
        } else {
            // In-order append: apply incrementally, then flush anything that expired as
            // of this event's timestamp.
            let deltas = self.projector.borrow_mut().apply(e);
            self.apply_projector_events(&deltas)?;
            let expired = self
                .projector
                .borrow_mut()
                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
            self.apply_projector_events(&expired)?;
            if is_stop_event(e) {
                // A stop event also flushes whatever the projector still holds for
                // this session.
                let flushed = self
                    .projector
                    .borrow_mut()
                    .flush_session(&e.session_id, e.ts_ms);
                self.apply_projector_events(&flushed)?;
            }
            self.invalidate_span_tree_cache();
        }
        // Best effort: failures are logged inside and do not fail the append.
        self.append_search_event(e);
        // Everything below is sync-only; bail out quietly when sync is not usable.
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Random sampling: skip the event when the draw exceeds the configured rate.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Build the outbound form, redact its payload, then queue it under "events".
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.outbox()?.append(&e.session_id, "events", &row)?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
739
740    fn append_hot_event(&self, e: &Event) -> Result<()> {
741        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
742            return Ok(());
743        }
744        let mut slot = self.hot_log.borrow_mut();
745        if slot.is_none() {
746            *slot = Some(HotLog::open(&self.root)?);
747        }
748        if let Some(log) = slot.as_mut() {
749            log.append(e)?;
750        }
751        Ok(())
752    }
753
754    fn append_search_event(&self, e: &Event) {
755        if let Err(err) = self.try_append_search_event(e) {
756            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
757            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
758        }
759    }
760
761    fn try_append_search_event(&self, e: &Event) -> Result<()> {
762        let Some(session) = self.get_session(&e.session_id)? else {
763            return Ok(());
764        };
765        let workspace = PathBuf::from(&session.workspace);
766        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
767        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
768        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
769            return Ok(());
770        };
771        let mut slot = self.search_writer.borrow_mut();
772        if slot.is_none() {
773            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
774        }
775        slot.as_mut().expect("writer").add(&doc)
776    }
777
778    pub fn flush_search(&self) -> Result<()> {
779        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
780            writer.commit()?;
781        }
782        Ok(())
783    }
784
785    fn outbox(&self) -> Result<Outbox> {
786        Outbox::open(&self.root)
787    }
788
789    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
790        if projector_legacy_mode() {
791            rebuild_tool_spans_for_session(&self.conn, session_id)?;
792            self.invalidate_span_tree_cache();
793            return Ok(());
794        }
795        let deltas = self
796            .projector
797            .borrow_mut()
798            .flush_session(session_id, now_ms);
799        if self.apply_projector_events(&deltas)? {
800            self.invalidate_span_tree_cache();
801        }
802        Ok(())
803    }
804
805    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
806        clear_session_spans(&self.conn, session_id)?;
807        self.projector.borrow_mut().reset_session(session_id);
808        let events = self.list_events_for_session(session_id)?;
809        let mut changed = false;
810        for event in &events {
811            let deltas = self.projector.borrow_mut().apply(event);
812            changed |= self.apply_projector_events(&deltas)?;
813        }
814        if self
815            .get_session(session_id)?
816            .is_some_and(|session| session.status == SessionStatus::Done)
817        {
818            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
819            let deltas = self
820                .projector
821                .borrow_mut()
822                .flush_session(session_id, now_ms);
823            changed |= self.apply_projector_events(&deltas)?;
824        }
825        if changed {
826            self.invalidate_span_tree_cache();
827        }
828        Ok(())
829    }
830
831    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
832        let mut changed = false;
833        for delta in deltas {
834            match delta {
835                ProjectorEvent::SpanClosed(span, sample) => {
836                    upsert_tool_span_record(&self.conn, span)?;
837                    tracing::debug!(
838                        session_id = %sample.session_id,
839                        span_id = %sample.span_id,
840                        tool = ?sample.tool,
841                        lead_time_ms = ?sample.lead_time_ms,
842                        tokens_in = ?sample.tokens_in,
843                        tokens_out = ?sample.tokens_out,
844                        reasoning_tokens = ?sample.reasoning_tokens,
845                        cost_usd_e6 = ?sample.cost_usd_e6,
846                        paths = ?sample.paths,
847                        "tool span closed"
848                    );
849                    changed = true;
850                }
851                ProjectorEvent::SpanPatched(span) => {
852                    upsert_tool_span_record(&self.conn, span)?;
853                    changed = true;
854                }
855                ProjectorEvent::FileTouched { session, path } => {
856                    self.conn.execute(
857                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
858                        params![session, path],
859                    )?;
860                    changed = true;
861                }
862                ProjectorEvent::SkillUsed { session, skill } => {
863                    self.conn.execute(
864                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
865                        params![session, skill],
866                    )?;
867                    changed = true;
868                }
869                ProjectorEvent::RuleUsed { session, rule } => {
870                    self.conn.execute(
871                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
872                        params![session, rule],
873                    )?;
874                    changed = true;
875                }
876            }
877        }
878        Ok(changed)
879    }
880
881    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
882        let rows = self.outbox()?.list_pending(limit)?;
883        if !rows.is_empty() {
884            return Ok(rows);
885        }
886        let mut stmt = self.conn.prepare(
887            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
888        )?;
889        let rows = stmt.query_map(params![limit as i64], |row| {
890            Ok((
891                row.get::<_, i64>(0)?,
892                row.get::<_, String>(1)?,
893                row.get::<_, String>(2)?,
894            ))
895        })?;
896        let mut out = Vec::new();
897        for r in rows {
898            out.push(r?);
899        }
900        Ok(out)
901    }
902
903    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
904        self.outbox()?.delete_ids(ids)?;
905        for id in ids {
906            self.conn
907                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
908        }
909        Ok(())
910    }
911
912    pub fn replace_outbox_rows(
913        &self,
914        owner_id: &str,
915        kind: &str,
916        payloads: &[String],
917    ) -> Result<()> {
918        self.outbox()?.replace(owner_id, kind, payloads)?;
919        self.conn.execute(
920            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
921            params![owner_id, kind],
922        )?;
923        for payload in payloads {
924            self.conn.execute(
925                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
926                params![owner_id, kind, payload],
927            )?;
928        }
929        Ok(())
930    }
931
932    pub fn outbox_pending_count(&self) -> Result<u64> {
933        let redb = self.outbox()?.pending_count()?;
934        if redb > 0 {
935            return Ok(redb);
936        }
937        let c: i64 =
938            self.conn
939                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
940                    r.get(0)
941                })?;
942        Ok(c as u64)
943    }
944
945    pub fn set_sync_state_ok(&self) -> Result<()> {
946        let now = now_ms().to_string();
947        self.conn.execute(
948            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
949             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
950            params![now],
951        )?;
952        self.conn.execute(
953            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
954             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
955            [],
956        )?;
957        self.conn
958            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
959        Ok(())
960    }
961
962    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
963        let prev: i64 = self
964            .conn
965            .query_row(
966                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
967                [],
968                |r| {
969                    let s: String = r.get(0)?;
970                    Ok(s.parse::<i64>().unwrap_or(0))
971                },
972            )
973            .optional()?
974            .unwrap_or(0);
975        let next = prev.saturating_add(1);
976        self.conn.execute(
977            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
978             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
979            params![msg],
980        )?;
981        self.conn.execute(
982            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
983             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
984            params![next.to_string()],
985        )?;
986        Ok(())
987    }
988
989    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
990        let pending_outbox = self.outbox_pending_count()?;
991        let last_success_ms = self
992            .conn
993            .query_row(
994                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
995                [],
996                |r| r.get::<_, String>(0),
997            )
998            .optional()?
999            .and_then(|s| s.parse().ok());
1000        let last_error = self
1001            .conn
1002            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1003                r.get::<_, String>(0)
1004            })
1005            .optional()?;
1006        let consecutive_failures = self
1007            .conn
1008            .query_row(
1009                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1010                [],
1011                |r| r.get::<_, String>(0),
1012            )
1013            .optional()?
1014            .and_then(|s| s.parse().ok())
1015            .unwrap_or(0);
1016        Ok(SyncStatusSnapshot {
1017            pending_outbox,
1018            last_success_ms,
1019            last_error,
1020            consecutive_failures,
1021        })
1022    }
1023
1024    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1025        let row: Option<String> = self
1026            .conn
1027            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1028                r.get::<_, String>(0)
1029            })
1030            .optional()?;
1031        Ok(row.and_then(|s| s.parse().ok()))
1032    }
1033
1034    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1035        self.conn.execute(
1036            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1037             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1038            params![key, v.to_string()],
1039        )?;
1040        Ok(())
1041    }
1042
1043    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
1044    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1045        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1046        let old_ids = old_session_ids(&tx, cutoff_ms)?;
1047        let sessions_to_remove: i64 = tx.query_row(
1048            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1049            params![cutoff_ms],
1050            |r| r.get(0),
1051        )?;
1052        let events_to_remove: i64 = tx.query_row(
1053            "SELECT COUNT(*) FROM events WHERE session_id IN \
1054             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1055            params![cutoff_ms],
1056            |r| r.get(0),
1057        )?;
1058
1059        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1060        tx.execute(
1061            &format!(
1062                "DELETE FROM tool_span_paths WHERE span_id IN \
1063                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1064            ),
1065            params![cutoff_ms],
1066        )?;
1067        tx.execute(
1068            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1069            params![cutoff_ms],
1070        )?;
1071        tx.execute(
1072            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1073            params![cutoff_ms],
1074        )?;
1075        tx.execute(
1076            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1077            params![cutoff_ms],
1078        )?;
1079        tx.execute(
1080            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1081            params![cutoff_ms],
1082        )?;
1083        tx.execute(
1084            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1085            params![cutoff_ms],
1086        )?;
1087        tx.execute(
1088            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1089            params![cutoff_ms],
1090        )?;
1091        tx.execute(
1092            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1093            params![cutoff_ms],
1094        )?;
1095        tx.execute(
1096            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1097            params![cutoff_ms],
1098        )?;
1099        tx.execute(
1100            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1101            params![cutoff_ms],
1102        )?;
1103        tx.execute(
1104            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1105            params![cutoff_ms],
1106        )?;
1107        tx.execute(
1108            "DELETE FROM sessions WHERE started_at_ms < ?1",
1109            params![cutoff_ms],
1110        )?;
1111        tx.commit()?;
1112        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1113            let _ = writer.commit();
1114        }
1115        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1116            tracing::warn!("search prune skipped: {err:#}");
1117            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1118        }
1119        self.invalidate_span_tree_cache();
1120        Ok(PruneStats {
1121            sessions_removed: sessions_to_remove as u64,
1122            events_removed: events_to_remove as u64,
1123        })
1124    }
1125
1126    /// Reclaim file space after large deletes (exclusive lock; can be slow).
1127    pub fn vacuum(&self) -> Result<()> {
1128        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1129        Ok(())
1130    }
1131
1132    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1133        Ok(self
1134            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1135            .rows)
1136    }
1137
1138    pub fn list_sessions_page(
1139        &self,
1140        workspace: &str,
1141        offset: usize,
1142        limit: usize,
1143        filter: SessionFilter,
1144    ) -> Result<SessionPage> {
1145        let (where_sql, args) = session_filter_sql(workspace, &filter);
1146        let total = self.query_session_page_count(&where_sql, &args)?;
1147        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1148        let next = offset.saturating_add(rows.len());
1149        Ok(SessionPage {
1150            rows,
1151            total,
1152            next_offset: (next < total).then_some(next),
1153        })
1154    }
1155
1156    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1157        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1158        let total: i64 = self
1159            .conn
1160            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1161        Ok(total as usize)
1162    }
1163
1164    fn query_session_page_rows(
1165        &self,
1166        where_sql: &str,
1167        args: &[Value],
1168        offset: usize,
1169        limit: usize,
1170    ) -> Result<Vec<SessionRecord>> {
1171        let sql = format!(
1172            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1173        );
1174        let mut values = args.to_vec();
1175        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1176        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1177        let mut stmt = self.conn.prepare(&sql)?;
1178        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1179        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1180    }
1181
1182    pub fn list_sessions_started_after(
1183        &self,
1184        workspace: &str,
1185        after_started_at_ms: u64,
1186    ) -> Result<Vec<SessionRecord>> {
1187        let mut stmt = self.conn.prepare(
1188            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1189                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1190                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1191                    repo_file_count, repo_total_loc
1192             FROM sessions
1193             WHERE workspace = ?1 AND started_at_ms > ?2
1194             ORDER BY started_at_ms DESC, id ASC",
1195        )?;
1196        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1197        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1198    }
1199
1200    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1201        if ids.is_empty() {
1202            return Ok(Vec::new());
1203        }
1204        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1205        let sql =
1206            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1207        let mut stmt = self.conn.prepare(&sql)?;
1208        let params: Vec<&dyn rusqlite::ToSql> =
1209            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1210        let rows = stmt.query_map(params.as_slice(), |r| {
1211            let status: String = r.get(1)?;
1212            Ok(SessionStatusRow {
1213                id: r.get(0)?,
1214                status: status_from_str(&status),
1215                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1216            })
1217        })?;
1218        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1219    }
1220
1221    fn running_session_ids(&self) -> Result<Vec<String>> {
1222        let mut stmt = self
1223            .conn
1224            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1225        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1226        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1227    }
1228
1229    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1230        let session_count: i64 = self.conn.query_row(
1231            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1232            params![workspace],
1233            |r| r.get(0),
1234        )?;
1235
1236        let total_cost: i64 = self.conn.query_row(
1237            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1238             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1239            params![workspace],
1240            |r| r.get(0),
1241        )?;
1242
1243        let mut stmt = self.conn.prepare(
1244            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1245        )?;
1246        let by_agent: Vec<(String, u64)> = stmt
1247            .query_map(params![workspace], |r| {
1248                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1249            })?
1250            .filter_map(|r| r.ok())
1251            .collect();
1252
1253        let mut stmt = self.conn.prepare(
1254            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1255        )?;
1256        let by_model: Vec<(String, u64)> = stmt
1257            .query_map(params![workspace], |r| {
1258                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1259            })?
1260            .filter_map(|r| r.ok())
1261            .collect();
1262
1263        let mut stmt = self.conn.prepare(
1264            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1265             WHERE s.workspace = ?1 AND tool IS NOT NULL
1266             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1267        )?;
1268        let top_tools: Vec<(String, u64)> = stmt
1269            .query_map(params![workspace], |r| {
1270                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1271            })?
1272            .filter_map(|r| r.ok())
1273            .collect();
1274
1275        Ok(SummaryStats {
1276            session_count: session_count as u64,
1277            total_cost_usd_e6: total_cost,
1278            by_agent,
1279            by_model,
1280            top_tools,
1281        })
1282    }
1283
1284    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1285        self.list_events_page(session_id, 0, i64::MAX as usize)
1286    }
1287
1288    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1289        let mut stmt = self.conn.prepare(
1290            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1291                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1292                    stop_reason, latency_ms, ttft_ms, retry_count,
1293                    context_used_tokens, context_max_tokens,
1294                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1295             FROM events WHERE session_id = ?1 AND seq = ?2",
1296        )?;
1297        stmt.query_row(params![session_id, seq as i64], event_row)
1298            .optional()
1299            .map_err(Into::into)
1300    }
1301
1302    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1303        let mut out = Vec::new();
1304        for session in self.list_sessions(workspace)? {
1305            for event in self.list_events_for_session(&session.id)? {
1306                out.push((session.clone(), event));
1307            }
1308        }
1309        out.sort_by(|a, b| {
1310            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1311        });
1312        Ok(out)
1313    }
1314
1315    pub fn list_events_page(
1316        &self,
1317        session_id: &str,
1318        after_seq: u64,
1319        limit: usize,
1320    ) -> Result<Vec<Event>> {
1321        let mut stmt = self.conn.prepare(
1322            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1323                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1324                    stop_reason, latency_ms, ttft_ms, retry_count,
1325                    context_used_tokens, context_max_tokens,
1326                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1327             FROM events
1328             WHERE session_id = ?1 AND seq >= ?2
1329             ORDER BY seq ASC LIMIT ?3",
1330        )?;
1331        let rows = stmt.query_map(
1332            params![
1333                session_id,
1334                after_seq as i64,
1335                limit.min(i64::MAX as usize) as i64
1336            ],
1337            event_row,
1338        )?;
1339        let mut events = Vec::new();
1340        for row in rows {
1341            events.push(row?);
1342        }
1343        Ok(events)
1344    }
1345
1346    /// Update only status for existing session.
1347    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1348        self.conn.execute(
1349            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1350            params![format!("{:?}", status), id],
1351        )?;
1352        Ok(())
1353    }
1354
1355    /// Workspace activity dashboard — feeds `cmd_insights`.
1356    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1357        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1358        Ok(InsightsStats {
1359            total_sessions: count_q(
1360                &self.conn,
1361                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1362                workspace,
1363            )?,
1364            running_sessions: count_q(
1365                &self.conn,
1366                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1367                workspace,
1368            )?,
1369            total_events: count_q(
1370                &self.conn,
1371                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1372                workspace,
1373            )?,
1374            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1375            recent: recent_sessions_3(&self.conn, workspace)?,
1376            top_tools: top_tools_5(&self.conn, workspace)?,
1377            total_cost_usd_e6,
1378            sessions_with_cost,
1379        })
1380    }
1381
1382    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1383    pub fn retro_events_in_window(
1384        &self,
1385        workspace: &str,
1386        start_ms: u64,
1387        end_ms: u64,
1388    ) -> Result<Vec<(SessionRecord, Event)>> {
1389        let mut stmt = self.conn.prepare(
1390            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1391                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1392                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1393                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1394                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1395                    s.repo_file_count, s.repo_total_loc,
1396                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1397                    e.context_used_tokens, e.context_max_tokens,
1398                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1399             FROM events e
1400             JOIN sessions s ON s.id = e.session_id
1401             WHERE s.workspace = ?1
1402               AND (
1403                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1404                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1405               )
1406             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1407        )?;
1408        let rows = stmt.query_map(
1409            params![
1410                workspace,
1411                start_ms as i64,
1412                end_ms as i64,
1413                SYNTHETIC_TS_CEILING_MS,
1414            ],
1415            |row| {
1416                let payload_str: String = row.get(12)?;
1417                let status_str: String = row.get(19)?;
1418                Ok((
1419                    SessionRecord {
1420                        id: row.get(13)?,
1421                        agent: row.get(14)?,
1422                        model: row.get(15)?,
1423                        workspace: row.get(16)?,
1424                        started_at_ms: row.get::<_, i64>(17)? as u64,
1425                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1426                        status: status_from_str(&status_str),
1427                        trace_path: row.get(20)?,
1428                        start_commit: row.get(21)?,
1429                        end_commit: row.get(22)?,
1430                        branch: row.get(23)?,
1431                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1432                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1433                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1434                        prompt_fingerprint: row.get(27)?,
1435                        parent_session_id: row.get(28)?,
1436                        agent_version: row.get(29)?,
1437                        os: row.get(30)?,
1438                        arch: row.get(31)?,
1439                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1440                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1441                    },
1442                    Event {
1443                        session_id: row.get(0)?,
1444                        seq: row.get::<_, i64>(1)? as u64,
1445                        ts_ms: row.get::<_, i64>(2)? as u64,
1446                        ts_exact: row.get::<_, i64>(3)? != 0,
1447                        kind: kind_from_str(&row.get::<_, String>(4)?),
1448                        source: source_from_str(&row.get::<_, String>(5)?),
1449                        tool: row.get(6)?,
1450                        tool_call_id: row.get(7)?,
1451                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1452                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1453                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1454                        cost_usd_e6: row.get(11)?,
1455                        payload: serde_json::from_str(&payload_str)
1456                            .unwrap_or(serde_json::Value::Null),
1457                        stop_reason: row.get(34)?,
1458                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1459                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1460                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1461                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1462                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1463                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1464                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1465                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1466                    },
1467                ))
1468            },
1469        )?;
1470
1471        let mut out = Vec::new();
1472        for r in rows {
1473            out.push(r?);
1474        }
1475        Ok(out)
1476    }
1477
1478    pub fn experiment_metric_values_in_window(
1479        &self,
1480        workspace: &str,
1481        start_ms: u64,
1482        end_ms: u64,
1483        metric: crate::experiment::types::Metric,
1484    ) -> Result<Vec<(SessionRecord, f64)>> {
1485        use crate::experiment::types::Metric;
1486        let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1487            s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
1488            s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
1489            s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
1490        let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1491            OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
1492        let sql = match metric {
1493            Metric::TokensPerSession => format!(
1494                "SELECT {session_cols},
1495                    SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
1496                 FROM sessions s JOIN events e ON e.session_id = s.id
1497                 WHERE {window}
1498                 GROUP BY s.id"
1499            ),
1500            Metric::CostPerSession => format!(
1501                "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
1502                 FROM sessions s JOIN events e ON e.session_id = s.id
1503                 WHERE {window}
1504                 GROUP BY s.id"
1505            ),
1506            Metric::SuccessRate => format!(
1507                "SELECT {session_cols},
1508                    CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
1509                 FROM sessions s JOIN events e ON e.session_id = s.id
1510                 WHERE {window}
1511                 GROUP BY s.id"
1512            ),
1513            Metric::DurationMinutes => format!(
1514                "SELECT {session_cols},
1515                    (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
1516                 FROM sessions s
1517                 WHERE s.workspace = ?1
1518                   AND s.ended_at_ms IS NOT NULL
1519                   AND EXISTS (
1520                     SELECT 1 FROM events e
1521                     WHERE e.session_id = s.id
1522                       AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1523                         OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
1524                   )"
1525            ),
1526            Metric::FilesPerSession => format!(
1527                "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
1528                 FROM sessions s
1529                 JOIN events e ON e.session_id = s.id
1530                 LEFT JOIN files_touched ft ON ft.session_id = s.id
1531                 WHERE {window}
1532                 GROUP BY s.id"
1533            ),
1534            Metric::SuccessRateByPrompt => format!(
1535                "SELECT {session_cols},
1536                    1.0 - (MIN(
1537                      SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
1538                      SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
1539                    ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
1540                 FROM sessions s JOIN events e ON e.session_id = s.id
1541                 WHERE {window}
1542                 GROUP BY s.id
1543                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1544            ),
1545            Metric::CostByPrompt => format!(
1546                "SELECT {session_cols},
1547                    SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
1548                    SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
1549                 FROM sessions s JOIN events e ON e.session_id = s.id
1550                 WHERE {window}
1551                 GROUP BY s.id
1552                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1553            ),
1554            Metric::ToolLoops => format!(
1555                "WITH calls AS (
1556                   SELECT e.session_id, e.tool,
1557                     LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
1558                   FROM events e JOIN sessions s ON s.id = e.session_id
1559                   WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
1560                 )
1561                 SELECT {session_cols},
1562                    SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
1563                 FROM sessions s JOIN calls ON calls.session_id = s.id
1564                 GROUP BY s.id"
1565            ),
1566        };
1567        let mut stmt = self.conn.prepare(&sql)?;
1568        let rows = stmt.query_map(
1569            params![
1570                workspace,
1571                start_ms as i64,
1572                end_ms as i64,
1573                SYNTHETIC_TS_CEILING_MS,
1574            ],
1575            |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
1576        )?;
1577        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1578    }
1579
1580    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1581    pub fn files_touched_in_window(
1582        &self,
1583        workspace: &str,
1584        start_ms: u64,
1585        end_ms: u64,
1586    ) -> Result<Vec<(String, String)>> {
1587        let mut stmt = self.conn.prepare(
1588            "SELECT DISTINCT ft.session_id, ft.path
1589             FROM files_touched ft
1590             JOIN sessions s ON s.id = ft.session_id
1591             WHERE s.workspace = ?1
1592               AND EXISTS (
1593                 SELECT 1 FROM events e
1594                 JOIN sessions ss ON ss.id = e.session_id
1595                 WHERE e.session_id = ft.session_id
1596                   AND (
1597                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1598                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1599                   )
1600               )
1601             ORDER BY ft.session_id, ft.path",
1602        )?;
1603        let out: Vec<(String, String)> = stmt
1604            .query_map(
1605                params![
1606                    workspace,
1607                    start_ms as i64,
1608                    end_ms as i64,
1609                    SYNTHETIC_TS_CEILING_MS,
1610                ],
1611                |r| Ok((r.get(0)?, r.get(1)?)),
1612            )?
1613            .filter_map(|r| r.ok())
1614            .collect();
1615        Ok(out)
1616    }
1617
1618    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1619    /// (any session with an indexed skill row; join events optional — use row existence).
1620    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1621        let mut stmt = self.conn.prepare(
1622            "SELECT DISTINCT su.skill
1623             FROM skills_used su
1624             JOIN sessions s ON s.id = su.session_id
1625             WHERE s.workspace = ?1
1626               AND EXISTS (
1627                 SELECT 1 FROM events e
1628                 JOIN sessions ss ON ss.id = e.session_id
1629                 WHERE e.session_id = su.session_id
1630                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1631               )
1632             ORDER BY su.skill",
1633        )?;
1634        let out: Vec<String> = stmt
1635            .query_map(
1636                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1637                |r| r.get::<_, String>(0),
1638            )?
1639            .filter_map(|r| r.ok())
1640            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1641            .collect();
1642        Ok(out)
1643    }
1644
1645    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1646    pub fn skills_used_in_window(
1647        &self,
1648        workspace: &str,
1649        start_ms: u64,
1650        end_ms: u64,
1651    ) -> Result<Vec<(String, String)>> {
1652        let mut stmt = self.conn.prepare(
1653            "SELECT DISTINCT su.session_id, su.skill
1654             FROM skills_used su
1655             JOIN sessions s ON s.id = su.session_id
1656             WHERE s.workspace = ?1
1657               AND EXISTS (
1658                 SELECT 1 FROM events e
1659                 JOIN sessions ss ON ss.id = e.session_id
1660                 WHERE e.session_id = su.session_id
1661                   AND (
1662                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1663                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1664                   )
1665               )
1666             ORDER BY su.session_id, su.skill",
1667        )?;
1668        let out: Vec<(String, String)> = stmt
1669            .query_map(
1670                params![
1671                    workspace,
1672                    start_ms as i64,
1673                    end_ms as i64,
1674                    SYNTHETIC_TS_CEILING_MS,
1675                ],
1676                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1677            )?
1678            .filter_map(|r| r.ok())
1679            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1680            .collect();
1681        Ok(out)
1682    }
1683
1684    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1685    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1686        let mut stmt = self.conn.prepare(
1687            "SELECT DISTINCT ru.rule
1688             FROM rules_used ru
1689             JOIN sessions s ON s.id = ru.session_id
1690             WHERE s.workspace = ?1
1691               AND EXISTS (
1692                 SELECT 1 FROM events e
1693                 JOIN sessions ss ON ss.id = e.session_id
1694                 WHERE e.session_id = ru.session_id
1695                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1696               )
1697             ORDER BY ru.rule",
1698        )?;
1699        let out: Vec<String> = stmt
1700            .query_map(
1701                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1702                |r| r.get::<_, String>(0),
1703            )?
1704            .filter_map(|r| r.ok())
1705            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1706            .collect();
1707        Ok(out)
1708    }
1709
1710    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1711    pub fn rules_used_in_window(
1712        &self,
1713        workspace: &str,
1714        start_ms: u64,
1715        end_ms: u64,
1716    ) -> Result<Vec<(String, String)>> {
1717        let mut stmt = self.conn.prepare(
1718            "SELECT DISTINCT ru.session_id, ru.rule
1719             FROM rules_used ru
1720             JOIN sessions s ON s.id = ru.session_id
1721             WHERE s.workspace = ?1
1722               AND EXISTS (
1723                 SELECT 1 FROM events e
1724                 JOIN sessions ss ON ss.id = e.session_id
1725                 WHERE e.session_id = ru.session_id
1726                   AND (
1727                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1728                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1729                   )
1730               )
1731             ORDER BY ru.session_id, ru.rule",
1732        )?;
1733        let out: Vec<(String, String)> = stmt
1734            .query_map(
1735                params![
1736                    workspace,
1737                    start_ms as i64,
1738                    end_ms as i64,
1739                    SYNTHETIC_TS_CEILING_MS,
1740                ],
1741                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1742            )?
1743            .filter_map(|r| r.ok())
1744            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1745            .collect();
1746        Ok(out)
1747    }
1748
1749    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1750    pub fn sessions_active_in_window(
1751        &self,
1752        workspace: &str,
1753        start_ms: u64,
1754        end_ms: u64,
1755    ) -> Result<HashSet<String>> {
1756        let mut stmt = self.conn.prepare(
1757            "SELECT DISTINCT s.id
1758             FROM sessions s
1759             WHERE s.workspace = ?1
1760               AND EXISTS (
1761                 SELECT 1 FROM events e
1762                 WHERE e.session_id = s.id
1763                   AND (
1764                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1765                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1766                   )
1767               )",
1768        )?;
1769        let out: HashSet<String> = stmt
1770            .query_map(
1771                params![
1772                    workspace,
1773                    start_ms as i64,
1774                    end_ms as i64,
1775                    SYNTHETIC_TS_CEILING_MS,
1776                ],
1777                |r| r.get(0),
1778            )?
1779            .filter_map(|r| r.ok())
1780            .collect();
1781        Ok(out)
1782    }
1783
1784    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1785    pub fn session_costs_usd_e6_in_window(
1786        &self,
1787        workspace: &str,
1788        start_ms: u64,
1789        end_ms: u64,
1790    ) -> Result<HashMap<String, i64>> {
1791        let mut stmt = self.conn.prepare(
1792            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1793             FROM events e
1794             JOIN sessions s ON s.id = e.session_id
1795             WHERE s.workspace = ?1
1796               AND (
1797                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1798                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1799               )
1800             GROUP BY e.session_id",
1801        )?;
1802        let rows: Vec<(String, i64)> = stmt
1803            .query_map(
1804                params![
1805                    workspace,
1806                    start_ms as i64,
1807                    end_ms as i64,
1808                    SYNTHETIC_TS_CEILING_MS,
1809                ],
1810                |r| Ok((r.get(0)?, r.get(1)?)),
1811            )?
1812            .filter_map(|r| r.ok())
1813            .collect();
1814        Ok(rows.into_iter().collect())
1815    }
1816
1817    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1818    pub fn guidance_report(
1819        &self,
1820        workspace: &str,
1821        window_start_ms: u64,
1822        window_end_ms: u64,
1823        skill_slugs_on_disk: &HashSet<String>,
1824        rule_slugs_on_disk: &HashSet<String>,
1825    ) -> Result<GuidanceReport> {
1826        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1827        let denom = active.len() as u64;
1828        let costs =
1829            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1830
1831        let workspace_avg_cost_per_session_usd = if denom > 0 {
1832            let total_e6: i64 = active
1833                .iter()
1834                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1835                .sum();
1836            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1837        } else {
1838            None
1839        };
1840
1841        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1842        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1843            skill_sessions.entry(skill).or_default().insert(sid);
1844        }
1845        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1846        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1847            rule_sessions.entry(rule).or_default().insert(sid);
1848        }
1849
1850        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1851
1852        let mut push_row =
1853            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1854                let sessions = sids.len() as u64;
1855                let sessions_pct = if denom > 0 {
1856                    sessions as f64 * 100.0 / denom as f64
1857                } else {
1858                    0.0
1859                };
1860                let total_cost_usd_e6: i64 = sids
1861                    .iter()
1862                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1863                    .sum();
1864                let avg_cost_per_session_usd = if sessions > 0 {
1865                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1866                } else {
1867                    None
1868                };
1869                let vs_workspace_avg_cost_per_session_usd =
1870                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1871                        (Some(avg), Some(w)) => Some(avg - w),
1872                        _ => None,
1873                    };
1874                rows.push(GuidancePerfRow {
1875                    kind,
1876                    id,
1877                    sessions,
1878                    sessions_pct,
1879                    total_cost_usd_e6,
1880                    avg_cost_per_session_usd,
1881                    vs_workspace_avg_cost_per_session_usd,
1882                    on_disk,
1883                });
1884            };
1885
1886        let mut seen_skills: HashSet<String> = HashSet::new();
1887        for (id, sids) in &skill_sessions {
1888            seen_skills.insert(id.clone());
1889            push_row(
1890                GuidanceKind::Skill,
1891                id.clone(),
1892                sids,
1893                skill_slugs_on_disk.contains(id),
1894            );
1895        }
1896        for slug in skill_slugs_on_disk {
1897            if seen_skills.contains(slug) {
1898                continue;
1899            }
1900            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1901        }
1902
1903        let mut seen_rules: HashSet<String> = HashSet::new();
1904        for (id, sids) in &rule_sessions {
1905            seen_rules.insert(id.clone());
1906            push_row(
1907                GuidanceKind::Rule,
1908                id.clone(),
1909                sids,
1910                rule_slugs_on_disk.contains(id),
1911            );
1912        }
1913        for slug in rule_slugs_on_disk {
1914            if seen_rules.contains(slug) {
1915                continue;
1916            }
1917            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1918        }
1919
1920        rows.sort_by(|a, b| {
1921            b.sessions
1922                .cmp(&a.sessions)
1923                .then_with(|| a.kind.cmp(&b.kind))
1924                .then_with(|| a.id.cmp(&b.id))
1925        });
1926
1927        Ok(GuidanceReport {
1928            workspace: workspace.to_string(),
1929            window_start_ms,
1930            window_end_ms,
1931            sessions_in_window: denom,
1932            workspace_avg_cost_per_session_usd,
1933            rows,
1934        })
1935    }
1936
1937    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1938        let mut stmt = self.conn.prepare(
1939            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1940                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1941                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1942                    repo_file_count, repo_total_loc
1943             FROM sessions WHERE id = ?1",
1944        )?;
1945        let mut rows = stmt.query_map(params![id], |row| {
1946            Ok((
1947                row.get::<_, String>(0)?,
1948                row.get::<_, String>(1)?,
1949                row.get::<_, Option<String>>(2)?,
1950                row.get::<_, String>(3)?,
1951                row.get::<_, i64>(4)?,
1952                row.get::<_, Option<i64>>(5)?,
1953                row.get::<_, String>(6)?,
1954                row.get::<_, String>(7)?,
1955                row.get::<_, Option<String>>(8)?,
1956                row.get::<_, Option<String>>(9)?,
1957                row.get::<_, Option<String>>(10)?,
1958                row.get::<_, Option<i64>>(11)?,
1959                row.get::<_, Option<i64>>(12)?,
1960                row.get::<_, String>(13)?,
1961                row.get::<_, Option<String>>(14)?,
1962                row.get::<_, Option<String>>(15)?,
1963                row.get::<_, Option<String>>(16)?,
1964                row.get::<_, Option<String>>(17)?,
1965                row.get::<_, Option<String>>(18)?,
1966                row.get::<_, Option<i64>>(19)?,
1967                row.get::<_, Option<i64>>(20)?,
1968            ))
1969        })?;
1970
1971        if let Some(row) = rows.next() {
1972            let (
1973                id,
1974                agent,
1975                model,
1976                workspace,
1977                started,
1978                ended,
1979                status_str,
1980                trace,
1981                start_commit,
1982                end_commit,
1983                branch,
1984                dirty_start,
1985                dirty_end,
1986                source,
1987                prompt_fingerprint,
1988                parent_session_id,
1989                agent_version,
1990                os,
1991                arch,
1992                repo_file_count,
1993                repo_total_loc,
1994            ) = row?;
1995            Ok(Some(SessionRecord {
1996                id,
1997                agent,
1998                model,
1999                workspace,
2000                started_at_ms: started as u64,
2001                ended_at_ms: ended.map(|v| v as u64),
2002                status: status_from_str(&status_str),
2003                trace_path: trace,
2004                start_commit,
2005                end_commit,
2006                branch,
2007                dirty_start: dirty_start.map(i64_to_bool),
2008                dirty_end: dirty_end.map(i64_to_bool),
2009                repo_binding_source: empty_to_none(source),
2010                prompt_fingerprint,
2011                parent_session_id,
2012                agent_version,
2013                os,
2014                arch,
2015                repo_file_count: repo_file_count.map(|v| v as u32),
2016                repo_total_loc: repo_total_loc.map(|v| v as u64),
2017            }))
2018        } else {
2019            Ok(None)
2020        }
2021    }
2022
2023    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2024        let mut stmt = self.conn.prepare(
2025            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2026                    indexed_at_ms, dirty, graph_path
2027             FROM repo_snapshots WHERE workspace = ?1
2028             ORDER BY indexed_at_ms DESC LIMIT 1",
2029        )?;
2030        let mut rows = stmt.query_map(params![workspace], |row| {
2031            Ok(RepoSnapshotRecord {
2032                id: row.get(0)?,
2033                workspace: row.get(1)?,
2034                head_commit: row.get(2)?,
2035                dirty_fingerprint: row.get(3)?,
2036                analyzer_version: row.get(4)?,
2037                indexed_at_ms: row.get::<_, i64>(5)? as u64,
2038                dirty: row.get::<_, i64>(6)? != 0,
2039                graph_path: row.get(7)?,
2040            })
2041        })?;
2042        Ok(rows.next().transpose()?)
2043    }
2044
2045    pub fn save_repo_snapshot(
2046        &self,
2047        snapshot: &RepoSnapshotRecord,
2048        facts: &[FileFact],
2049        edges: &[RepoEdge],
2050    ) -> Result<()> {
2051        self.conn.execute(
2052            "INSERT INTO repo_snapshots (
2053                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2054                indexed_at_ms, dirty, graph_path
2055             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2056             ON CONFLICT(id) DO UPDATE SET
2057                workspace=excluded.workspace,
2058                head_commit=excluded.head_commit,
2059                dirty_fingerprint=excluded.dirty_fingerprint,
2060                analyzer_version=excluded.analyzer_version,
2061                indexed_at_ms=excluded.indexed_at_ms,
2062                dirty=excluded.dirty,
2063                graph_path=excluded.graph_path",
2064            params![
2065                snapshot.id,
2066                snapshot.workspace,
2067                snapshot.head_commit,
2068                snapshot.dirty_fingerprint,
2069                snapshot.analyzer_version,
2070                snapshot.indexed_at_ms as i64,
2071                bool_to_i64(snapshot.dirty),
2072                snapshot.graph_path,
2073            ],
2074        )?;
2075        self.conn.execute(
2076            "DELETE FROM file_facts WHERE snapshot_id = ?1",
2077            params![snapshot.id],
2078        )?;
2079        self.conn.execute(
2080            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2081            params![snapshot.id],
2082        )?;
2083        for fact in facts {
2084            self.conn.execute(
2085                "INSERT INTO file_facts (
2086                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2087                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2088                    churn_30d, churn_90d, authors_90d, last_changed_ms
2089                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2090                params![
2091                    fact.snapshot_id,
2092                    fact.path,
2093                    fact.language,
2094                    fact.bytes as i64,
2095                    fact.loc as i64,
2096                    fact.sloc as i64,
2097                    fact.complexity_total as i64,
2098                    fact.max_fn_complexity as i64,
2099                    fact.symbol_count as i64,
2100                    fact.import_count as i64,
2101                    fact.fan_in as i64,
2102                    fact.fan_out as i64,
2103                    fact.churn_30d as i64,
2104                    fact.churn_90d as i64,
2105                    fact.authors_90d as i64,
2106                    fact.last_changed_ms.map(|v| v as i64),
2107                ],
2108            )?;
2109        }
2110        for edge in edges {
2111            self.conn.execute(
2112                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2113                 VALUES (?1, ?2, ?3, ?4, ?5)
2114                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2115                 DO UPDATE SET weight = weight + excluded.weight",
2116                params![
2117                    snapshot.id,
2118                    edge.from_path,
2119                    edge.to_path,
2120                    edge.kind,
2121                    edge.weight as i64,
2122                ],
2123            )?;
2124        }
2125        Ok(())
2126    }
2127
2128    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2129        let mut stmt = self.conn.prepare(
2130            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2131                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2132                    churn_30d, churn_90d, authors_90d, last_changed_ms
2133             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2134        )?;
2135        let rows = stmt.query_map(params![snapshot_id], |row| {
2136            Ok(FileFact {
2137                snapshot_id: row.get(0)?,
2138                path: row.get(1)?,
2139                language: row.get(2)?,
2140                bytes: row.get::<_, i64>(3)? as u64,
2141                loc: row.get::<_, i64>(4)? as u32,
2142                sloc: row.get::<_, i64>(5)? as u32,
2143                complexity_total: row.get::<_, i64>(6)? as u32,
2144                max_fn_complexity: row.get::<_, i64>(7)? as u32,
2145                symbol_count: row.get::<_, i64>(8)? as u32,
2146                import_count: row.get::<_, i64>(9)? as u32,
2147                fan_in: row.get::<_, i64>(10)? as u32,
2148                fan_out: row.get::<_, i64>(11)? as u32,
2149                churn_30d: row.get::<_, i64>(12)? as u32,
2150                churn_90d: row.get::<_, i64>(13)? as u32,
2151                authors_90d: row.get::<_, i64>(14)? as u32,
2152                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2153            })
2154        })?;
2155        Ok(rows.filter_map(|row| row.ok()).collect())
2156    }
2157
2158    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2159        let mut stmt = self.conn.prepare(
2160            "SELECT from_id, to_id, kind, weight
2161             FROM repo_edges WHERE snapshot_id = ?1
2162             ORDER BY kind, from_id, to_id",
2163        )?;
2164        let rows = stmt.query_map(params![snapshot_id], |row| {
2165            Ok(RepoEdge {
2166                from_path: row.get(0)?,
2167                to_path: row.get(1)?,
2168                kind: row.get(2)?,
2169                weight: row.get::<_, i64>(3)? as u32,
2170            })
2171        })?;
2172        Ok(rows.filter_map(|row| row.ok()).collect())
2173    }
2174
2175    pub fn tool_spans_in_window(
2176        &self,
2177        workspace: &str,
2178        start_ms: u64,
2179        end_ms: u64,
2180    ) -> Result<Vec<ToolSpanView>> {
2181        let mut stmt = self.conn.prepare(
2182            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2183                    reasoning_tokens, cost_usd_e6, paths_json,
2184                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2185             FROM (
2186                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2187                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2188                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2189                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2190                        ts.started_at_ms AS sort_ms
2191                 FROM tool_spans ts
2192                 JOIN sessions s ON s.id = ts.session_id
2193                 WHERE s.workspace = ?1
2194                   AND ts.started_at_ms >= ?2
2195                   AND ts.started_at_ms <= ?3
2196                 UNION ALL
2197                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2198                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2199                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2200                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2201                        ts.ended_at_ms AS sort_ms
2202                 FROM tool_spans ts
2203                 JOIN sessions s ON s.id = ts.session_id
2204                 WHERE s.workspace = ?1
2205                   AND ts.started_at_ms IS NULL
2206                   AND ts.ended_at_ms >= ?2
2207                   AND ts.ended_at_ms <= ?3
2208             )
2209             ORDER BY sort_ms DESC",
2210        )?;
2211        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2212            let paths_json: String = row.get(8)?;
2213            Ok(ToolSpanView {
2214                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2215                tool: row
2216                    .get::<_, Option<String>>(1)?
2217                    .unwrap_or_else(|| "unknown".into()),
2218                status: row.get(2)?,
2219                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2220                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2221                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2222                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2223                cost_usd_e6: row.get(7)?,
2224                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2225                parent_span_id: row.get(9)?,
2226                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2227                subtree_cost_usd_e6: row.get(11)?,
2228                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2229            })
2230        })?;
2231        Ok(rows.filter_map(|row| row.ok()).collect())
2232    }
2233
2234    pub fn session_span_tree(
2235        &self,
2236        session_id: &str,
2237    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2238        let last_event_seq = self.last_event_seq_for_session(session_id)?;
2239        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2240            && entry.session_id == session_id
2241            && entry.last_event_seq == last_event_seq
2242        {
2243            return Ok(entry.nodes.clone());
2244        }
2245        let mut stmt = self.conn.prepare(
2246            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2247                    reasoning_tokens, cost_usd_e6, paths_json,
2248                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2249             FROM tool_spans
2250             WHERE session_id = ?1
2251             ORDER BY depth ASC, started_at_ms ASC",
2252        )?;
2253        let rows = stmt.query_map(params![session_id], |row| {
2254            let paths_json: String = row.get(8)?;
2255            Ok(crate::metrics::types::ToolSpanView {
2256                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2257                tool: row
2258                    .get::<_, Option<String>>(1)?
2259                    .unwrap_or_else(|| "unknown".into()),
2260                status: row.get(2)?,
2261                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2262                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2263                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2264                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2265                cost_usd_e6: row.get(7)?,
2266                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2267                parent_span_id: row.get(9)?,
2268                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2269                subtree_cost_usd_e6: row.get(11)?,
2270                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2271            })
2272        })?;
2273        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2274        let nodes = crate::store::span_tree::build_tree(spans);
2275        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2276            session_id: session_id.to_string(),
2277            last_event_seq,
2278            nodes: nodes.clone(),
2279        });
2280        Ok(nodes)
2281    }
2282
2283    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2284        let seq = self
2285            .conn
2286            .query_row(
2287                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2288                params![session_id],
2289                |r| r.get::<_, Option<i64>>(0),
2290            )?
2291            .map(|v| v as u64);
2292        Ok(seq)
2293    }
2294
2295    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2296        let mut stmt = self.conn.prepare(
2297            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2298                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2299             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2300        )?;
2301        let rows = stmt.query_map(params![session_id], |row| {
2302            let paths_json: String = row.get(12)?;
2303            Ok(ToolSpanSyncRow {
2304                span_id: row.get(0)?,
2305                session_id: row.get(1)?,
2306                tool: row.get(2)?,
2307                tool_call_id: row.get(3)?,
2308                status: row.get(4)?,
2309                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2310                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2311                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2312                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2313                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2314                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2315                cost_usd_e6: row.get(11)?,
2316                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2317            })
2318        })?;
2319        Ok(rows.filter_map(|row| row.ok()).collect())
2320    }
2321
2322    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2323        self.conn.execute(
2324            "INSERT OR REPLACE INTO session_evals
2325             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2326             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2327            rusqlite::params![
2328                eval.id,
2329                eval.session_id,
2330                eval.judge_model,
2331                eval.rubric_id,
2332                eval.score,
2333                eval.rationale,
2334                eval.flagged as i64,
2335                eval.created_at_ms as i64,
2336            ],
2337        )?;
2338        Ok(())
2339    }
2340
2341    pub fn list_evals_in_window(
2342        &self,
2343        start_ms: u64,
2344        end_ms: u64,
2345    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2346        let mut stmt = self.conn.prepare(
2347            "SELECT id, session_id, judge_model, rubric_id, score,
2348                    rationale, flagged, created_at_ms
2349             FROM session_evals
2350             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2351             ORDER BY created_at_ms ASC",
2352        )?;
2353        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2354            Ok(crate::eval::types::EvalRow {
2355                id: r.get(0)?,
2356                session_id: r.get(1)?,
2357                judge_model: r.get(2)?,
2358                rubric_id: r.get(3)?,
2359                score: r.get(4)?,
2360                rationale: r.get(5)?,
2361                flagged: r.get::<_, i64>(6)? != 0,
2362                created_at_ms: r.get::<_, i64>(7)? as u64,
2363            })
2364        })?;
2365        rows.collect()
2366    }
2367
2368    pub fn list_evals_for_session(
2369        &self,
2370        session_id: &str,
2371    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2372        let mut stmt = self.conn.prepare(
2373            "SELECT id, session_id, judge_model, rubric_id, score,
2374                    rationale, flagged, created_at_ms
2375             FROM session_evals
2376             WHERE session_id = ?1
2377             ORDER BY created_at_ms DESC",
2378        )?;
2379        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2380            Ok(crate::eval::types::EvalRow {
2381                id: r.get(0)?,
2382                session_id: r.get(1)?,
2383                judge_model: r.get(2)?,
2384                rubric_id: r.get(3)?,
2385                score: r.get(4)?,
2386                rationale: r.get(5)?,
2387                flagged: r.get::<_, i64>(6)? != 0,
2388                created_at_ms: r.get::<_, i64>(7)? as u64,
2389            })
2390        })?;
2391        rows.collect()
2392    }
2393
2394    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2395        use crate::feedback::types::FeedbackLabel;
2396        self.conn.execute(
2397            "INSERT OR REPLACE INTO session_feedback
2398             (id, session_id, score, label, note, created_at_ms)
2399             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2400            rusqlite::params![
2401                r.id,
2402                r.session_id,
2403                r.score.as_ref().map(|s| s.0 as i64),
2404                r.label.as_ref().map(FeedbackLabel::to_db_str),
2405                r.note,
2406                r.created_at_ms as i64,
2407            ],
2408        )?;
2409        let payload = serde_json::to_string(r).unwrap_or_default();
2410        self.conn.execute(
2411            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2412             VALUES (?1, 'session_feedback', ?2, 0)",
2413            rusqlite::params![r.session_id, payload],
2414        )?;
2415        Ok(())
2416    }
2417
2418    pub fn list_feedback_in_window(
2419        &self,
2420        start_ms: u64,
2421        end_ms: u64,
2422    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2423        let mut stmt = self.conn.prepare(
2424            "SELECT id, session_id, score, label, note, created_at_ms
2425             FROM session_feedback
2426             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2427             ORDER BY created_at_ms ASC",
2428        )?;
2429        let rows = stmt.query_map(
2430            rusqlite::params![start_ms as i64, end_ms as i64],
2431            feedback_row,
2432        )?;
2433        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2434    }
2435
2436    pub fn feedback_for_sessions(
2437        &self,
2438        ids: &[String],
2439    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2440        if ids.is_empty() {
2441            return Ok(std::collections::HashMap::new());
2442        }
2443        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2444        let sql = format!(
2445            "SELECT id, session_id, score, label, note, created_at_ms
2446             FROM session_feedback WHERE session_id IN ({placeholders})
2447             ORDER BY created_at_ms DESC"
2448        );
2449        let mut stmt = self.conn.prepare(&sql)?;
2450        let params: Vec<&dyn rusqlite::ToSql> =
2451            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2452        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2453        let mut map = std::collections::HashMap::new();
2454        for row in rows {
2455            let r = row?;
2456            map.entry(r.session_id.clone()).or_insert(r);
2457        }
2458        Ok(map)
2459    }
2460
2461    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2462        self.conn.execute(
2463            "INSERT INTO session_outcomes (
2464                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2465                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2466            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2467            ON CONFLICT(session_id) DO UPDATE SET
2468                test_passed=excluded.test_passed,
2469                test_failed=excluded.test_failed,
2470                test_skipped=excluded.test_skipped,
2471                build_ok=excluded.build_ok,
2472                lint_errors=excluded.lint_errors,
2473                revert_lines_14d=excluded.revert_lines_14d,
2474                pr_open=excluded.pr_open,
2475                ci_ok=excluded.ci_ok,
2476                measured_at_ms=excluded.measured_at_ms,
2477                measure_error=excluded.measure_error",
2478            params![
2479                row.session_id,
2480                row.test_passed,
2481                row.test_failed,
2482                row.test_skipped,
2483                row.build_ok.map(bool_to_i64),
2484                row.lint_errors,
2485                row.revert_lines_14d,
2486                row.pr_open,
2487                row.ci_ok.map(bool_to_i64),
2488                row.measured_at_ms as i64,
2489                row.measure_error.as_deref(),
2490            ],
2491        )?;
2492        Ok(())
2493    }
2494
2495    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2496        let mut stmt = self.conn.prepare(
2497            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2498                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2499             FROM session_outcomes WHERE session_id = ?1",
2500        )?;
2501        let row = stmt
2502            .query_row(params![session_id], outcome_row)
2503            .optional()?;
2504        Ok(row)
2505    }
2506
2507    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2508    pub fn list_session_outcomes_in_window(
2509        &self,
2510        workspace: &str,
2511        start_ms: u64,
2512        end_ms: u64,
2513    ) -> Result<Vec<SessionOutcomeRow>> {
2514        let mut stmt = self.conn.prepare(
2515            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2516                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2517             FROM session_outcomes o
2518             JOIN sessions s ON s.id = o.session_id
2519             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2520             ORDER BY o.measured_at_ms ASC",
2521        )?;
2522        let rows = stmt.query_map(
2523            params![workspace, start_ms as i64, end_ms as i64],
2524            outcome_row,
2525        )?;
2526        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2527    }
2528
2529    pub fn append_session_sample(
2530        &self,
2531        session_id: &str,
2532        ts_ms: u64,
2533        pid: u32,
2534        cpu_percent: Option<f64>,
2535        rss_bytes: Option<u64>,
2536    ) -> Result<()> {
2537        self.conn.execute(
2538            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2539             VALUES (?1, ?2, ?3, ?4, ?5)",
2540            params![
2541                session_id,
2542                ts_ms as i64,
2543                pid as i64,
2544                cpu_percent,
2545                rss_bytes.map(|b| b as i64)
2546            ],
2547        )?;
2548        Ok(())
2549    }
2550
2551    /// Per-session maxima for retro heuristics.
2552    pub fn list_session_sample_aggs_in_window(
2553        &self,
2554        workspace: &str,
2555        start_ms: u64,
2556        end_ms: u64,
2557    ) -> Result<Vec<SessionSampleAgg>> {
2558        let mut stmt = self.conn.prepare(
2559            "SELECT ss.session_id, COUNT(*) AS n,
2560                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2561             FROM session_samples ss
2562             JOIN sessions s ON s.id = ss.session_id
2563             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2564             GROUP BY ss.session_id",
2565        )?;
2566        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2567            let sid: String = r.get(0)?;
2568            let n: i64 = r.get(1)?;
2569            let max_cpu: Option<f64> = r.get(2)?;
2570            let max_rss: Option<i64> = r.get(3)?;
2571            Ok(SessionSampleAgg {
2572                session_id: sid,
2573                sample_count: n as u64,
2574                max_cpu_percent: max_cpu.unwrap_or(0.0),
2575                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2576            })
2577        })?;
2578        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2579    }
2580
2581    pub fn list_sessions_for_eval(
2582        &self,
2583        since_ms: u64,
2584        min_cost_usd: f64,
2585    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2586        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2587        let mut stmt = self.conn.prepare(
2588            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2589                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2590                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2591                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2592             FROM sessions s
2593             WHERE s.started_at_ms >= ?1
2594               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2595               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2596             ORDER BY s.started_at_ms DESC",
2597        )?;
2598        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2599            Ok((
2600                r.get::<_, String>(0)?,
2601                r.get::<_, String>(1)?,
2602                r.get::<_, Option<String>>(2)?,
2603                r.get::<_, String>(3)?,
2604                r.get::<_, i64>(4)?,
2605                r.get::<_, Option<i64>>(5)?,
2606                r.get::<_, String>(6)?,
2607                r.get::<_, String>(7)?,
2608                r.get::<_, Option<String>>(8)?,
2609                r.get::<_, Option<String>>(9)?,
2610                r.get::<_, Option<String>>(10)?,
2611                r.get::<_, Option<i64>>(11)?,
2612                r.get::<_, Option<i64>>(12)?,
2613                r.get::<_, Option<String>>(13)?,
2614                r.get::<_, Option<String>>(14)?,
2615                r.get::<_, Option<String>>(15)?,
2616                r.get::<_, Option<String>>(16)?,
2617                r.get::<_, Option<String>>(17)?,
2618                r.get::<_, Option<String>>(18)?,
2619                r.get::<_, Option<i64>>(19)?,
2620                r.get::<_, Option<i64>>(20)?,
2621            ))
2622        })?;
2623        let mut out = Vec::new();
2624        for row in rows {
2625            let (
2626                id,
2627                agent,
2628                model,
2629                workspace,
2630                started,
2631                ended,
2632                status_str,
2633                trace,
2634                start_commit,
2635                end_commit,
2636                branch,
2637                dirty_start,
2638                dirty_end,
2639                source,
2640                prompt_fingerprint,
2641                parent_session_id,
2642                agent_version,
2643                os,
2644                arch,
2645                repo_file_count,
2646                repo_total_loc,
2647            ) = row?;
2648            out.push(crate::core::event::SessionRecord {
2649                id,
2650                agent,
2651                model,
2652                workspace,
2653                started_at_ms: started as u64,
2654                ended_at_ms: ended.map(|v| v as u64),
2655                status: status_from_str(&status_str),
2656                trace_path: trace,
2657                start_commit,
2658                end_commit,
2659                branch,
2660                dirty_start: dirty_start.map(i64_to_bool),
2661                dirty_end: dirty_end.map(i64_to_bool),
2662                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2663                prompt_fingerprint,
2664                parent_session_id,
2665                agent_version,
2666                os,
2667                arch,
2668                repo_file_count: repo_file_count.map(|v| v as u32),
2669                repo_total_loc: repo_total_loc.map(|v| v as u64),
2670            });
2671        }
2672        Ok(out)
2673    }
2674
2675    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2676        self.conn.execute(
2677            "INSERT OR IGNORE INTO prompt_snapshots
2678             (fingerprint, captured_at_ms, files_json, total_bytes)
2679             VALUES (?1, ?2, ?3, ?4)",
2680            params![
2681                snap.fingerprint,
2682                snap.captured_at_ms as i64,
2683                snap.files_json,
2684                snap.total_bytes as i64
2685            ],
2686        )?;
2687        Ok(())
2688    }
2689
2690    pub fn get_prompt_snapshot(
2691        &self,
2692        fingerprint: &str,
2693    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2694        self.conn
2695            .query_row(
2696                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2697                 FROM prompt_snapshots WHERE fingerprint = ?1",
2698                params![fingerprint],
2699                |r| {
2700                    Ok(crate::prompt::PromptSnapshot {
2701                        fingerprint: r.get(0)?,
2702                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2703                        files_json: r.get(2)?,
2704                        total_bytes: r.get::<_, i64>(3)? as u64,
2705                    })
2706                },
2707            )
2708            .optional()
2709            .map_err(Into::into)
2710    }
2711
2712    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2713        let mut stmt = self.conn.prepare(
2714            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2715             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2716        )?;
2717        let rows = stmt.query_map([], |r| {
2718            Ok(crate::prompt::PromptSnapshot {
2719                fingerprint: r.get(0)?,
2720                captured_at_ms: r.get::<_, i64>(1)? as u64,
2721                files_json: r.get(2)?,
2722                total_bytes: r.get::<_, i64>(3)? as u64,
2723            })
2724        })?;
2725        Ok(rows.filter_map(|r| r.ok()).collect())
2726    }
2727
2728    /// Sessions with a non-null prompt_fingerprint in the given window.
2729    pub fn sessions_with_prompt_fingerprint(
2730        &self,
2731        workspace: &str,
2732        start_ms: u64,
2733        end_ms: u64,
2734    ) -> Result<Vec<(String, String)>> {
2735        let mut stmt = self.conn.prepare(
2736            "SELECT id, prompt_fingerprint FROM sessions
2737             WHERE workspace = ?1
2738               AND started_at_ms >= ?2 AND started_at_ms < ?3
2739               AND prompt_fingerprint IS NOT NULL",
2740        )?;
2741        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2742            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2743        })?;
2744        Ok(rows.filter_map(|r| r.ok()).collect())
2745    }
2746}
2747
2748impl Drop for Store {
2749    fn drop(&mut self) {
2750        if let Some(writer) = self.search_writer.get_mut().as_mut() {
2751            let _ = writer.commit();
2752        }
2753    }
2754}
2755
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields 0 when the clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
2762
2763fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2764    let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2765    let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2766    Ok(rows.filter_map(|r| r.ok()).collect())
2767}
2768
2769fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2770    raw.and_then(|s| s.trim().parse::<u64>().ok())
2771        .unwrap_or(DEFAULT_MMAP_MB)
2772        .saturating_mul(1024)
2773        .saturating_mul(1024)
2774        .min(i64::MAX as u64) as i64
2775}
2776
2777fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2778    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2779    conn.execute_batch(&format!(
2780        "
2781        PRAGMA journal_mode=WAL;
2782        PRAGMA busy_timeout=5000;
2783        PRAGMA synchronous=NORMAL;
2784        PRAGMA cache_size=-65536;
2785        PRAGMA mmap_size={mmap_size};
2786        PRAGMA temp_store=MEMORY;
2787        PRAGMA wal_autocheckpoint=1000;
2788        "
2789    ))?;
2790    if mode == StoreOpenMode::ReadOnlyQuery {
2791        conn.execute_batch("PRAGMA query_only=ON;")?;
2792    }
2793    Ok(())
2794}
2795
2796fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2797    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2798}
2799
2800fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2801    let status_str: String = row.get(6)?;
2802    Ok(SessionRecord {
2803        id: row.get(0)?,
2804        agent: row.get(1)?,
2805        model: row.get(2)?,
2806        workspace: row.get(3)?,
2807        started_at_ms: row.get::<_, i64>(4)? as u64,
2808        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2809        status: status_from_str(&status_str),
2810        trace_path: row.get(7)?,
2811        start_commit: row.get(8)?,
2812        end_commit: row.get(9)?,
2813        branch: row.get(10)?,
2814        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2815        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2816        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2817        prompt_fingerprint: row.get(14)?,
2818        parent_session_id: row.get(15)?,
2819        agent_version: row.get(16)?,
2820        os: row.get(17)?,
2821        arch: row.get(18)?,
2822        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2823        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2824    })
2825}
2826
2827fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2828    let payload_str: String = row.get(12)?;
2829    Ok(Event {
2830        session_id: row.get(0)?,
2831        seq: row.get::<_, i64>(1)? as u64,
2832        ts_ms: row.get::<_, i64>(2)? as u64,
2833        ts_exact: row.get::<_, i64>(3)? != 0,
2834        kind: kind_from_str(&row.get::<_, String>(4)?),
2835        source: source_from_str(&row.get::<_, String>(5)?),
2836        tool: row.get(6)?,
2837        tool_call_id: row.get(7)?,
2838        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2839        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2840        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2841        cost_usd_e6: row.get(11)?,
2842        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2843        stop_reason: row.get(13)?,
2844        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2845        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2846        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2847        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2848        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2849        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2850        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2851        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2852    })
2853}
2854
2855fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2856    let mut clauses = vec!["workspace = ?".to_string()];
2857    let mut args = vec![Value::Text(workspace.to_string())];
2858    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2859        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2860        args.push(Value::Text(format!("{}%", escape_like(prefix))));
2861    }
2862    if let Some(status) = &filter.status {
2863        clauses.push("status = ?".to_string());
2864        args.push(Value::Text(format!("{status:?}")));
2865    }
2866    if let Some(since_ms) = filter.since_ms {
2867        clauses.push("started_at_ms >= ?".to_string());
2868        args.push(Value::Integer(since_ms as i64));
2869    }
2870    (format!("WHERE {}", clauses.join(" AND ")), args)
2871}
2872
/// Lowercases `raw` and prefixes every `\`, `%` and `_` with a backslash, so the text
/// matches literally inside a `LIKE … ESCAPE '\'` pattern.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2879
2880fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2881    let cost: i64 = conn.query_row(
2882        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2883        params![workspace], |r| r.get(0),
2884    )?;
2885    let with_cost: i64 = conn.query_row(
2886        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2887        params![workspace], |r| r.get(0),
2888    )?;
2889    Ok((cost, with_cost as u64))
2890}
2891
2892fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2893    let build_raw: Option<i64> = r.get(4)?;
2894    let ci_raw: Option<i64> = r.get(8)?;
2895    Ok(SessionOutcomeRow {
2896        session_id: r.get(0)?,
2897        test_passed: r.get(1)?,
2898        test_failed: r.get(2)?,
2899        test_skipped: r.get(3)?,
2900        build_ok: build_raw.map(|v| v != 0),
2901        lint_errors: r.get(5)?,
2902        revert_lines_14d: r.get(6)?,
2903        pr_open: r.get(7)?,
2904        ci_ok: ci_raw.map(|v| v != 0),
2905        measured_at_ms: r.get::<_, i64>(9)? as u64,
2906        measure_error: r.get(10)?,
2907    })
2908}
2909
2910fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2911    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2912    let score = r
2913        .get::<_, Option<i64>>(2)?
2914        .and_then(|v| FeedbackScore::new(v as u8));
2915    let label = r
2916        .get::<_, Option<String>>(3)?
2917        .and_then(|s| FeedbackLabel::from_str_opt(&s));
2918    Ok(FeedbackRecord {
2919        id: r.get(0)?,
2920        session_id: r.get(1)?,
2921        score,
2922        label,
2923        note: r.get(4)?,
2924        created_at_ms: r.get::<_, i64>(5)? as u64,
2925    })
2926}
2927
/// Three-letter weekday label for a day index counted in whole days since the Unix epoch.
///
/// Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into the Sunday-first table.
/// The index is reduced modulo 7 before the offset is added, so no `day_idx` can overflow.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2931
2932fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2933    let week_ago = now.saturating_sub(7 * 86_400_000);
2934    let mut stmt = conn
2935        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2936    let days: Vec<u64> = stmt
2937        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2938        .filter_map(|r| r.ok())
2939        .map(|v| v as u64 / 86_400_000)
2940        .collect();
2941    let today = now / 86_400_000;
2942    Ok((0u64..7)
2943        .map(|i| {
2944            let d = today.saturating_sub(6 - i);
2945            (
2946                day_label(d).to_string(),
2947                days.iter().filter(|&&x| x == d).count() as u64,
2948            )
2949        })
2950        .collect())
2951}
2952
2953fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2954    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2955               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2956               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2957               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2958               COUNT(e.id) FROM sessions s \
2959               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2960               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2961    let mut stmt = conn.prepare(sql)?;
2962    let out: Vec<(SessionRecord, u64)> = stmt
2963        .query_map(params![workspace], |r| {
2964            let st: String = r.get(6)?;
2965            Ok((
2966                SessionRecord {
2967                    id: r.get(0)?,
2968                    agent: r.get(1)?,
2969                    model: r.get(2)?,
2970                    workspace: r.get(3)?,
2971                    started_at_ms: r.get::<_, i64>(4)? as u64,
2972                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2973                    status: status_from_str(&st),
2974                    trace_path: r.get(7)?,
2975                    start_commit: r.get(8)?,
2976                    end_commit: r.get(9)?,
2977                    branch: r.get(10)?,
2978                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2979                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2980                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2981                    prompt_fingerprint: r.get(14)?,
2982                    parent_session_id: r.get(15)?,
2983                    agent_version: r.get(16)?,
2984                    os: r.get(17)?,
2985                    arch: r.get(18)?,
2986                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2987                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2988                },
2989                r.get::<_, i64>(21)? as u64,
2990            ))
2991        })?
2992        .filter_map(|r| r.ok())
2993        .collect();
2994    Ok(out)
2995}
2996
2997fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2998    let mut stmt = conn.prepare(
2999        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3000         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3001    )?;
3002    let out: Vec<(String, u64)> = stmt
3003        .query_map(params![workspace], |r| {
3004            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3005        })?
3006        .filter_map(|r| r.ok())
3007        .collect();
3008    Ok(out)
3009}
3010
3011fn status_from_str(s: &str) -> SessionStatus {
3012    match s {
3013        "Running" => SessionStatus::Running,
3014        "Waiting" => SessionStatus::Waiting,
3015        "Idle" => SessionStatus::Idle,
3016        _ => SessionStatus::Done,
3017    }
3018}
3019
/// Reports whether the `KAIZEN_PROJECTOR` environment variable is set to exactly `legacy`.
///
/// An unset variable, a non-Unicode value, or any other value (the comparison is
/// case-sensitive) yields `false`.
fn projector_legacy_mode() -> bool {
    let setting = std::env::var("KAIZEN_PROJECTOR");
    matches!(setting.as_deref(), Ok("legacy"))
}
3023
3024fn is_stop_event(e: &Event) -> bool {
3025    if !matches!(e.kind, EventKind::Hook) {
3026        return false;
3027    }
3028    e.payload
3029        .get("event")
3030        .and_then(|v| v.as_str())
3031        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3032        == Some("Stop")
3033}
3034
3035fn kind_from_str(s: &str) -> EventKind {
3036    match s {
3037        "ToolCall" => EventKind::ToolCall,
3038        "ToolResult" => EventKind::ToolResult,
3039        "Message" => EventKind::Message,
3040        "Error" => EventKind::Error,
3041        "Cost" => EventKind::Cost,
3042        "Hook" => EventKind::Hook,
3043        "Lifecycle" => EventKind::Lifecycle,
3044        _ => EventKind::Hook,
3045    }
3046}
3047
3048fn source_from_str(s: &str) -> EventSource {
3049    match s {
3050        "Tail" => EventSource::Tail,
3051        "Hook" => EventSource::Hook,
3052        _ => EventSource::Proxy,
3053    }
3054}
3055
3056fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3057    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3058    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3059    ensure_column(conn, "sessions", "branch", "TEXT")?;
3060    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3061    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3062    ensure_column(
3063        conn,
3064        "sessions",
3065        "repo_binding_source",
3066        "TEXT NOT NULL DEFAULT ''",
3067    )?;
3068    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3069    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3070    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3071    ensure_column(conn, "events", "stop_reason", "TEXT")?;
3072    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3073    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3074    ensure_column(conn, "events", "retry_count", "INTEGER")?;
3075    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3076    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3077    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3078    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3079    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3080    ensure_column(
3081        conn,
3082        "sync_outbox",
3083        "kind",
3084        "TEXT NOT NULL DEFAULT 'events'",
3085    )?;
3086    ensure_column(
3087        conn,
3088        "experiments",
3089        "state",
3090        "TEXT NOT NULL DEFAULT 'Draft'",
3091    )?;
3092    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3093    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3094    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3095    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3096    ensure_column(conn, "sessions", "os", "TEXT")?;
3097    ensure_column(conn, "sessions", "arch", "TEXT")?;
3098    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3099    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3100    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3101    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3102    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3103    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3104    conn.execute_batch(
3105        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3106         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3107    )?;
3108    Ok(())
3109}
3110
3111fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3112    if has_column(conn, table, column)? {
3113        return Ok(());
3114    }
3115    conn.execute(
3116        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3117        [],
3118    )?;
3119    Ok(())
3120}
3121
3122fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3123    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3124    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3125    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3126}
3127
/// Encodes a flag as an integer: `true` becomes 1 and `false` becomes 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3131
/// Decodes an integer flag: 0 is `false`, every other value (negatives included) is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3135
/// Turns an empty string into `None`; any non-empty string is returned unchanged in `Some`.
///
/// No trimming is applied, so whitespace-only strings are kept.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3139
#[cfg(test)]
mod tests {
    //! Store tests. Each test opens a fresh database inside its own temporary directory.

    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Builds a `Done` session fixture with the given id, agent `cursor`, workspace `/ws`,
    /// start time 1000 and every optional field unset.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Builds a `read_file` tool-call event fixture for `session_id`.
    ///
    /// The timestamp is `1000 + seq * 100`, the tool call id is `call_{seq}`, the payload
    /// is an empty JSON object and all metric fields are unset.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// A freshly opened store reports `wal` as its journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    /// Opening a store applies the expected pragma values, and the mmap helper converts
    /// megabytes to bytes (64 MB -> 67_108_864).
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }

    /// A store opened read-only on an existing database has `query_only` switched on.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        // Create the database first so the read-only open has something to attach to.
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }

    /// Each of the listed indexes is present in `sqlite_master` after opening a store.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }

    /// An upserted session can be fetched back by id with its status intact.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    /// Appending events leaves the owning session listed once and the events readable
    /// back in sequence order.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");

        // Read the events back as well; without this the test never checked them.
        let events = store.list_events_for_session("s2").unwrap();
        assert_eq!(events.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
    }

    /// Paging returns the total, the next offset and rows newest-first; the two sessions
    /// sharing a start time come back as `a` then `b`.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        // Insert in reverse of the expected order so insertion order cannot explain it.
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    /// Agent prefix, status and start-time filters combine: prefix `cur` selects the
    /// running `Cursor` session and excludes the `claude` one.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }

    /// `list_sessions_started_after` returns only the later session, and
    /// `session_statuses` reflects a status update.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }

    /// Summary stats on an empty store report zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    /// Summary stats count sessions and group them by agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    /// Events appended to a session are listed back in sequence order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// Event paging starts at the given sequence number inclusively, so the next page
    /// is requested with `last seq + 1`.
    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }

    /// Appending the same event twice stores it only once.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Duplicate — should be silently ignored
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    /// The span-tree cache is filled by a lookup (even an empty one) and cleared again
    /// by every appended event.
    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        // A tool call without its result does not produce a span yet.
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }

    /// Window matching uses `started_at_ms` when present and falls back to
    /// `ended_at_ms` only when the start time is NULL.
    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }

    /// Upserting an existing session id overwrites the stored status.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// A path found under `input.path` in the payload is indexed as a touched file.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` changes the status returned by `get_session`.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// Pruning removes sessions started before the cutoff together with their events,
    /// reports both counts, and keeps newer sessions.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    /// A `.cursor/rules/<name>.mdc` path in the payload is indexed as rule `<name>`.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// The guidance report counts one session for the skill and for the rule mentioned
    /// in the payload text, and marks both as on disk when their slugs are supplied.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    /// Pruning a session also removes the `rules_used` rows recorded for it.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}