Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::sync::context::SyncIngestContext;
13use crate::sync::outbound::outbound_event_from_row;
14use crate::sync::redact::redact_payload;
15use crate::sync::smart::enqueue_tool_spans_for_session;
16use anyhow::{Context, Result};
17use rusqlite::types::Value;
18use rusqlite::{
19    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
20};
21use std::cell::RefCell;
22use std::collections::{HashMap, HashSet};
23use std::path::Path;
24
/// Upper bound on `ts_ms` values that are still regarded as synthetic, transcript-only
/// timing (seq-based fallbacks). Rows whose `ts_ms` is below this ceiling use
/// `sessions.started_at_ms` for time-window matching instead.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size in megabytes.
// NOTE(review): the consumer is outside this view — presumably the mmap pragma setup; confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// `SELECT … FROM sessions` prefix naming the 21 session columns, in the same order
/// as the column list written by `upsert_session`. It has no `WHERE`/`ORDER BY` clause.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
33
/// Ordered schema migrations, executed one entry at a time with `execute_batch` when the
/// store is opened read-write.
///
/// Every entry is idempotent (`IF NOT EXISTS` / `INSERT OR IGNORE`), so the whole list is
/// replayed on each open. Some entries hold several `;`-separated statements.
// NOTE(review): columns used elsewhere in this file but absent from these CREATE statements
// (e.g. `events.ts_exact`, `sync_outbox.kind`, `sessions.start_commit`) are presumably added
// by `ensure_schema_columns` — confirm.
const MIGRATIONS: &[&str] = &[
    // --- Core session / event log -------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    // --- Derived per-session facts ------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // --- Sync outbox and experiments ----------------------------------------------------
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // `(session_id, seq)` uniqueness is what the event upsert's ON CONFLICT clause targets.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    // Uniqueness backing the `INSERT OR IGNORE` writes into these tables.
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- Tool spans ---------------------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // --- Repository binding and analysis snapshots --------------------------------------
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // --- Session lookup indexes ---------------------------------------------------------
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // --- Remote (provider pull) cache ---------------------------------------------------
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the singleton state row (the CHECK above pins it to id = 1).
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- Evaluations, prompts, feedback, outcomes ---------------------------------------
    // Multi-statement entry: table plus its two indexes.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    // Multi-statement entry: table plus its two indexes.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // --- Later-added lookup indexes -----------------------------------------------------
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    // Same column as `session_feedback_session` above; kept so the migration list stays stable.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
276
277/// Per-workspace activity dashboard stats.
278#[derive(Clone)]
279pub struct InsightsStats {
280    pub total_sessions: u64,
281    pub running_sessions: u64,
282    pub total_events: u64,
283    /// (day label e.g. "Mon", count) last 7 days oldest first
284    pub sessions_by_day: Vec<(String, u64)>,
285    /// Recent sessions DESC by started_at, max 3; paired with event count
286    pub recent: Vec<(SessionRecord, u64)>,
287    /// Top tools by event count, max 5
288    pub top_tools: Vec<(String, u64)>,
289    pub total_cost_usd_e6: i64,
290    pub sessions_with_cost: u64,
291}
292
/// Point-in-time view of the sync daemon / outbox, reported by `kaizen sync status`.
pub struct SyncStatusSnapshot {
    /// Outbox rows still waiting to be sent.
    pub pending_outbox: u64,
    /// Timestamp (ms) of the last successful sync, if there has been one.
    pub last_success_ms: Option<u64>,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
    /// Number of failures in a row.
    pub consecutive_failures: u32,
}
300
301/// Aggregate stats across sessions + events for a workspace.
302#[derive(serde::Serialize)]
303pub struct SummaryStats {
304    pub session_count: u64,
305    pub total_cost_usd_e6: i64,
306    pub by_agent: Vec<(String, u64)>,
307    pub by_model: Vec<(String, u64)>,
308    pub top_tools: Vec<(String, u64)>,
309}
310
311/// Skill vs Cursor rule for [`GuidancePerfRow`].
312#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
313#[serde(rename_all = "lowercase")]
314pub enum GuidanceKind {
315    Skill,
316    Rule,
317}
318
319/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
320#[derive(Clone, Debug, serde::Serialize)]
321pub struct GuidancePerfRow {
322    pub kind: GuidanceKind,
323    pub id: String,
324    pub sessions: u64,
325    pub sessions_pct: f64,
326    pub total_cost_usd_e6: i64,
327    pub avg_cost_per_session_usd: Option<f64>,
328    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
329    pub on_disk: bool,
330}
331
332/// Aggregated skill/rule adoption and cost proxy for a time window.
333#[derive(Clone, Debug, serde::Serialize)]
334pub struct GuidanceReport {
335    pub workspace: String,
336    pub window_start_ms: u64,
337    pub window_end_ms: u64,
338    pub sessions_in_window: u64,
339    pub workspace_avg_cost_per_session_usd: Option<f64>,
340    pub rows: Vec<GuidancePerfRow>,
341}
342
/// Counts reported by [`Store::prune_sessions_started_before`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PruneStats {
    /// Sessions deleted by the prune.
    pub sessions_removed: u64,
    /// Events deleted by the prune.
    pub events_removed: u64,
}
349
/// One `session_outcomes` row (Tier C — the test/lint snapshot taken after a session stops).
///
/// Optional fields map to nullable columns.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionOutcomeRow {
    /// Session this outcome belongs to (primary key of the table).
    pub session_id: String,
    /// Passed-test count, if measured.
    pub test_passed: Option<i64>,
    /// Failed-test count, if measured.
    pub test_failed: Option<i64>,
    /// Skipped-test count, if measured.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded, if measured.
    pub build_ok: Option<bool>,
    /// Lint error count, if measured.
    pub lint_errors: Option<i64>,
    /// Reverted lines over 14 days, if measured.
    pub revert_lines_14d: Option<i64>,
    /// PR-open indicator, if measured.
    pub pr_open: Option<i64>,
    /// Whether CI succeeded, if measured.
    pub ci_ok: Option<bool>,
    /// When the measurement was taken (ms).
    pub measured_at_ms: u64,
    /// Error text recorded for the measurement, if any.
    pub measure_error: Option<String>,
}
365
/// Per-session rollup of process samples, used by retro (Tier D).
#[derive(Clone, Debug)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage seen.
    pub max_cpu_percent: f64,
    /// Highest resident-set size seen, in bytes.
    pub max_rss_bytes: u64,
}
374
/// `sync_state` key used for agent-rescan throttling.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key used for auto-prune.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
378
/// Owned copy of one tool span; the fields mirror the `tool_spans` columns, with
/// `paths` as a list rather than the `paths_json` text column.
// NOTE(review): the name suggests this is the row shape handed to sync; confirm with callers.
pub struct ToolSpanSyncRow {
    /// Span identifier.
    pub span_id: String,
    /// Owning session.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Tool call identifier, if known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Start time (ms), if known.
    pub started_at_ms: Option<u64>,
    /// End time (ms), if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time (ms), if known.
    pub lead_time_ms: Option<u64>,
    /// Input tokens, if known.
    pub tokens_in: Option<u32>,
    /// Output tokens, if known.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens, if known.
    pub reasoning_tokens: Option<u32>,
    /// Cost in millionths of a USD, if known.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
394
/// How [`Store::open_with_mode`] should open the database.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreOpenMode {
    /// Regular open: migrations and schema checks run, and the projector is warmed.
    ReadWrite,
    /// Query-only open: the database is not created if missing, and both migrations and
    /// projector warm-up are skipped.
    ReadOnlyQuery,
}
400
401#[derive(Debug, Clone)]
402pub struct SessionStatusRow {
403    pub id: String,
404    pub status: SessionStatus,
405    pub ended_at_ms: Option<u64>,
406}
407
408#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
409pub struct SessionFilter {
410    pub agent_prefix: Option<String>,
411    pub status: Option<SessionStatus>,
412    pub since_ms: Option<u64>,
413}
414
415#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
416pub struct SessionPage {
417    pub rows: Vec<SessionRecord>,
418    pub total: usize,
419    pub next_offset: Option<usize>,
420}
421
422#[derive(Clone)]
423struct SpanTreeCacheEntry {
424    session_id: String,
425    last_event_seq: Option<u64>,
426    nodes: Vec<crate::store::span_tree::SpanNode>,
427}
428
429pub struct Store {
430    conn: Connection,
431    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
432    projector: RefCell<Projector>,
433}
434
435impl Store {
436    pub(crate) fn conn(&self) -> &Connection {
437        &self.conn
438    }
439
440    pub fn open(path: &Path) -> Result<Self> {
441        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
442    }
443
444    pub fn open_read_only(path: &Path) -> Result<Self> {
445        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
446    }
447
448    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
449        if let Some(parent) = path.parent() {
450            std::fs::create_dir_all(parent)?;
451        }
452        let conn = match mode {
453            StoreOpenMode::ReadWrite => Connection::open(path),
454            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
455                path,
456                OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
457            ),
458        }
459        .with_context(|| format!("open db: {}", path.display()))?;
460        apply_pragmas(&conn, mode)?;
461        if mode == StoreOpenMode::ReadWrite {
462            for sql in MIGRATIONS {
463                conn.execute_batch(sql)?;
464            }
465            ensure_schema_columns(&conn)?;
466        }
467        let store = Self {
468            conn,
469            span_tree_cache: RefCell::new(None),
470            projector: RefCell::new(Projector::default()),
471        };
472        if mode == StoreOpenMode::ReadWrite {
473            store.warm_projector()?;
474        }
475        Ok(store)
476    }
477
478    fn invalidate_span_tree_cache(&self) {
479        *self.span_tree_cache.borrow_mut() = None;
480    }
481
482    fn warm_projector(&self) -> Result<()> {
483        let ids = self.running_session_ids()?;
484        let mut projector = self.projector.borrow_mut();
485        for id in ids {
486            for event in self.list_events_for_session(&id)? {
487                let _ = projector.apply(&event);
488            }
489        }
490        Ok(())
491    }
492
493    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
494        self.conn.execute(
495            "INSERT INTO sessions (
496                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
497                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
498                prompt_fingerprint, parent_session_id, agent_version, os, arch,
499                repo_file_count, repo_total_loc
500             )
501             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
502                ?16, ?17, ?18, ?19, ?20, ?21)
503             ON CONFLICT(id) DO UPDATE SET
504               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
505               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
506               status=excluded.status, trace_path=excluded.trace_path,
507               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
508               branch=excluded.branch, dirty_start=excluded.dirty_start,
509               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
510               prompt_fingerprint=excluded.prompt_fingerprint,
511               parent_session_id=excluded.parent_session_id,
512               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
513               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
514            params![
515                s.id,
516                s.agent,
517                s.model,
518                s.workspace,
519                s.started_at_ms as i64,
520                s.ended_at_ms.map(|v| v as i64),
521                format!("{:?}", s.status),
522                s.trace_path,
523                s.start_commit,
524                s.end_commit,
525                s.branch,
526                s.dirty_start.map(bool_to_i64),
527                s.dirty_end.map(bool_to_i64),
528                s.repo_binding_source.clone().unwrap_or_default(),
529                s.prompt_fingerprint.as_deref(),
530                s.parent_session_id.as_deref(),
531                s.agent_version.as_deref(),
532                s.os.as_deref(),
533                s.arch.as_deref(),
534                s.repo_file_count.map(|v| v as i64),
535                s.repo_total_loc.map(|v| v as i64),
536            ],
537        )?;
538        self.conn.execute(
539            "INSERT INTO session_repo_binding (
540                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
541             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
542             ON CONFLICT(session_id) DO UPDATE SET
543                start_commit=excluded.start_commit,
544                end_commit=excluded.end_commit,
545                branch=excluded.branch,
546                dirty_start=excluded.dirty_start,
547                dirty_end=excluded.dirty_end,
548                repo_binding_source=excluded.repo_binding_source",
549            params![
550                s.id,
551                s.start_commit,
552                s.end_commit,
553                s.branch,
554                s.dirty_start.map(bool_to_i64),
555                s.dirty_end.map(bool_to_i64),
556                s.repo_binding_source.clone().unwrap_or_default(),
557            ],
558        )?;
559        Ok(())
560    }
561
562    /// Insert a minimal session row if none exists. Used by hook ingestion when
563    /// the first observed event is not `SessionStart` (hooks installed mid-session).
564    pub fn ensure_session_stub(
565        &self,
566        id: &str,
567        agent: &str,
568        workspace: &str,
569        started_at_ms: u64,
570    ) -> Result<()> {
571        self.conn.execute(
572            "INSERT OR IGNORE INTO sessions (
573                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
574                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
575                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
576             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
577                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
578            params![id, agent, workspace, started_at_ms as i64],
579        )?;
580        Ok(())
581    }
582
583    /// Next `seq` for a new event in this session (0 when there are no events yet).
584    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
585        let n: i64 = self.conn.query_row(
586            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
587            [session_id],
588            |r| r.get(0),
589        )?;
590        Ok(n as u64)
591    }
592
593    pub fn append_event(&self, e: &Event) -> Result<()> {
594        self.append_event_with_sync(e, None)
595    }
596
597    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
598    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
599        let last_before = if projector_legacy_mode() {
600            None
601        } else {
602            self.last_event_seq_for_session(&e.session_id)?
603        };
604        let payload = serde_json::to_string(&e.payload)?;
605        self.conn.execute(
606            "INSERT INTO events (
607                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
608                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
609                stop_reason, latency_ms, ttft_ms, retry_count,
610                context_used_tokens, context_max_tokens,
611                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
612             ) VALUES (
613                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
614                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
615             )
616             ON CONFLICT(session_id, seq) DO UPDATE SET
617                ts_ms = excluded.ts_ms,
618                ts_exact = excluded.ts_exact,
619                kind = excluded.kind,
620                source = excluded.source,
621                tool = excluded.tool,
622                tool_call_id = excluded.tool_call_id,
623                tokens_in = excluded.tokens_in,
624                tokens_out = excluded.tokens_out,
625                reasoning_tokens = excluded.reasoning_tokens,
626                cost_usd_e6 = excluded.cost_usd_e6,
627                payload = excluded.payload,
628                stop_reason = excluded.stop_reason,
629                latency_ms = excluded.latency_ms,
630                ttft_ms = excluded.ttft_ms,
631                retry_count = excluded.retry_count,
632                context_used_tokens = excluded.context_used_tokens,
633                context_max_tokens = excluded.context_max_tokens,
634                cache_creation_tokens = excluded.cache_creation_tokens,
635                cache_read_tokens = excluded.cache_read_tokens,
636                system_prompt_tokens = excluded.system_prompt_tokens",
637            params![
638                e.session_id,
639                e.seq as i64,
640                e.ts_ms as i64,
641                bool_to_i64(e.ts_exact),
642                format!("{:?}", e.kind),
643                format!("{:?}", e.source),
644                e.tool,
645                e.tool_call_id,
646                e.tokens_in.map(|v| v as i64),
647                e.tokens_out.map(|v| v as i64),
648                e.reasoning_tokens.map(|v| v as i64),
649                e.cost_usd_e6,
650                payload,
651                e.stop_reason,
652                e.latency_ms.map(|v| v as i64),
653                e.ttft_ms.map(|v| v as i64),
654                e.retry_count.map(|v| v as i64),
655                e.context_used_tokens.map(|v| v as i64),
656                e.context_max_tokens.map(|v| v as i64),
657                e.cache_creation_tokens.map(|v| v as i64),
658                e.cache_read_tokens.map(|v| v as i64),
659                e.system_prompt_tokens.map(|v| v as i64),
660            ],
661        )?;
662        if self.conn.changes() == 0 {
663            return Ok(());
664        }
665        if projector_legacy_mode() {
666            index_event_derived(&self.conn, e)?;
667            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
668            self.invalidate_span_tree_cache();
669        } else if last_before.is_some_and(|last| e.seq <= last) {
670            self.replay_projector_session(&e.session_id)?;
671        } else {
672            let deltas = self.projector.borrow_mut().apply(e);
673            self.apply_projector_events(&deltas)?;
674            let expired = self
675                .projector
676                .borrow_mut()
677                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
678            self.apply_projector_events(&expired)?;
679            if is_stop_event(e) {
680                let flushed = self
681                    .projector
682                    .borrow_mut()
683                    .flush_session(&e.session_id, e.ts_ms);
684                self.apply_projector_events(&flushed)?;
685            }
686            self.invalidate_span_tree_cache();
687        }
688        let Some(ctx) = ctx else {
689            return Ok(());
690        };
691        let sync = &ctx.sync;
692        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
693            return Ok(());
694        }
695        let Some(salt) = try_team_salt(sync) else {
696            tracing::warn!(
697                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
698            );
699            return Ok(());
700        };
701        if sync.sample_rate < 1.0 {
702            let u: f64 = rand::random();
703            if u > sync.sample_rate {
704                return Ok(());
705            }
706        }
707        let Some(session) = self.get_session(&e.session_id)? else {
708            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
709            return Ok(());
710        };
711        let mut outbound = outbound_event_from_row(e, &session, &salt);
712        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
713        let row = serde_json::to_string(&outbound)?;
714        self.conn.execute(
715            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
716            params![e.session_id, row],
717        )?;
718        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
719        Ok(())
720    }
721
722    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
723        if projector_legacy_mode() {
724            rebuild_tool_spans_for_session(&self.conn, session_id)?;
725            self.invalidate_span_tree_cache();
726            return Ok(());
727        }
728        let deltas = self
729            .projector
730            .borrow_mut()
731            .flush_session(session_id, now_ms);
732        if self.apply_projector_events(&deltas)? {
733            self.invalidate_span_tree_cache();
734        }
735        Ok(())
736    }
737
738    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
739        clear_session_spans(&self.conn, session_id)?;
740        self.projector.borrow_mut().reset_session(session_id);
741        let events = self.list_events_for_session(session_id)?;
742        let mut changed = false;
743        for event in &events {
744            let deltas = self.projector.borrow_mut().apply(event);
745            changed |= self.apply_projector_events(&deltas)?;
746        }
747        if self
748            .get_session(session_id)?
749            .is_some_and(|session| session.status == SessionStatus::Done)
750        {
751            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
752            let deltas = self
753                .projector
754                .borrow_mut()
755                .flush_session(session_id, now_ms);
756            changed |= self.apply_projector_events(&deltas)?;
757        }
758        if changed {
759            self.invalidate_span_tree_cache();
760        }
761        Ok(())
762    }
763
764    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
765        let mut changed = false;
766        for delta in deltas {
767            match delta {
768                ProjectorEvent::SpanClosed(span, sample) => {
769                    upsert_tool_span_record(&self.conn, span)?;
770                    tracing::debug!(
771                        session_id = %sample.session_id,
772                        span_id = %sample.span_id,
773                        tool = ?sample.tool,
774                        lead_time_ms = ?sample.lead_time_ms,
775                        tokens_in = ?sample.tokens_in,
776                        tokens_out = ?sample.tokens_out,
777                        reasoning_tokens = ?sample.reasoning_tokens,
778                        cost_usd_e6 = ?sample.cost_usd_e6,
779                        paths = ?sample.paths,
780                        "tool span closed"
781                    );
782                    changed = true;
783                }
784                ProjectorEvent::SpanPatched(span) => {
785                    upsert_tool_span_record(&self.conn, span)?;
786                    changed = true;
787                }
788                ProjectorEvent::FileTouched { session, path } => {
789                    self.conn.execute(
790                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
791                        params![session, path],
792                    )?;
793                    changed = true;
794                }
795                ProjectorEvent::SkillUsed { session, skill } => {
796                    self.conn.execute(
797                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
798                        params![session, skill],
799                    )?;
800                    changed = true;
801                }
802                ProjectorEvent::RuleUsed { session, rule } => {
803                    self.conn.execute(
804                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
805                        params![session, rule],
806                    )?;
807                    changed = true;
808                }
809            }
810        }
811        Ok(changed)
812    }
813
814    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
815        let mut stmt = self.conn.prepare(
816            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
817        )?;
818        let rows = stmt.query_map(params![limit as i64], |row| {
819            Ok((
820                row.get::<_, i64>(0)?,
821                row.get::<_, String>(1)?,
822                row.get::<_, String>(2)?,
823            ))
824        })?;
825        let mut out = Vec::new();
826        for r in rows {
827            out.push(r?);
828        }
829        Ok(out)
830    }
831
832    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
833        for id in ids {
834            self.conn
835                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
836        }
837        Ok(())
838    }
839
840    pub fn replace_outbox_rows(
841        &self,
842        owner_id: &str,
843        kind: &str,
844        payloads: &[String],
845    ) -> Result<()> {
846        self.conn.execute(
847            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
848            params![owner_id, kind],
849        )?;
850        for payload in payloads {
851            self.conn.execute(
852                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
853                params![owner_id, kind, payload],
854            )?;
855        }
856        Ok(())
857    }
858
859    pub fn outbox_pending_count(&self) -> Result<u64> {
860        let c: i64 =
861            self.conn
862                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
863                    r.get(0)
864                })?;
865        Ok(c as u64)
866    }
867
868    pub fn set_sync_state_ok(&self) -> Result<()> {
869        let now = now_ms().to_string();
870        self.conn.execute(
871            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
872             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
873            params![now],
874        )?;
875        self.conn.execute(
876            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
877             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
878            [],
879        )?;
880        self.conn
881            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
882        Ok(())
883    }
884
885    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
886        let prev: i64 = self
887            .conn
888            .query_row(
889                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
890                [],
891                |r| {
892                    let s: String = r.get(0)?;
893                    Ok(s.parse::<i64>().unwrap_or(0))
894                },
895            )
896            .optional()?
897            .unwrap_or(0);
898        let next = prev.saturating_add(1);
899        self.conn.execute(
900            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
901             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
902            params![msg],
903        )?;
904        self.conn.execute(
905            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
906             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
907            params![next.to_string()],
908        )?;
909        Ok(())
910    }
911
912    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
913        let pending_outbox = self.outbox_pending_count()?;
914        let last_success_ms = self
915            .conn
916            .query_row(
917                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
918                [],
919                |r| r.get::<_, String>(0),
920            )
921            .optional()?
922            .and_then(|s| s.parse().ok());
923        let last_error = self
924            .conn
925            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
926                r.get::<_, String>(0)
927            })
928            .optional()?;
929        let consecutive_failures = self
930            .conn
931            .query_row(
932                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
933                [],
934                |r| r.get::<_, String>(0),
935            )
936            .optional()?
937            .and_then(|s| s.parse().ok())
938            .unwrap_or(0);
939        Ok(SyncStatusSnapshot {
940            pending_outbox,
941            last_success_ms,
942            last_error,
943            consecutive_failures,
944        })
945    }
946
947    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
948        let row: Option<String> = self
949            .conn
950            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
951                r.get::<_, String>(0)
952            })
953            .optional()?;
954        Ok(row.and_then(|s| s.parse().ok()))
955    }
956
957    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
958        self.conn.execute(
959            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
960             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
961            params![key, v.to_string()],
962        )?;
963        Ok(())
964    }
965
966    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
967    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
968        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
969        let sessions_to_remove: i64 = tx.query_row(
970            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
971            params![cutoff_ms],
972            |r| r.get(0),
973        )?;
974        let events_to_remove: i64 = tx.query_row(
975            "SELECT COUNT(*) FROM events WHERE session_id IN \
976             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
977            params![cutoff_ms],
978            |r| r.get(0),
979        )?;
980
981        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
982        tx.execute(
983            &format!(
984                "DELETE FROM tool_span_paths WHERE span_id IN \
985                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
986            ),
987            params![cutoff_ms],
988        )?;
989        tx.execute(
990            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
991            params![cutoff_ms],
992        )?;
993        tx.execute(
994            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
995            params![cutoff_ms],
996        )?;
997        tx.execute(
998            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
999            params![cutoff_ms],
1000        )?;
1001        tx.execute(
1002            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1003            params![cutoff_ms],
1004        )?;
1005        tx.execute(
1006            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1007            params![cutoff_ms],
1008        )?;
1009        tx.execute(
1010            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1011            params![cutoff_ms],
1012        )?;
1013        tx.execute(
1014            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1015            params![cutoff_ms],
1016        )?;
1017        tx.execute(
1018            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1019            params![cutoff_ms],
1020        )?;
1021        tx.execute(
1022            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1023            params![cutoff_ms],
1024        )?;
1025        tx.execute(
1026            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1027            params![cutoff_ms],
1028        )?;
1029        tx.execute(
1030            "DELETE FROM sessions WHERE started_at_ms < ?1",
1031            params![cutoff_ms],
1032        )?;
1033        tx.commit()?;
1034        self.invalidate_span_tree_cache();
1035        Ok(PruneStats {
1036            sessions_removed: sessions_to_remove as u64,
1037            events_removed: events_to_remove as u64,
1038        })
1039    }
1040
1041    /// Reclaim file space after large deletes (exclusive lock; can be slow).
1042    pub fn vacuum(&self) -> Result<()> {
1043        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1044        Ok(())
1045    }
1046
1047    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1048        Ok(self
1049            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1050            .rows)
1051    }
1052
1053    pub fn list_sessions_page(
1054        &self,
1055        workspace: &str,
1056        offset: usize,
1057        limit: usize,
1058        filter: SessionFilter,
1059    ) -> Result<SessionPage> {
1060        let (where_sql, args) = session_filter_sql(workspace, &filter);
1061        let total = self.query_session_page_count(&where_sql, &args)?;
1062        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1063        let next = offset.saturating_add(rows.len());
1064        Ok(SessionPage {
1065            rows,
1066            total,
1067            next_offset: (next < total).then_some(next),
1068        })
1069    }
1070
1071    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1072        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1073        let total: i64 = self
1074            .conn
1075            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1076        Ok(total as usize)
1077    }
1078
1079    fn query_session_page_rows(
1080        &self,
1081        where_sql: &str,
1082        args: &[Value],
1083        offset: usize,
1084        limit: usize,
1085    ) -> Result<Vec<SessionRecord>> {
1086        let sql = format!(
1087            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1088        );
1089        let mut values = args.to_vec();
1090        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1091        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1092        let mut stmt = self.conn.prepare(&sql)?;
1093        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1094        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1095    }
1096
1097    pub fn list_sessions_started_after(
1098        &self,
1099        workspace: &str,
1100        after_started_at_ms: u64,
1101    ) -> Result<Vec<SessionRecord>> {
1102        let mut stmt = self.conn.prepare(
1103            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1104                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1105                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1106                    repo_file_count, repo_total_loc
1107             FROM sessions
1108             WHERE workspace = ?1 AND started_at_ms > ?2
1109             ORDER BY started_at_ms DESC, id ASC",
1110        )?;
1111        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1112        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1113    }
1114
1115    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1116        if ids.is_empty() {
1117            return Ok(Vec::new());
1118        }
1119        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1120        let sql =
1121            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1122        let mut stmt = self.conn.prepare(&sql)?;
1123        let params: Vec<&dyn rusqlite::ToSql> =
1124            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1125        let rows = stmt.query_map(params.as_slice(), |r| {
1126            let status: String = r.get(1)?;
1127            Ok(SessionStatusRow {
1128                id: r.get(0)?,
1129                status: status_from_str(&status),
1130                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1131            })
1132        })?;
1133        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1134    }
1135
1136    fn running_session_ids(&self) -> Result<Vec<String>> {
1137        let mut stmt = self
1138            .conn
1139            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1140        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1141        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1142    }
1143
1144    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1145        let session_count: i64 = self.conn.query_row(
1146            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1147            params![workspace],
1148            |r| r.get(0),
1149        )?;
1150
1151        let total_cost: i64 = self.conn.query_row(
1152            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1153             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1154            params![workspace],
1155            |r| r.get(0),
1156        )?;
1157
1158        let mut stmt = self.conn.prepare(
1159            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1160        )?;
1161        let by_agent: Vec<(String, u64)> = stmt
1162            .query_map(params![workspace], |r| {
1163                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1164            })?
1165            .filter_map(|r| r.ok())
1166            .collect();
1167
1168        let mut stmt = self.conn.prepare(
1169            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1170        )?;
1171        let by_model: Vec<(String, u64)> = stmt
1172            .query_map(params![workspace], |r| {
1173                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1174            })?
1175            .filter_map(|r| r.ok())
1176            .collect();
1177
1178        let mut stmt = self.conn.prepare(
1179            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1180             WHERE s.workspace = ?1 AND tool IS NOT NULL
1181             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1182        )?;
1183        let top_tools: Vec<(String, u64)> = stmt
1184            .query_map(params![workspace], |r| {
1185                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1186            })?
1187            .filter_map(|r| r.ok())
1188            .collect();
1189
1190        Ok(SummaryStats {
1191            session_count: session_count as u64,
1192            total_cost_usd_e6: total_cost,
1193            by_agent,
1194            by_model,
1195            top_tools,
1196        })
1197    }
1198
1199    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1200        self.list_events_page(session_id, 0, i64::MAX as usize)
1201    }
1202
1203    pub fn list_events_page(
1204        &self,
1205        session_id: &str,
1206        after_seq: u64,
1207        limit: usize,
1208    ) -> Result<Vec<Event>> {
1209        let mut stmt = self.conn.prepare(
1210            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1211                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1212                    stop_reason, latency_ms, ttft_ms, retry_count,
1213                    context_used_tokens, context_max_tokens,
1214                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1215             FROM events
1216             WHERE session_id = ?1 AND seq >= ?2
1217             ORDER BY seq ASC LIMIT ?3",
1218        )?;
1219        let rows = stmt.query_map(
1220            params![
1221                session_id,
1222                after_seq as i64,
1223                limit.min(i64::MAX as usize) as i64
1224            ],
1225            event_row,
1226        )?;
1227        let mut events = Vec::new();
1228        for row in rows {
1229            events.push(row?);
1230        }
1231        Ok(events)
1232    }
1233
1234    /// Update only status for existing session.
1235    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1236        self.conn.execute(
1237            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1238            params![format!("{:?}", status), id],
1239        )?;
1240        Ok(())
1241    }
1242
1243    /// Workspace activity dashboard — feeds `cmd_insights`.
1244    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1245        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1246        Ok(InsightsStats {
1247            total_sessions: count_q(
1248                &self.conn,
1249                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1250                workspace,
1251            )?,
1252            running_sessions: count_q(
1253                &self.conn,
1254                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1255                workspace,
1256            )?,
1257            total_events: count_q(
1258                &self.conn,
1259                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1260                workspace,
1261            )?,
1262            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1263            recent: recent_sessions_3(&self.conn, workspace)?,
1264            top_tools: top_tools_5(&self.conn, workspace)?,
1265            total_cost_usd_e6,
1266            sessions_with_cost,
1267        })
1268    }
1269
1270    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1271    pub fn retro_events_in_window(
1272        &self,
1273        workspace: &str,
1274        start_ms: u64,
1275        end_ms: u64,
1276    ) -> Result<Vec<(SessionRecord, Event)>> {
1277        let mut stmt = self.conn.prepare(
1278            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1279                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1280                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1281                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1282                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1283                    s.repo_file_count, s.repo_total_loc,
1284                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1285                    e.context_used_tokens, e.context_max_tokens,
1286                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1287             FROM events e
1288             JOIN sessions s ON s.id = e.session_id
1289             WHERE s.workspace = ?1
1290               AND (
1291                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1292                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1293               )
1294             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1295        )?;
1296        let rows = stmt.query_map(
1297            params![
1298                workspace,
1299                start_ms as i64,
1300                end_ms as i64,
1301                SYNTHETIC_TS_CEILING_MS,
1302            ],
1303            |row| {
1304                let payload_str: String = row.get(12)?;
1305                let status_str: String = row.get(19)?;
1306                Ok((
1307                    SessionRecord {
1308                        id: row.get(13)?,
1309                        agent: row.get(14)?,
1310                        model: row.get(15)?,
1311                        workspace: row.get(16)?,
1312                        started_at_ms: row.get::<_, i64>(17)? as u64,
1313                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1314                        status: status_from_str(&status_str),
1315                        trace_path: row.get(20)?,
1316                        start_commit: row.get(21)?,
1317                        end_commit: row.get(22)?,
1318                        branch: row.get(23)?,
1319                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1320                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1321                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1322                        prompt_fingerprint: row.get(27)?,
1323                        parent_session_id: row.get(28)?,
1324                        agent_version: row.get(29)?,
1325                        os: row.get(30)?,
1326                        arch: row.get(31)?,
1327                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1328                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1329                    },
1330                    Event {
1331                        session_id: row.get(0)?,
1332                        seq: row.get::<_, i64>(1)? as u64,
1333                        ts_ms: row.get::<_, i64>(2)? as u64,
1334                        ts_exact: row.get::<_, i64>(3)? != 0,
1335                        kind: kind_from_str(&row.get::<_, String>(4)?),
1336                        source: source_from_str(&row.get::<_, String>(5)?),
1337                        tool: row.get(6)?,
1338                        tool_call_id: row.get(7)?,
1339                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1340                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1341                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1342                        cost_usd_e6: row.get(11)?,
1343                        payload: serde_json::from_str(&payload_str)
1344                            .unwrap_or(serde_json::Value::Null),
1345                        stop_reason: row.get(34)?,
1346                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1347                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1348                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1349                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1350                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1351                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1352                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1353                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1354                    },
1355                ))
1356            },
1357        )?;
1358
1359        let mut out = Vec::new();
1360        for r in rows {
1361            out.push(r?);
1362        }
1363        Ok(out)
1364    }
1365
1366    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1367    pub fn files_touched_in_window(
1368        &self,
1369        workspace: &str,
1370        start_ms: u64,
1371        end_ms: u64,
1372    ) -> Result<Vec<(String, String)>> {
1373        let mut stmt = self.conn.prepare(
1374            "SELECT DISTINCT ft.session_id, ft.path
1375             FROM files_touched ft
1376             JOIN sessions s ON s.id = ft.session_id
1377             WHERE s.workspace = ?1
1378               AND EXISTS (
1379                 SELECT 1 FROM events e
1380                 JOIN sessions ss ON ss.id = e.session_id
1381                 WHERE e.session_id = ft.session_id
1382                   AND (
1383                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1384                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1385                   )
1386               )
1387             ORDER BY ft.session_id, ft.path",
1388        )?;
1389        let out: Vec<(String, String)> = stmt
1390            .query_map(
1391                params![
1392                    workspace,
1393                    start_ms as i64,
1394                    end_ms as i64,
1395                    SYNTHETIC_TS_CEILING_MS,
1396                ],
1397                |r| Ok((r.get(0)?, r.get(1)?)),
1398            )?
1399            .filter_map(|r| r.ok())
1400            .collect();
1401        Ok(out)
1402    }
1403
1404    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1405    /// (any session with an indexed skill row; join events optional — use row existence).
1406    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1407        let mut stmt = self.conn.prepare(
1408            "SELECT DISTINCT su.skill
1409             FROM skills_used su
1410             JOIN sessions s ON s.id = su.session_id
1411             WHERE s.workspace = ?1
1412               AND EXISTS (
1413                 SELECT 1 FROM events e
1414                 JOIN sessions ss ON ss.id = e.session_id
1415                 WHERE e.session_id = su.session_id
1416                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1417               )
1418             ORDER BY su.skill",
1419        )?;
1420        let out: Vec<String> = stmt
1421            .query_map(
1422                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1423                |r| r.get::<_, String>(0),
1424            )?
1425            .filter_map(|r| r.ok())
1426            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1427            .collect();
1428        Ok(out)
1429    }
1430
1431    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1432    pub fn skills_used_in_window(
1433        &self,
1434        workspace: &str,
1435        start_ms: u64,
1436        end_ms: u64,
1437    ) -> Result<Vec<(String, String)>> {
1438        let mut stmt = self.conn.prepare(
1439            "SELECT DISTINCT su.session_id, su.skill
1440             FROM skills_used su
1441             JOIN sessions s ON s.id = su.session_id
1442             WHERE s.workspace = ?1
1443               AND EXISTS (
1444                 SELECT 1 FROM events e
1445                 JOIN sessions ss ON ss.id = e.session_id
1446                 WHERE e.session_id = su.session_id
1447                   AND (
1448                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1449                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1450                   )
1451               )
1452             ORDER BY su.session_id, su.skill",
1453        )?;
1454        let out: Vec<(String, String)> = stmt
1455            .query_map(
1456                params![
1457                    workspace,
1458                    start_ms as i64,
1459                    end_ms as i64,
1460                    SYNTHETIC_TS_CEILING_MS,
1461                ],
1462                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1463            )?
1464            .filter_map(|r| r.ok())
1465            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1466            .collect();
1467        Ok(out)
1468    }
1469
1470    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1471    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1472        let mut stmt = self.conn.prepare(
1473            "SELECT DISTINCT ru.rule
1474             FROM rules_used ru
1475             JOIN sessions s ON s.id = ru.session_id
1476             WHERE s.workspace = ?1
1477               AND EXISTS (
1478                 SELECT 1 FROM events e
1479                 JOIN sessions ss ON ss.id = e.session_id
1480                 WHERE e.session_id = ru.session_id
1481                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1482               )
1483             ORDER BY ru.rule",
1484        )?;
1485        let out: Vec<String> = stmt
1486            .query_map(
1487                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1488                |r| r.get::<_, String>(0),
1489            )?
1490            .filter_map(|r| r.ok())
1491            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1492            .collect();
1493        Ok(out)
1494    }
1495
1496    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1497    pub fn rules_used_in_window(
1498        &self,
1499        workspace: &str,
1500        start_ms: u64,
1501        end_ms: u64,
1502    ) -> Result<Vec<(String, String)>> {
1503        let mut stmt = self.conn.prepare(
1504            "SELECT DISTINCT ru.session_id, ru.rule
1505             FROM rules_used ru
1506             JOIN sessions s ON s.id = ru.session_id
1507             WHERE s.workspace = ?1
1508               AND EXISTS (
1509                 SELECT 1 FROM events e
1510                 JOIN sessions ss ON ss.id = e.session_id
1511                 WHERE e.session_id = ru.session_id
1512                   AND (
1513                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1514                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1515                   )
1516               )
1517             ORDER BY ru.session_id, ru.rule",
1518        )?;
1519        let out: Vec<(String, String)> = stmt
1520            .query_map(
1521                params![
1522                    workspace,
1523                    start_ms as i64,
1524                    end_ms as i64,
1525                    SYNTHETIC_TS_CEILING_MS,
1526                ],
1527                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1528            )?
1529            .filter_map(|r| r.ok())
1530            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1531            .collect();
1532        Ok(out)
1533    }
1534
1535    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1536    pub fn sessions_active_in_window(
1537        &self,
1538        workspace: &str,
1539        start_ms: u64,
1540        end_ms: u64,
1541    ) -> Result<HashSet<String>> {
1542        let mut stmt = self.conn.prepare(
1543            "SELECT DISTINCT s.id
1544             FROM sessions s
1545             WHERE s.workspace = ?1
1546               AND EXISTS (
1547                 SELECT 1 FROM events e
1548                 WHERE e.session_id = s.id
1549                   AND (
1550                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1551                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1552                   )
1553               )",
1554        )?;
1555        let out: HashSet<String> = stmt
1556            .query_map(
1557                params![
1558                    workspace,
1559                    start_ms as i64,
1560                    end_ms as i64,
1561                    SYNTHETIC_TS_CEILING_MS,
1562                ],
1563                |r| r.get(0),
1564            )?
1565            .filter_map(|r| r.ok())
1566            .collect();
1567        Ok(out)
1568    }
1569
1570    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1571    pub fn session_costs_usd_e6_in_window(
1572        &self,
1573        workspace: &str,
1574        start_ms: u64,
1575        end_ms: u64,
1576    ) -> Result<HashMap<String, i64>> {
1577        let mut stmt = self.conn.prepare(
1578            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1579             FROM events e
1580             JOIN sessions s ON s.id = e.session_id
1581             WHERE s.workspace = ?1
1582               AND (
1583                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1584                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1585               )
1586             GROUP BY e.session_id",
1587        )?;
1588        let rows: Vec<(String, i64)> = stmt
1589            .query_map(
1590                params![
1591                    workspace,
1592                    start_ms as i64,
1593                    end_ms as i64,
1594                    SYNTHETIC_TS_CEILING_MS,
1595                ],
1596                |r| Ok((r.get(0)?, r.get(1)?)),
1597            )?
1598            .filter_map(|r| r.ok())
1599            .collect();
1600        Ok(rows.into_iter().collect())
1601    }
1602
1603    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1604    pub fn guidance_report(
1605        &self,
1606        workspace: &str,
1607        window_start_ms: u64,
1608        window_end_ms: u64,
1609        skill_slugs_on_disk: &HashSet<String>,
1610        rule_slugs_on_disk: &HashSet<String>,
1611    ) -> Result<GuidanceReport> {
1612        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1613        let denom = active.len() as u64;
1614        let costs =
1615            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1616
1617        let workspace_avg_cost_per_session_usd = if denom > 0 {
1618            let total_e6: i64 = active
1619                .iter()
1620                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1621                .sum();
1622            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1623        } else {
1624            None
1625        };
1626
1627        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1628        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1629            skill_sessions.entry(skill).or_default().insert(sid);
1630        }
1631        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1632        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1633            rule_sessions.entry(rule).or_default().insert(sid);
1634        }
1635
1636        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1637
1638        let mut push_row =
1639            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1640                let sessions = sids.len() as u64;
1641                let sessions_pct = if denom > 0 {
1642                    sessions as f64 * 100.0 / denom as f64
1643                } else {
1644                    0.0
1645                };
1646                let total_cost_usd_e6: i64 = sids
1647                    .iter()
1648                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1649                    .sum();
1650                let avg_cost_per_session_usd = if sessions > 0 {
1651                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1652                } else {
1653                    None
1654                };
1655                let vs_workspace_avg_cost_per_session_usd =
1656                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1657                        (Some(avg), Some(w)) => Some(avg - w),
1658                        _ => None,
1659                    };
1660                rows.push(GuidancePerfRow {
1661                    kind,
1662                    id,
1663                    sessions,
1664                    sessions_pct,
1665                    total_cost_usd_e6,
1666                    avg_cost_per_session_usd,
1667                    vs_workspace_avg_cost_per_session_usd,
1668                    on_disk,
1669                });
1670            };
1671
1672        let mut seen_skills: HashSet<String> = HashSet::new();
1673        for (id, sids) in &skill_sessions {
1674            seen_skills.insert(id.clone());
1675            push_row(
1676                GuidanceKind::Skill,
1677                id.clone(),
1678                sids,
1679                skill_slugs_on_disk.contains(id),
1680            );
1681        }
1682        for slug in skill_slugs_on_disk {
1683            if seen_skills.contains(slug) {
1684                continue;
1685            }
1686            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1687        }
1688
1689        let mut seen_rules: HashSet<String> = HashSet::new();
1690        for (id, sids) in &rule_sessions {
1691            seen_rules.insert(id.clone());
1692            push_row(
1693                GuidanceKind::Rule,
1694                id.clone(),
1695                sids,
1696                rule_slugs_on_disk.contains(id),
1697            );
1698        }
1699        for slug in rule_slugs_on_disk {
1700            if seen_rules.contains(slug) {
1701                continue;
1702            }
1703            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1704        }
1705
1706        rows.sort_by(|a, b| {
1707            b.sessions
1708                .cmp(&a.sessions)
1709                .then_with(|| a.kind.cmp(&b.kind))
1710                .then_with(|| a.id.cmp(&b.id))
1711        });
1712
1713        Ok(GuidanceReport {
1714            workspace: workspace.to_string(),
1715            window_start_ms,
1716            window_end_ms,
1717            sessions_in_window: denom,
1718            workspace_avg_cost_per_session_usd,
1719            rows,
1720        })
1721    }
1722
1723    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1724        let mut stmt = self.conn.prepare(
1725            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1726                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1727                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1728                    repo_file_count, repo_total_loc
1729             FROM sessions WHERE id = ?1",
1730        )?;
1731        let mut rows = stmt.query_map(params![id], |row| {
1732            Ok((
1733                row.get::<_, String>(0)?,
1734                row.get::<_, String>(1)?,
1735                row.get::<_, Option<String>>(2)?,
1736                row.get::<_, String>(3)?,
1737                row.get::<_, i64>(4)?,
1738                row.get::<_, Option<i64>>(5)?,
1739                row.get::<_, String>(6)?,
1740                row.get::<_, String>(7)?,
1741                row.get::<_, Option<String>>(8)?,
1742                row.get::<_, Option<String>>(9)?,
1743                row.get::<_, Option<String>>(10)?,
1744                row.get::<_, Option<i64>>(11)?,
1745                row.get::<_, Option<i64>>(12)?,
1746                row.get::<_, String>(13)?,
1747                row.get::<_, Option<String>>(14)?,
1748                row.get::<_, Option<String>>(15)?,
1749                row.get::<_, Option<String>>(16)?,
1750                row.get::<_, Option<String>>(17)?,
1751                row.get::<_, Option<String>>(18)?,
1752                row.get::<_, Option<i64>>(19)?,
1753                row.get::<_, Option<i64>>(20)?,
1754            ))
1755        })?;
1756
1757        if let Some(row) = rows.next() {
1758            let (
1759                id,
1760                agent,
1761                model,
1762                workspace,
1763                started,
1764                ended,
1765                status_str,
1766                trace,
1767                start_commit,
1768                end_commit,
1769                branch,
1770                dirty_start,
1771                dirty_end,
1772                source,
1773                prompt_fingerprint,
1774                parent_session_id,
1775                agent_version,
1776                os,
1777                arch,
1778                repo_file_count,
1779                repo_total_loc,
1780            ) = row?;
1781            Ok(Some(SessionRecord {
1782                id,
1783                agent,
1784                model,
1785                workspace,
1786                started_at_ms: started as u64,
1787                ended_at_ms: ended.map(|v| v as u64),
1788                status: status_from_str(&status_str),
1789                trace_path: trace,
1790                start_commit,
1791                end_commit,
1792                branch,
1793                dirty_start: dirty_start.map(i64_to_bool),
1794                dirty_end: dirty_end.map(i64_to_bool),
1795                repo_binding_source: empty_to_none(source),
1796                prompt_fingerprint,
1797                parent_session_id,
1798                agent_version,
1799                os,
1800                arch,
1801                repo_file_count: repo_file_count.map(|v| v as u32),
1802                repo_total_loc: repo_total_loc.map(|v| v as u64),
1803            }))
1804        } else {
1805            Ok(None)
1806        }
1807    }
1808
1809    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1810        let mut stmt = self.conn.prepare(
1811            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1812                    indexed_at_ms, dirty, graph_path
1813             FROM repo_snapshots WHERE workspace = ?1
1814             ORDER BY indexed_at_ms DESC LIMIT 1",
1815        )?;
1816        let mut rows = stmt.query_map(params![workspace], |row| {
1817            Ok(RepoSnapshotRecord {
1818                id: row.get(0)?,
1819                workspace: row.get(1)?,
1820                head_commit: row.get(2)?,
1821                dirty_fingerprint: row.get(3)?,
1822                analyzer_version: row.get(4)?,
1823                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1824                dirty: row.get::<_, i64>(6)? != 0,
1825                graph_path: row.get(7)?,
1826            })
1827        })?;
1828        Ok(rows.next().transpose()?)
1829    }
1830
1831    pub fn save_repo_snapshot(
1832        &self,
1833        snapshot: &RepoSnapshotRecord,
1834        facts: &[FileFact],
1835        edges: &[RepoEdge],
1836    ) -> Result<()> {
1837        self.conn.execute(
1838            "INSERT INTO repo_snapshots (
1839                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1840                indexed_at_ms, dirty, graph_path
1841             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1842             ON CONFLICT(id) DO UPDATE SET
1843                workspace=excluded.workspace,
1844                head_commit=excluded.head_commit,
1845                dirty_fingerprint=excluded.dirty_fingerprint,
1846                analyzer_version=excluded.analyzer_version,
1847                indexed_at_ms=excluded.indexed_at_ms,
1848                dirty=excluded.dirty,
1849                graph_path=excluded.graph_path",
1850            params![
1851                snapshot.id,
1852                snapshot.workspace,
1853                snapshot.head_commit,
1854                snapshot.dirty_fingerprint,
1855                snapshot.analyzer_version,
1856                snapshot.indexed_at_ms as i64,
1857                bool_to_i64(snapshot.dirty),
1858                snapshot.graph_path,
1859            ],
1860        )?;
1861        self.conn.execute(
1862            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1863            params![snapshot.id],
1864        )?;
1865        self.conn.execute(
1866            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1867            params![snapshot.id],
1868        )?;
1869        for fact in facts {
1870            self.conn.execute(
1871                "INSERT INTO file_facts (
1872                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1873                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1874                    churn_30d, churn_90d, authors_90d, last_changed_ms
1875                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1876                params![
1877                    fact.snapshot_id,
1878                    fact.path,
1879                    fact.language,
1880                    fact.bytes as i64,
1881                    fact.loc as i64,
1882                    fact.sloc as i64,
1883                    fact.complexity_total as i64,
1884                    fact.max_fn_complexity as i64,
1885                    fact.symbol_count as i64,
1886                    fact.import_count as i64,
1887                    fact.fan_in as i64,
1888                    fact.fan_out as i64,
1889                    fact.churn_30d as i64,
1890                    fact.churn_90d as i64,
1891                    fact.authors_90d as i64,
1892                    fact.last_changed_ms.map(|v| v as i64),
1893                ],
1894            )?;
1895        }
1896        for edge in edges {
1897            self.conn.execute(
1898                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1899                 VALUES (?1, ?2, ?3, ?4, ?5)
1900                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1901                 DO UPDATE SET weight = weight + excluded.weight",
1902                params![
1903                    snapshot.id,
1904                    edge.from_path,
1905                    edge.to_path,
1906                    edge.kind,
1907                    edge.weight as i64,
1908                ],
1909            )?;
1910        }
1911        Ok(())
1912    }
1913
1914    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1915        let mut stmt = self.conn.prepare(
1916            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1917                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1918                    churn_30d, churn_90d, authors_90d, last_changed_ms
1919             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1920        )?;
1921        let rows = stmt.query_map(params![snapshot_id], |row| {
1922            Ok(FileFact {
1923                snapshot_id: row.get(0)?,
1924                path: row.get(1)?,
1925                language: row.get(2)?,
1926                bytes: row.get::<_, i64>(3)? as u64,
1927                loc: row.get::<_, i64>(4)? as u32,
1928                sloc: row.get::<_, i64>(5)? as u32,
1929                complexity_total: row.get::<_, i64>(6)? as u32,
1930                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1931                symbol_count: row.get::<_, i64>(8)? as u32,
1932                import_count: row.get::<_, i64>(9)? as u32,
1933                fan_in: row.get::<_, i64>(10)? as u32,
1934                fan_out: row.get::<_, i64>(11)? as u32,
1935                churn_30d: row.get::<_, i64>(12)? as u32,
1936                churn_90d: row.get::<_, i64>(13)? as u32,
1937                authors_90d: row.get::<_, i64>(14)? as u32,
1938                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1939            })
1940        })?;
1941        Ok(rows.filter_map(|row| row.ok()).collect())
1942    }
1943
1944    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1945        let mut stmt = self.conn.prepare(
1946            "SELECT from_id, to_id, kind, weight
1947             FROM repo_edges WHERE snapshot_id = ?1
1948             ORDER BY kind, from_id, to_id",
1949        )?;
1950        let rows = stmt.query_map(params![snapshot_id], |row| {
1951            Ok(RepoEdge {
1952                from_path: row.get(0)?,
1953                to_path: row.get(1)?,
1954                kind: row.get(2)?,
1955                weight: row.get::<_, i64>(3)? as u32,
1956            })
1957        })?;
1958        Ok(rows.filter_map(|row| row.ok()).collect())
1959    }
1960
1961    pub fn tool_spans_in_window(
1962        &self,
1963        workspace: &str,
1964        start_ms: u64,
1965        end_ms: u64,
1966    ) -> Result<Vec<ToolSpanView>> {
1967        let mut stmt = self.conn.prepare(
1968            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
1969                    reasoning_tokens, cost_usd_e6, paths_json,
1970                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
1971             FROM (
1972                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
1973                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
1974                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
1975                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
1976                        ts.started_at_ms AS sort_ms
1977                 FROM tool_spans ts
1978                 JOIN sessions s ON s.id = ts.session_id
1979                 WHERE s.workspace = ?1
1980                   AND ts.started_at_ms >= ?2
1981                   AND ts.started_at_ms <= ?3
1982                 UNION ALL
1983                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
1984                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
1985                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
1986                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
1987                        ts.ended_at_ms AS sort_ms
1988                 FROM tool_spans ts
1989                 JOIN sessions s ON s.id = ts.session_id
1990                 WHERE s.workspace = ?1
1991                   AND ts.started_at_ms IS NULL
1992                   AND ts.ended_at_ms >= ?2
1993                   AND ts.ended_at_ms <= ?3
1994             )
1995             ORDER BY sort_ms DESC",
1996        )?;
1997        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1998            let paths_json: String = row.get(8)?;
1999            Ok(ToolSpanView {
2000                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2001                tool: row
2002                    .get::<_, Option<String>>(1)?
2003                    .unwrap_or_else(|| "unknown".into()),
2004                status: row.get(2)?,
2005                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2006                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2007                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2008                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2009                cost_usd_e6: row.get(7)?,
2010                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2011                parent_span_id: row.get(9)?,
2012                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2013                subtree_cost_usd_e6: row.get(11)?,
2014                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2015            })
2016        })?;
2017        Ok(rows.filter_map(|row| row.ok()).collect())
2018    }
2019
2020    pub fn session_span_tree(
2021        &self,
2022        session_id: &str,
2023    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2024        let last_event_seq = self.last_event_seq_for_session(session_id)?;
2025        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2026            && entry.session_id == session_id
2027            && entry.last_event_seq == last_event_seq
2028        {
2029            return Ok(entry.nodes.clone());
2030        }
2031        let mut stmt = self.conn.prepare(
2032            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2033                    reasoning_tokens, cost_usd_e6, paths_json,
2034                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2035             FROM tool_spans
2036             WHERE session_id = ?1
2037             ORDER BY depth ASC, started_at_ms ASC",
2038        )?;
2039        let rows = stmt.query_map(params![session_id], |row| {
2040            let paths_json: String = row.get(8)?;
2041            Ok(crate::metrics::types::ToolSpanView {
2042                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2043                tool: row
2044                    .get::<_, Option<String>>(1)?
2045                    .unwrap_or_else(|| "unknown".into()),
2046                status: row.get(2)?,
2047                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2048                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2049                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2050                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2051                cost_usd_e6: row.get(7)?,
2052                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2053                parent_span_id: row.get(9)?,
2054                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2055                subtree_cost_usd_e6: row.get(11)?,
2056                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2057            })
2058        })?;
2059        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2060        let nodes = crate::store::span_tree::build_tree(spans);
2061        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2062            session_id: session_id.to_string(),
2063            last_event_seq,
2064            nodes: nodes.clone(),
2065        });
2066        Ok(nodes)
2067    }
2068
2069    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2070        let seq = self
2071            .conn
2072            .query_row(
2073                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2074                params![session_id],
2075                |r| r.get::<_, Option<i64>>(0),
2076            )?
2077            .map(|v| v as u64);
2078        Ok(seq)
2079    }
2080
2081    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2082        let mut stmt = self.conn.prepare(
2083            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2084                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2085             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2086        )?;
2087        let rows = stmt.query_map(params![session_id], |row| {
2088            let paths_json: String = row.get(12)?;
2089            Ok(ToolSpanSyncRow {
2090                span_id: row.get(0)?,
2091                session_id: row.get(1)?,
2092                tool: row.get(2)?,
2093                tool_call_id: row.get(3)?,
2094                status: row.get(4)?,
2095                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2096                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2097                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2098                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2099                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2100                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2101                cost_usd_e6: row.get(11)?,
2102                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2103            })
2104        })?;
2105        Ok(rows.filter_map(|row| row.ok()).collect())
2106    }
2107
2108    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2109        self.conn.execute(
2110            "INSERT OR REPLACE INTO session_evals
2111             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2112             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2113            rusqlite::params![
2114                eval.id,
2115                eval.session_id,
2116                eval.judge_model,
2117                eval.rubric_id,
2118                eval.score,
2119                eval.rationale,
2120                eval.flagged as i64,
2121                eval.created_at_ms as i64,
2122            ],
2123        )?;
2124        Ok(())
2125    }
2126
2127    pub fn list_evals_in_window(
2128        &self,
2129        start_ms: u64,
2130        end_ms: u64,
2131    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2132        let mut stmt = self.conn.prepare(
2133            "SELECT id, session_id, judge_model, rubric_id, score,
2134                    rationale, flagged, created_at_ms
2135             FROM session_evals
2136             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2137             ORDER BY created_at_ms ASC",
2138        )?;
2139        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2140            Ok(crate::eval::types::EvalRow {
2141                id: r.get(0)?,
2142                session_id: r.get(1)?,
2143                judge_model: r.get(2)?,
2144                rubric_id: r.get(3)?,
2145                score: r.get(4)?,
2146                rationale: r.get(5)?,
2147                flagged: r.get::<_, i64>(6)? != 0,
2148                created_at_ms: r.get::<_, i64>(7)? as u64,
2149            })
2150        })?;
2151        rows.collect()
2152    }
2153
2154    pub fn list_evals_for_session(
2155        &self,
2156        session_id: &str,
2157    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2158        let mut stmt = self.conn.prepare(
2159            "SELECT id, session_id, judge_model, rubric_id, score,
2160                    rationale, flagged, created_at_ms
2161             FROM session_evals
2162             WHERE session_id = ?1
2163             ORDER BY created_at_ms DESC",
2164        )?;
2165        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2166            Ok(crate::eval::types::EvalRow {
2167                id: r.get(0)?,
2168                session_id: r.get(1)?,
2169                judge_model: r.get(2)?,
2170                rubric_id: r.get(3)?,
2171                score: r.get(4)?,
2172                rationale: r.get(5)?,
2173                flagged: r.get::<_, i64>(6)? != 0,
2174                created_at_ms: r.get::<_, i64>(7)? as u64,
2175            })
2176        })?;
2177        rows.collect()
2178    }
2179
2180    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2181        use crate::feedback::types::FeedbackLabel;
2182        self.conn.execute(
2183            "INSERT OR REPLACE INTO session_feedback
2184             (id, session_id, score, label, note, created_at_ms)
2185             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2186            rusqlite::params![
2187                r.id,
2188                r.session_id,
2189                r.score.as_ref().map(|s| s.0 as i64),
2190                r.label.as_ref().map(FeedbackLabel::to_db_str),
2191                r.note,
2192                r.created_at_ms as i64,
2193            ],
2194        )?;
2195        let payload = serde_json::to_string(r).unwrap_or_default();
2196        self.conn.execute(
2197            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2198             VALUES (?1, 'session_feedback', ?2, 0)",
2199            rusqlite::params![r.session_id, payload],
2200        )?;
2201        Ok(())
2202    }
2203
2204    pub fn list_feedback_in_window(
2205        &self,
2206        start_ms: u64,
2207        end_ms: u64,
2208    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2209        let mut stmt = self.conn.prepare(
2210            "SELECT id, session_id, score, label, note, created_at_ms
2211             FROM session_feedback
2212             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2213             ORDER BY created_at_ms ASC",
2214        )?;
2215        let rows = stmt.query_map(
2216            rusqlite::params![start_ms as i64, end_ms as i64],
2217            feedback_row,
2218        )?;
2219        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2220    }
2221
2222    pub fn feedback_for_sessions(
2223        &self,
2224        ids: &[String],
2225    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2226        if ids.is_empty() {
2227            return Ok(std::collections::HashMap::new());
2228        }
2229        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2230        let sql = format!(
2231            "SELECT id, session_id, score, label, note, created_at_ms
2232             FROM session_feedback WHERE session_id IN ({placeholders})
2233             ORDER BY created_at_ms DESC"
2234        );
2235        let mut stmt = self.conn.prepare(&sql)?;
2236        let params: Vec<&dyn rusqlite::ToSql> =
2237            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2238        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2239        let mut map = std::collections::HashMap::new();
2240        for row in rows {
2241            let r = row?;
2242            map.entry(r.session_id.clone()).or_insert(r);
2243        }
2244        Ok(map)
2245    }
2246
2247    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2248        self.conn.execute(
2249            "INSERT INTO session_outcomes (
2250                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2251                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2252            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2253            ON CONFLICT(session_id) DO UPDATE SET
2254                test_passed=excluded.test_passed,
2255                test_failed=excluded.test_failed,
2256                test_skipped=excluded.test_skipped,
2257                build_ok=excluded.build_ok,
2258                lint_errors=excluded.lint_errors,
2259                revert_lines_14d=excluded.revert_lines_14d,
2260                pr_open=excluded.pr_open,
2261                ci_ok=excluded.ci_ok,
2262                measured_at_ms=excluded.measured_at_ms,
2263                measure_error=excluded.measure_error",
2264            params![
2265                row.session_id,
2266                row.test_passed,
2267                row.test_failed,
2268                row.test_skipped,
2269                row.build_ok.map(bool_to_i64),
2270                row.lint_errors,
2271                row.revert_lines_14d,
2272                row.pr_open,
2273                row.ci_ok.map(bool_to_i64),
2274                row.measured_at_ms as i64,
2275                row.measure_error.as_deref(),
2276            ],
2277        )?;
2278        Ok(())
2279    }
2280
2281    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2282        let mut stmt = self.conn.prepare(
2283            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2284                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2285             FROM session_outcomes WHERE session_id = ?1",
2286        )?;
2287        let row = stmt
2288            .query_row(params![session_id], outcome_row)
2289            .optional()?;
2290        Ok(row)
2291    }
2292
2293    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2294    pub fn list_session_outcomes_in_window(
2295        &self,
2296        workspace: &str,
2297        start_ms: u64,
2298        end_ms: u64,
2299    ) -> Result<Vec<SessionOutcomeRow>> {
2300        let mut stmt = self.conn.prepare(
2301            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2302                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2303             FROM session_outcomes o
2304             JOIN sessions s ON s.id = o.session_id
2305             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2306             ORDER BY o.measured_at_ms ASC",
2307        )?;
2308        let rows = stmt.query_map(
2309            params![workspace, start_ms as i64, end_ms as i64],
2310            outcome_row,
2311        )?;
2312        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2313    }
2314
2315    pub fn append_session_sample(
2316        &self,
2317        session_id: &str,
2318        ts_ms: u64,
2319        pid: u32,
2320        cpu_percent: Option<f64>,
2321        rss_bytes: Option<u64>,
2322    ) -> Result<()> {
2323        self.conn.execute(
2324            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2325             VALUES (?1, ?2, ?3, ?4, ?5)",
2326            params![
2327                session_id,
2328                ts_ms as i64,
2329                pid as i64,
2330                cpu_percent,
2331                rss_bytes.map(|b| b as i64)
2332            ],
2333        )?;
2334        Ok(())
2335    }
2336
2337    /// Per-session maxima for retro heuristics.
2338    pub fn list_session_sample_aggs_in_window(
2339        &self,
2340        workspace: &str,
2341        start_ms: u64,
2342        end_ms: u64,
2343    ) -> Result<Vec<SessionSampleAgg>> {
2344        let mut stmt = self.conn.prepare(
2345            "SELECT ss.session_id, COUNT(*) AS n,
2346                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2347             FROM session_samples ss
2348             JOIN sessions s ON s.id = ss.session_id
2349             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2350             GROUP BY ss.session_id",
2351        )?;
2352        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2353            let sid: String = r.get(0)?;
2354            let n: i64 = r.get(1)?;
2355            let max_cpu: Option<f64> = r.get(2)?;
2356            let max_rss: Option<i64> = r.get(3)?;
2357            Ok(SessionSampleAgg {
2358                session_id: sid,
2359                sample_count: n as u64,
2360                max_cpu_percent: max_cpu.unwrap_or(0.0),
2361                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2362            })
2363        })?;
2364        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2365    }
2366
2367    pub fn list_sessions_for_eval(
2368        &self,
2369        since_ms: u64,
2370        min_cost_usd: f64,
2371    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2372        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2373        let mut stmt = self.conn.prepare(
2374            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2375                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2376                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2377                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2378             FROM sessions s
2379             WHERE s.started_at_ms >= ?1
2380               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2381               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2382             ORDER BY s.started_at_ms DESC",
2383        )?;
2384        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2385            Ok((
2386                r.get::<_, String>(0)?,
2387                r.get::<_, String>(1)?,
2388                r.get::<_, Option<String>>(2)?,
2389                r.get::<_, String>(3)?,
2390                r.get::<_, i64>(4)?,
2391                r.get::<_, Option<i64>>(5)?,
2392                r.get::<_, String>(6)?,
2393                r.get::<_, String>(7)?,
2394                r.get::<_, Option<String>>(8)?,
2395                r.get::<_, Option<String>>(9)?,
2396                r.get::<_, Option<String>>(10)?,
2397                r.get::<_, Option<i64>>(11)?,
2398                r.get::<_, Option<i64>>(12)?,
2399                r.get::<_, Option<String>>(13)?,
2400                r.get::<_, Option<String>>(14)?,
2401                r.get::<_, Option<String>>(15)?,
2402                r.get::<_, Option<String>>(16)?,
2403                r.get::<_, Option<String>>(17)?,
2404                r.get::<_, Option<String>>(18)?,
2405                r.get::<_, Option<i64>>(19)?,
2406                r.get::<_, Option<i64>>(20)?,
2407            ))
2408        })?;
2409        let mut out = Vec::new();
2410        for row in rows {
2411            let (
2412                id,
2413                agent,
2414                model,
2415                workspace,
2416                started,
2417                ended,
2418                status_str,
2419                trace,
2420                start_commit,
2421                end_commit,
2422                branch,
2423                dirty_start,
2424                dirty_end,
2425                source,
2426                prompt_fingerprint,
2427                parent_session_id,
2428                agent_version,
2429                os,
2430                arch,
2431                repo_file_count,
2432                repo_total_loc,
2433            ) = row?;
2434            out.push(crate::core::event::SessionRecord {
2435                id,
2436                agent,
2437                model,
2438                workspace,
2439                started_at_ms: started as u64,
2440                ended_at_ms: ended.map(|v| v as u64),
2441                status: status_from_str(&status_str),
2442                trace_path: trace,
2443                start_commit,
2444                end_commit,
2445                branch,
2446                dirty_start: dirty_start.map(i64_to_bool),
2447                dirty_end: dirty_end.map(i64_to_bool),
2448                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2449                prompt_fingerprint,
2450                parent_session_id,
2451                agent_version,
2452                os,
2453                arch,
2454                repo_file_count: repo_file_count.map(|v| v as u32),
2455                repo_total_loc: repo_total_loc.map(|v| v as u64),
2456            });
2457        }
2458        Ok(out)
2459    }
2460
2461    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2462        self.conn.execute(
2463            "INSERT OR IGNORE INTO prompt_snapshots
2464             (fingerprint, captured_at_ms, files_json, total_bytes)
2465             VALUES (?1, ?2, ?3, ?4)",
2466            params![
2467                snap.fingerprint,
2468                snap.captured_at_ms as i64,
2469                snap.files_json,
2470                snap.total_bytes as i64
2471            ],
2472        )?;
2473        Ok(())
2474    }
2475
2476    pub fn get_prompt_snapshot(
2477        &self,
2478        fingerprint: &str,
2479    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2480        self.conn
2481            .query_row(
2482                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2483                 FROM prompt_snapshots WHERE fingerprint = ?1",
2484                params![fingerprint],
2485                |r| {
2486                    Ok(crate::prompt::PromptSnapshot {
2487                        fingerprint: r.get(0)?,
2488                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2489                        files_json: r.get(2)?,
2490                        total_bytes: r.get::<_, i64>(3)? as u64,
2491                    })
2492                },
2493            )
2494            .optional()
2495            .map_err(Into::into)
2496    }
2497
2498    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2499        let mut stmt = self.conn.prepare(
2500            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2501             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2502        )?;
2503        let rows = stmt.query_map([], |r| {
2504            Ok(crate::prompt::PromptSnapshot {
2505                fingerprint: r.get(0)?,
2506                captured_at_ms: r.get::<_, i64>(1)? as u64,
2507                files_json: r.get(2)?,
2508                total_bytes: r.get::<_, i64>(3)? as u64,
2509            })
2510        })?;
2511        Ok(rows.filter_map(|r| r.ok()).collect())
2512    }
2513
2514    /// Sessions with a non-null prompt_fingerprint in the given window.
2515    pub fn sessions_with_prompt_fingerprint(
2516        &self,
2517        workspace: &str,
2518        start_ms: u64,
2519        end_ms: u64,
2520    ) -> Result<Vec<(String, String)>> {
2521        let mut stmt = self.conn.prepare(
2522            "SELECT id, prompt_fingerprint FROM sessions
2523             WHERE workspace = ?1
2524               AND started_at_ms >= ?2 AND started_at_ms < ?3
2525               AND prompt_fingerprint IS NOT NULL",
2526        )?;
2527        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2528            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2529        })?;
2530        Ok(rows.filter_map(|r| r.ok()).collect())
2531    }
2532}
2533
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
2540
2541fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2542    raw.and_then(|s| s.trim().parse::<u64>().ok())
2543        .unwrap_or(DEFAULT_MMAP_MB)
2544        .saturating_mul(1024)
2545        .saturating_mul(1024)
2546        .min(i64::MAX as u64) as i64
2547}
2548
2549fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2550    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2551    conn.execute_batch(&format!(
2552        "
2553        PRAGMA journal_mode=WAL;
2554        PRAGMA busy_timeout=5000;
2555        PRAGMA synchronous=NORMAL;
2556        PRAGMA cache_size=-65536;
2557        PRAGMA mmap_size={mmap_size};
2558        PRAGMA temp_store=MEMORY;
2559        PRAGMA wal_autocheckpoint=1000;
2560        "
2561    ))?;
2562    if mode == StoreOpenMode::ReadOnlyQuery {
2563        conn.execute_batch("PRAGMA query_only=ON;")?;
2564    }
2565    Ok(())
2566}
2567
2568fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2569    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2570}
2571
2572fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2573    let status_str: String = row.get(6)?;
2574    Ok(SessionRecord {
2575        id: row.get(0)?,
2576        agent: row.get(1)?,
2577        model: row.get(2)?,
2578        workspace: row.get(3)?,
2579        started_at_ms: row.get::<_, i64>(4)? as u64,
2580        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2581        status: status_from_str(&status_str),
2582        trace_path: row.get(7)?,
2583        start_commit: row.get(8)?,
2584        end_commit: row.get(9)?,
2585        branch: row.get(10)?,
2586        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2587        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2588        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2589        prompt_fingerprint: row.get(14)?,
2590        parent_session_id: row.get(15)?,
2591        agent_version: row.get(16)?,
2592        os: row.get(17)?,
2593        arch: row.get(18)?,
2594        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2595        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2596    })
2597}
2598
2599fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2600    let payload_str: String = row.get(12)?;
2601    Ok(Event {
2602        session_id: row.get(0)?,
2603        seq: row.get::<_, i64>(1)? as u64,
2604        ts_ms: row.get::<_, i64>(2)? as u64,
2605        ts_exact: row.get::<_, i64>(3)? != 0,
2606        kind: kind_from_str(&row.get::<_, String>(4)?),
2607        source: source_from_str(&row.get::<_, String>(5)?),
2608        tool: row.get(6)?,
2609        tool_call_id: row.get(7)?,
2610        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2611        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2612        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2613        cost_usd_e6: row.get(11)?,
2614        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2615        stop_reason: row.get(13)?,
2616        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2617        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2618        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2619        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2620        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2621        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2622        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2623        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2624    })
2625}
2626
2627fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2628    let mut clauses = vec!["workspace = ?".to_string()];
2629    let mut args = vec![Value::Text(workspace.to_string())];
2630    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2631        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2632        args.push(Value::Text(format!("{}%", escape_like(prefix))));
2633    }
2634    if let Some(status) = &filter.status {
2635        clauses.push("status = ?".to_string());
2636        args.push(Value::Text(format!("{status:?}")));
2637    }
2638    if let Some(since_ms) = filter.since_ms {
2639        clauses.push("started_at_ms >= ?".to_string());
2640        args.push(Value::Integer(since_ms as i64));
2641    }
2642    (format!("WHERE {}", clauses.join(" AND ")), args)
2643}
2644
/// Lower-cases `raw` and backslash-escapes the characters that are special in
/// a SQL `LIKE` pattern using `\` as the escape character: `\`, `%` and `_`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2651
2652fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2653    let cost: i64 = conn.query_row(
2654        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2655        params![workspace], |r| r.get(0),
2656    )?;
2657    let with_cost: i64 = conn.query_row(
2658        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2659        params![workspace], |r| r.get(0),
2660    )?;
2661    Ok((cost, with_cost as u64))
2662}
2663
2664fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2665    let build_raw: Option<i64> = r.get(4)?;
2666    let ci_raw: Option<i64> = r.get(8)?;
2667    Ok(SessionOutcomeRow {
2668        session_id: r.get(0)?,
2669        test_passed: r.get(1)?,
2670        test_failed: r.get(2)?,
2671        test_skipped: r.get(3)?,
2672        build_ok: build_raw.map(|v| v != 0),
2673        lint_errors: r.get(5)?,
2674        revert_lines_14d: r.get(6)?,
2675        pr_open: r.get(7)?,
2676        ci_ok: ci_raw.map(|v| v != 0),
2677        measured_at_ms: r.get::<_, i64>(9)? as u64,
2678        measure_error: r.get(10)?,
2679    })
2680}
2681
2682fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2683    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2684    let score = r
2685        .get::<_, Option<i64>>(2)?
2686        .and_then(|v| FeedbackScore::new(v as u8));
2687    let label = r
2688        .get::<_, Option<String>>(3)?
2689        .and_then(|s| FeedbackLabel::from_str_opt(&s));
2690    Ok(FeedbackRecord {
2691        id: r.get(0)?,
2692        session_id: r.get(1)?,
2693        score,
2694        label,
2695        note: r.get(4)?,
2696        created_at_ms: r.get::<_, i64>(5)? as u64,
2697    })
2698}
2699
/// Three-letter weekday name for a day index.
///
/// Index 0 maps to `"Thu"`. The caller in this file passes a millisecond
/// timestamp divided by 86,400,000, presumably days since the Unix epoch,
/// whose day 0 (1970-01-01) was a Thursday.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce modulo 7 before adding the offset so indices near `u64::MAX`
    // cannot overflow.
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2703
2704fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2705    let week_ago = now.saturating_sub(7 * 86_400_000);
2706    let mut stmt = conn
2707        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2708    let days: Vec<u64> = stmt
2709        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2710        .filter_map(|r| r.ok())
2711        .map(|v| v as u64 / 86_400_000)
2712        .collect();
2713    let today = now / 86_400_000;
2714    Ok((0u64..7)
2715        .map(|i| {
2716            let d = today.saturating_sub(6 - i);
2717            (
2718                day_label(d).to_string(),
2719                days.iter().filter(|&&x| x == d).count() as u64,
2720            )
2721        })
2722        .collect())
2723}
2724
2725fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2726    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2727               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2728               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2729               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2730               COUNT(e.id) FROM sessions s \
2731               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2732               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2733    let mut stmt = conn.prepare(sql)?;
2734    let out: Vec<(SessionRecord, u64)> = stmt
2735        .query_map(params![workspace], |r| {
2736            let st: String = r.get(6)?;
2737            Ok((
2738                SessionRecord {
2739                    id: r.get(0)?,
2740                    agent: r.get(1)?,
2741                    model: r.get(2)?,
2742                    workspace: r.get(3)?,
2743                    started_at_ms: r.get::<_, i64>(4)? as u64,
2744                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2745                    status: status_from_str(&st),
2746                    trace_path: r.get(7)?,
2747                    start_commit: r.get(8)?,
2748                    end_commit: r.get(9)?,
2749                    branch: r.get(10)?,
2750                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2751                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2752                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2753                    prompt_fingerprint: r.get(14)?,
2754                    parent_session_id: r.get(15)?,
2755                    agent_version: r.get(16)?,
2756                    os: r.get(17)?,
2757                    arch: r.get(18)?,
2758                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2759                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2760                },
2761                r.get::<_, i64>(21)? as u64,
2762            ))
2763        })?
2764        .filter_map(|r| r.ok())
2765        .collect();
2766    Ok(out)
2767}
2768
2769fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2770    let mut stmt = conn.prepare(
2771        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2772         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2773    )?;
2774    let out: Vec<(String, u64)> = stmt
2775        .query_map(params![workspace], |r| {
2776            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2777        })?
2778        .filter_map(|r| r.ok())
2779        .collect();
2780    Ok(out)
2781}
2782
2783fn status_from_str(s: &str) -> SessionStatus {
2784    match s {
2785        "Running" => SessionStatus::Running,
2786        "Waiting" => SessionStatus::Waiting,
2787        "Idle" => SessionStatus::Idle,
2788        _ => SessionStatus::Done,
2789    }
2790}
2791
/// Whether the `KAIZEN_PROJECTOR` environment variable is set to exactly
/// `legacy`. Unset, non-Unicode, or any other value yields `false`.
fn projector_legacy_mode() -> bool {
    let setting = std::env::var("KAIZEN_PROJECTOR");
    matches!(setting.as_deref(), Ok("legacy"))
}
2795
2796fn is_stop_event(e: &Event) -> bool {
2797    if !matches!(e.kind, EventKind::Hook) {
2798        return false;
2799    }
2800    e.payload
2801        .get("event")
2802        .and_then(|v| v.as_str())
2803        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
2804        == Some("Stop")
2805}
2806
2807fn kind_from_str(s: &str) -> EventKind {
2808    match s {
2809        "ToolCall" => EventKind::ToolCall,
2810        "ToolResult" => EventKind::ToolResult,
2811        "Message" => EventKind::Message,
2812        "Error" => EventKind::Error,
2813        "Cost" => EventKind::Cost,
2814        "Hook" => EventKind::Hook,
2815        "Lifecycle" => EventKind::Lifecycle,
2816        _ => EventKind::Hook,
2817    }
2818}
2819
2820fn source_from_str(s: &str) -> EventSource {
2821    match s {
2822        "Tail" => EventSource::Tail,
2823        "Hook" => EventSource::Hook,
2824        _ => EventSource::Proxy,
2825    }
2826}
2827
2828fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2829    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2830    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2831    ensure_column(conn, "sessions", "branch", "TEXT")?;
2832    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2833    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2834    ensure_column(
2835        conn,
2836        "sessions",
2837        "repo_binding_source",
2838        "TEXT NOT NULL DEFAULT ''",
2839    )?;
2840    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2841    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2842    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2843    ensure_column(conn, "events", "stop_reason", "TEXT")?;
2844    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
2845    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
2846    ensure_column(conn, "events", "retry_count", "INTEGER")?;
2847    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
2848    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
2849    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
2850    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
2851    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
2852    ensure_column(
2853        conn,
2854        "sync_outbox",
2855        "kind",
2856        "TEXT NOT NULL DEFAULT 'events'",
2857    )?;
2858    ensure_column(
2859        conn,
2860        "experiments",
2861        "state",
2862        "TEXT NOT NULL DEFAULT 'Draft'",
2863    )?;
2864    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2865    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2866    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
2867    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
2868    ensure_column(conn, "sessions", "os", "TEXT")?;
2869    ensure_column(conn, "sessions", "arch", "TEXT")?;
2870    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
2871    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
2872    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
2873    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
2874    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
2875    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
2876    conn.execute_batch(
2877        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
2878         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
2879    )?;
2880    Ok(())
2881}
2882
2883fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2884    if has_column(conn, table, column)? {
2885        return Ok(());
2886    }
2887    conn.execute(
2888        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2889        [],
2890    )?;
2891    Ok(())
2892}
2893
2894fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2895    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2896    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2897    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2898}
2899
/// Encodes a boolean as the integer SQLite stores: `true` is 1, `false` is 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2903
/// Decodes a stored integer flag: zero is `false`, any other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
2907
/// Maps the empty string to `None` and wraps any other string in `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2911
2912#[cfg(test)]
2913mod tests {
2914    use super::*;
2915    use serde_json::json;
2916    use std::collections::HashSet;
2917    use tempfile::TempDir;
2918
2919    fn make_session(id: &str) -> SessionRecord {
2920        SessionRecord {
2921            id: id.to_string(),
2922            agent: "cursor".to_string(),
2923            model: None,
2924            workspace: "/ws".to_string(),
2925            started_at_ms: 1000,
2926            ended_at_ms: None,
2927            status: SessionStatus::Done,
2928            trace_path: "/trace".to_string(),
2929            start_commit: None,
2930            end_commit: None,
2931            branch: None,
2932            dirty_start: None,
2933            dirty_end: None,
2934            repo_binding_source: None,
2935            prompt_fingerprint: None,
2936            parent_session_id: None,
2937            agent_version: None,
2938            os: None,
2939            arch: None,
2940            repo_file_count: None,
2941            repo_total_loc: None,
2942        }
2943    }
2944
2945    fn make_event(session_id: &str, seq: u64) -> Event {
2946        Event {
2947            session_id: session_id.to_string(),
2948            seq,
2949            ts_ms: 1000 + seq * 100,
2950            ts_exact: false,
2951            kind: EventKind::ToolCall,
2952            source: EventSource::Tail,
2953            tool: Some("read_file".to_string()),
2954            tool_call_id: Some(format!("call_{seq}")),
2955            tokens_in: None,
2956            tokens_out: None,
2957            reasoning_tokens: None,
2958            cost_usd_e6: None,
2959            stop_reason: None,
2960            latency_ms: None,
2961            ttft_ms: None,
2962            retry_count: None,
2963            context_used_tokens: None,
2964            context_max_tokens: None,
2965            cache_creation_tokens: None,
2966            cache_read_tokens: None,
2967            system_prompt_tokens: None,
2968            payload: json!({}),
2969        }
2970    }
2971
2972    #[test]
2973    fn open_and_wal_mode() {
2974        let dir = TempDir::new().unwrap();
2975        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2976        let mode: String = store
2977            .conn
2978            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
2979            .unwrap();
2980        assert_eq!(mode, "wal");
2981    }
2982
2983    #[test]
2984    fn open_applies_phase0_pragmas() {
2985        let dir = TempDir::new().unwrap();
2986        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2987        let synchronous: i64 = store
2988            .conn
2989            .query_row("PRAGMA synchronous", [], |r| r.get(0))
2990            .unwrap();
2991        let cache_size: i64 = store
2992            .conn
2993            .query_row("PRAGMA cache_size", [], |r| r.get(0))
2994            .unwrap();
2995        let temp_store: i64 = store
2996            .conn
2997            .query_row("PRAGMA temp_store", [], |r| r.get(0))
2998            .unwrap();
2999        let wal_autocheckpoint: i64 = store
3000            .conn
3001            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
3002            .unwrap();
3003        assert_eq!(synchronous, 1);
3004        assert_eq!(cache_size, -65_536);
3005        assert_eq!(temp_store, 2);
3006        assert_eq!(wal_autocheckpoint, 1_000);
3007        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
3008    }
3009
3010    #[test]
3011    fn read_only_open_sets_query_only() {
3012        let dir = TempDir::new().unwrap();
3013        let db = dir.path().join("kaizen.db");
3014        Store::open(&db).unwrap();
3015        let store = Store::open_read_only(&db).unwrap();
3016        let query_only: i64 = store
3017            .conn
3018            .query_row("PRAGMA query_only", [], |r| r.get(0))
3019            .unwrap();
3020        assert_eq!(query_only, 1);
3021    }
3022
3023    #[test]
3024    fn phase0_indexes_exist() {
3025        let dir = TempDir::new().unwrap();
3026        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3027        for name in [
3028            "tool_spans_session_idx",
3029            "tool_spans_started_idx",
3030            "session_samples_ts_idx",
3031            "events_ts_idx",
3032            "feedback_session_idx",
3033        ] {
3034            let found: i64 = store
3035                .conn
3036                .query_row(
3037                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
3038                    params![name],
3039                    |r| r.get(0),
3040                )
3041                .unwrap();
3042            assert_eq!(found, 1, "{name}");
3043        }
3044    }
3045
3046    #[test]
3047    fn upsert_and_get_session() {
3048        let dir = TempDir::new().unwrap();
3049        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3050        let s = make_session("s1");
3051        store.upsert_session(&s).unwrap();
3052
3053        let got = store.get_session("s1").unwrap().unwrap();
3054        assert_eq!(got.id, "s1");
3055        assert_eq!(got.status, SessionStatus::Done);
3056    }
3057
3058    #[test]
3059    fn append_and_list_events_round_trip() {
3060        let dir = TempDir::new().unwrap();
3061        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3062        let s = make_session("s2");
3063        store.upsert_session(&s).unwrap();
3064        store.append_event(&make_event("s2", 0)).unwrap();
3065        store.append_event(&make_event("s2", 1)).unwrap();
3066
3067        let sessions = store.list_sessions("/ws").unwrap();
3068        assert_eq!(sessions.len(), 1);
3069        assert_eq!(sessions[0].id, "s2");
3070    }
3071
3072    #[test]
3073    fn list_sessions_page_orders_and_counts() {
3074        let dir = TempDir::new().unwrap();
3075        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3076        let mut a = make_session("a");
3077        a.started_at_ms = 2_000;
3078        let mut b = make_session("b");
3079        b.started_at_ms = 2_000;
3080        let mut c = make_session("c");
3081        c.started_at_ms = 1_000;
3082        store.upsert_session(&c).unwrap();
3083        store.upsert_session(&b).unwrap();
3084        store.upsert_session(&a).unwrap();
3085
3086        let page = store
3087            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
3088            .unwrap();
3089        assert_eq!(page.total, 3);
3090        assert_eq!(page.next_offset, Some(2));
3091        assert_eq!(
3092            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3093            vec!["a", "b"]
3094        );
3095
3096        let all = store.list_sessions("/ws").unwrap();
3097        assert_eq!(
3098            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3099            vec!["a", "b", "c"]
3100        );
3101    }
3102
3103    #[test]
3104    fn list_sessions_page_filters_in_sql_shape() {
3105        let dir = TempDir::new().unwrap();
3106        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3107        let mut cursor = make_session("cursor");
3108        cursor.agent = "Cursor".into();
3109        cursor.started_at_ms = 2_000;
3110        cursor.status = SessionStatus::Running;
3111        let mut claude = make_session("claude");
3112        claude.agent = "claude".into();
3113        claude.started_at_ms = 3_000;
3114        store.upsert_session(&cursor).unwrap();
3115        store.upsert_session(&claude).unwrap();
3116
3117        let page = store
3118            .list_sessions_page(
3119                "/ws",
3120                0,
3121                10,
3122                SessionFilter {
3123                    agent_prefix: Some("cur".into()),
3124                    status: Some(SessionStatus::Running),
3125                    since_ms: Some(1_500),
3126                },
3127            )
3128            .unwrap();
3129        assert_eq!(page.total, 1);
3130        assert_eq!(page.rows[0].id, "cursor");
3131    }
3132
3133    #[test]
3134    fn incremental_session_helpers_find_new_rows_and_statuses() {
3135        let dir = TempDir::new().unwrap();
3136        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3137        let mut old = make_session("old");
3138        old.started_at_ms = 1_000;
3139        let mut new = make_session("new");
3140        new.started_at_ms = 2_000;
3141        new.status = SessionStatus::Running;
3142        store.upsert_session(&old).unwrap();
3143        store.upsert_session(&new).unwrap();
3144
3145        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
3146        assert_eq!(rows.len(), 1);
3147        assert_eq!(rows[0].id, "new");
3148
3149        store
3150            .update_session_status("new", SessionStatus::Done)
3151            .unwrap();
3152        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
3153        assert_eq!(statuses.len(), 1);
3154        assert_eq!(statuses[0].status, SessionStatus::Done);
3155    }
3156
3157    #[test]
3158    fn summary_stats_empty() {
3159        let dir = TempDir::new().unwrap();
3160        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3161        let stats = store.summary_stats("/ws").unwrap();
3162        assert_eq!(stats.session_count, 0);
3163        assert_eq!(stats.total_cost_usd_e6, 0);
3164    }
3165
3166    #[test]
3167    fn summary_stats_counts_sessions() {
3168        let dir = TempDir::new().unwrap();
3169        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3170        store.upsert_session(&make_session("a")).unwrap();
3171        store.upsert_session(&make_session("b")).unwrap();
3172        let stats = store.summary_stats("/ws").unwrap();
3173        assert_eq!(stats.session_count, 2);
3174        assert_eq!(stats.by_agent.len(), 1);
3175        assert_eq!(stats.by_agent[0].0, "cursor");
3176        assert_eq!(stats.by_agent[0].1, 2);
3177    }
3178
3179    #[test]
3180    fn list_events_for_session_round_trip() {
3181        let dir = TempDir::new().unwrap();
3182        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3183        store.upsert_session(&make_session("s4")).unwrap();
3184        store.append_event(&make_event("s4", 0)).unwrap();
3185        store.append_event(&make_event("s4", 1)).unwrap();
3186        let events = store.list_events_for_session("s4").unwrap();
3187        assert_eq!(events.len(), 2);
3188        assert_eq!(events[0].seq, 0);
3189        assert_eq!(events[1].seq, 1);
3190    }
3191
3192    #[test]
3193    fn list_events_page_uses_inclusive_seq_cursor() {
3194        let dir = TempDir::new().unwrap();
3195        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3196        store.upsert_session(&make_session("paged")).unwrap();
3197        for seq in 0..5 {
3198            store.append_event(&make_event("paged", seq)).unwrap();
3199        }
3200        let first = store.list_events_page("paged", 0, 2).unwrap();
3201        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3202        let second = store
3203            .list_events_page("paged", first[1].seq + 1, 2)
3204            .unwrap();
3205        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3206    }
3207
3208    #[test]
3209    fn append_event_dedup() {
3210        let dir = TempDir::new().unwrap();
3211        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3212        store.upsert_session(&make_session("s5")).unwrap();
3213        store.append_event(&make_event("s5", 0)).unwrap();
3214        // Duplicate — should be silently ignored
3215        store.append_event(&make_event("s5", 0)).unwrap();
3216        let events = store.list_events_for_session("s5").unwrap();
3217        assert_eq!(events.len(), 1);
3218    }
3219
3220    #[test]
3221    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3222        let dir = TempDir::new().unwrap();
3223        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3224        assert!(store.session_span_tree("missing").unwrap().is_empty());
3225        assert!(store.span_tree_cache.borrow().is_some());
3226
3227        store.upsert_session(&make_session("tree")).unwrap();
3228        let call = make_event("tree", 0);
3229        store.append_event(&call).unwrap();
3230        assert!(store.span_tree_cache.borrow().is_none());
3231        assert!(store.session_span_tree("tree").unwrap().is_empty());
3232        assert!(store.span_tree_cache.borrow().is_some());
3233        let mut result = make_event("tree", 1);
3234        result.kind = EventKind::ToolResult;
3235        result.tool_call_id = call.tool_call_id.clone();
3236        store.append_event(&result).unwrap();
3237        assert!(store.span_tree_cache.borrow().is_none());
3238        let first = store.session_span_tree("tree").unwrap();
3239        assert_eq!(first.len(), 1);
3240        assert!(store.span_tree_cache.borrow().is_some());
3241        store.append_event(&make_event("tree", 2)).unwrap();
3242        assert!(store.span_tree_cache.borrow().is_none());
3243    }
3244
3245    #[test]
3246    fn tool_spans_in_window_uses_started_then_ended_fallback() {
3247        let dir = TempDir::new().unwrap();
3248        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3249        store.upsert_session(&make_session("spans")).unwrap();
3250        for (id, started, ended) in [
3251            ("started", Some(200_i64), None),
3252            ("fallback", None, Some(250_i64)),
3253            ("outside", Some(400_i64), None),
3254            ("too_old", None, Some(50_i64)),
3255            ("started_wins", Some(500_i64), Some(200_i64)),
3256        ] {
3257            store
3258                .conn
3259                .execute(
3260                    "INSERT INTO tool_spans
3261                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3262                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3263                    params![id, started, ended],
3264                )
3265                .unwrap();
3266        }
3267        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3268        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3269        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3270    }
3271
3272    #[test]
3273    fn upsert_idempotent() {
3274        let dir = TempDir::new().unwrap();
3275        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3276        let mut s = make_session("s3");
3277        store.upsert_session(&s).unwrap();
3278        s.status = SessionStatus::Running;
3279        store.upsert_session(&s).unwrap();
3280
3281        let got = store.get_session("s3").unwrap().unwrap();
3282        assert_eq!(got.status, SessionStatus::Running);
3283    }
3284
3285    #[test]
3286    fn append_event_indexes_path_from_payload() {
3287        let dir = TempDir::new().unwrap();
3288        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3289        store.upsert_session(&make_session("sx")).unwrap();
3290        let mut ev = make_event("sx", 0);
3291        ev.payload = json!({"input": {"path": "src/lib.rs"}});
3292        store.append_event(&ev).unwrap();
3293        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
3294        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
3295    }
3296
3297    #[test]
3298    fn update_session_status_changes_status() {
3299        let dir = TempDir::new().unwrap();
3300        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3301        store.upsert_session(&make_session("s6")).unwrap();
3302        store
3303            .update_session_status("s6", SessionStatus::Running)
3304            .unwrap();
3305        let got = store.get_session("s6").unwrap().unwrap();
3306        assert_eq!(got.status, SessionStatus::Running);
3307    }
3308
3309    #[test]
3310    fn prune_sessions_removes_old_rows_and_keeps_recent() {
3311        let dir = TempDir::new().unwrap();
3312        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3313        let mut old = make_session("old");
3314        old.started_at_ms = 1_000;
3315        let mut new = make_session("new");
3316        new.started_at_ms = 9_000_000_000_000;
3317        store.upsert_session(&old).unwrap();
3318        store.upsert_session(&new).unwrap();
3319        store.append_event(&make_event("old", 0)).unwrap();
3320
3321        let stats = store.prune_sessions_started_before(5_000).unwrap();
3322        assert_eq!(
3323            stats,
3324            PruneStats {
3325                sessions_removed: 1,
3326                events_removed: 1,
3327            }
3328        );
3329        assert!(store.get_session("old").unwrap().is_none());
3330        assert!(store.get_session("new").unwrap().is_some());
3331        let sessions = store.list_sessions("/ws").unwrap();
3332        assert_eq!(sessions.len(), 1);
3333        assert_eq!(sessions[0].id, "new");
3334    }
3335
3336    #[test]
3337    fn append_event_indexes_rules_from_payload() {
3338        let dir = TempDir::new().unwrap();
3339        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3340        store.upsert_session(&make_session("sr")).unwrap();
3341        let mut ev = make_event("sr", 0);
3342        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
3343        store.append_event(&ev).unwrap();
3344        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
3345        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
3346    }
3347
3348    #[test]
3349    fn guidance_report_counts_skill_and_rule_sessions() {
3350        let dir = TempDir::new().unwrap();
3351        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3352        store.upsert_session(&make_session("sx")).unwrap();
3353        let mut ev = make_event("sx", 0);
3354        ev.payload =
3355            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
3356        ev.cost_usd_e6 = Some(500_000);
3357        store.append_event(&ev).unwrap();
3358
3359        let mut skill_slugs = HashSet::new();
3360        skill_slugs.insert("tdd".into());
3361        let mut rule_slugs = HashSet::new();
3362        rule_slugs.insert("style".into());
3363
3364        let rep = store
3365            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
3366            .unwrap();
3367        assert_eq!(rep.sessions_in_window, 1);
3368        let tdd = rep
3369            .rows
3370            .iter()
3371            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
3372            .unwrap();
3373        assert_eq!(tdd.sessions, 1);
3374        assert!(tdd.on_disk);
3375        let style = rep
3376            .rows
3377            .iter()
3378            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
3379            .unwrap();
3380        assert_eq!(style.sessions, 1);
3381        assert!(style.on_disk);
3382    }
3383
3384    #[test]
3385    fn prune_sessions_removes_rules_used_rows() {
3386        let dir = TempDir::new().unwrap();
3387        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3388        let mut old = make_session("old_r");
3389        old.started_at_ms = 1_000;
3390        store.upsert_session(&old).unwrap();
3391        let mut ev = make_event("old_r", 0);
3392        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
3393        store.append_event(&ev).unwrap();
3394
3395        store.prune_sessions_started_before(5_000).unwrap();
3396        let n: i64 = store
3397            .conn
3398            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
3399            .unwrap();
3400        assert_eq!(n, 0);
3401    }
3402}