Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{
7    FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
8};
9use crate::store::event_index::index_event_derived;
10use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
11use crate::store::tool_span_index::{
12    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
13};
14use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
15use crate::sync::context::SyncIngestContext;
16use crate::sync::outbound::outbound_event_from_row;
17use crate::sync::redact::redact_payload;
18use crate::sync::smart::enqueue_tool_spans_for_session;
19use anyhow::{Context, Result};
20use rusqlite::types::Value;
21use rusqlite::{
22    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
23};
24use std::cell::RefCell;
25use std::collections::{HashMap, HashSet};
26use std::path::{Path, PathBuf};
27
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default SQLite `mmap_size` in MiB — presumably consumed by `apply_pragmas`
/// (not visible in this chunk); confirm before changing.
const DEFAULT_MMAP_MB: u64 = 256;
/// Shared SELECT prefix for session queries. Column order must stay in sync
/// with whatever row-mapping code builds `SessionRecord` from these rows.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspot" files for one repo snapshot (`?1`): complexity-weighted
/// session activity. Files join to sessions through tool-span paths, with spans
/// filtered to the `?3..?4` window (falling back to `ended_at_ms` when
/// `started_at_ms` is NULL) and sessions to workspace `?2`.
///
/// NOTE(review): `COUNT(s.id)` counts matched (span, session) join rows, not
/// DISTINCT sessions — a file touched by many spans of one session weighs more
/// than one touched once by many sessions. Confirm that is intended.
const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
       AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
         OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool ranking for workspace `?1` within window `?2..?3`:
/// call counts, token totals, and nearest-rank p50/p95 of `lead_time_ms`.
///
/// `lat` numbers each tool's latencies with ROW_NUMBER, and `pct` picks the row
/// whose rank equals `floor((n-1)*p/100) + 1` — the standard nearest-rank
/// percentile (requires SQLite 3.25+ for window functions). Tools with no
/// non-NULL latency get NULL percentiles via the final LEFT JOIN.
const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
      SELECT COALESCE(ts.tool, 'unknown') AS tool,
             ts.lead_time_ms,
             COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
             COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
      FROM tool_spans ts
      JOIN sessions s ON s.id = ts.session_id
      WHERE s.workspace = ?1
        AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
          OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
      SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
             SUM(reasoning_tokens) AS total_reasoning_tokens
      FROM scoped GROUP BY tool
    ),
    lat AS (
      SELECT tool, lead_time_ms,
             ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
             COUNT(*) OVER (PARTITION BY tool) AS n
      FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
      SELECT tool,
             MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
             MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
      FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
84
/// Ordered schema migrations, replayed in full via `execute_batch` on every
/// read-write open. Every statement is idempotent (`IF NOT EXISTS` /
/// `INSERT OR IGNORE`), so re-running is safe. Treat as append-only: add new
/// statements at the end; never edit or reorder existing entries.
///
/// NOTE(review): several columns used elsewhere (e.g. `events.ts_exact`,
/// `sessions.start_commit`) are absent from the base DDL below — presumably
/// reconciled by `ensure_schema_columns` in `open_with_mode`; confirm.
const MIGRATIONS: &[&str] = &[
    // Core ingest tables: one row per session, one per event.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // (session_id, seq) uniqueness backs the upsert in `append_event_with_sync`.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // Mirror of the repo-binding columns on `sessions` (dual-written by `upsert_session`).
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // Pulled team data mirrors, keyed by hashed identifiers (privacy-preserving).
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement element: `execute_batch` runs table + indexes together.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // Secondary indexes for hot query paths.
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS events_tool_ts_session_seq_idx ON events(tool, ts_ms DESC, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_span_paths_path_idx ON tool_span_paths(path, span_id)",
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
333
/// Per-workspace activity dashboard stats.
#[derive(Clone)]
pub struct InsightsStats {
    pub total_sessions: u64,
    // Sessions currently in `Running` status.
    pub running_sessions: u64,
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    // Micro-dollars (USD * 1e6), summed over sessions that reported cost.
    pub total_cost_usd_e6: i64,
    // How many sessions contributed to `total_cost_usd_e6`.
    pub sessions_with_cost: u64,
}
349
/// Sync daemon / outbox status for `kaizen sync status`.
pub struct SyncStatusSnapshot {
    // Unsent rows still queued in the outbox.
    pub pending_outbox: u64,
    // Epoch ms of the last successful push, if any.
    pub last_success_ms: Option<u64>,
    pub last_error: Option<String>,
    pub consecutive_failures: u32,
}
357
/// Aggregate stats across sessions + events for a workspace.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    pub session_count: u64,
    // Micro-dollars (USD * 1e6).
    pub total_cost_usd_e6: i64,
    // (agent name, session count) pairs.
    pub by_agent: Vec<(String, u64)>,
    // (model name, session count) pairs.
    pub by_model: Vec<(String, u64)>,
    // (tool name, event count) pairs.
    pub top_tools: Vec<(String, u64)>,
}
367
/// Skill vs Cursor rule for [`GuidancePerfRow`].
/// Serialized lowercase ("skill" / "rule") via serde.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// An agent skill reference.
    Skill,
    /// A Cursor rule reference.
    Rule,
}
375
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    pub kind: GuidanceKind,
    // Skill or rule identifier as referenced in payloads.
    pub id: String,
    // Number of sessions referencing this guidance in the window.
    pub sessions: u64,
    // `sessions` as a fraction of all sessions in the window.
    pub sessions_pct: f64,
    pub total_cost_usd_e6: i64,
    pub avg_cost_per_session_usd: Option<f64>,
    // Delta vs the workspace-wide average cost per session (None when unknown).
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    // Whether the skill/rule file still exists on disk.
    pub on_disk: bool,
}
388
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    pub workspace: String,
    // Inclusive window bounds, epoch ms.
    pub window_start_ms: u64,
    pub window_end_ms: u64,
    pub sessions_in_window: u64,
    // None when no session in the window reported cost.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    pub rows: Vec<GuidancePerfRow>,
}
399
/// Result of [`Store::prune_sessions_started_before`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    pub sessions_removed: u64,
    pub events_removed: u64,
}
406
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
/// Most fields are Option: NULL in the DB means "not measured".
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    // Lines reverted within 14 days of the session (regression proxy).
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    // When the snapshot was taken, epoch ms.
    pub measured_at_ms: u64,
    // Error string when measurement itself failed.
    pub measure_error: Option<String>,
}
422
/// Aggregated process samples for retro (Tier D).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    // Number of rows aggregated from `session_samples`.
    pub sample_count: u64,
    // Peak CPU percentage observed across samples.
    pub max_cpu_percent: f64,
    // Peak resident set size observed across samples, bytes.
    pub max_rss_bytes: u64,
}
431
/// `sync_state` keys for agent rescan throttling and auto-prune.
/// Values are stored as strings in the `sync_state` k/v table.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
436
/// A `tool_spans` row shaped for outbound sync — presumably consumed by
/// `enqueue_tool_spans_for_session` (see `crate::sync::smart`); confirm.
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    // Micro-dollars (USD * 1e6).
    pub cost_usd_e6: Option<i64>,
    // File paths touched by the span.
    pub paths: Vec<String>,
}
452
/// How to open the store. `ReadWrite` runs migrations, schema reconciliation
/// and projector warm-up; `ReadOnlyQuery` opens SQLite with
/// `READ_ONLY | NO_MUTEX` flags and skips all of that (see `open_with_mode`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    ReadWrite,
    ReadOnlyQuery,
}
458
/// Minimal (id, status, end time) projection of a session row.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    pub id: String,
    pub status: SessionStatus,
    pub ended_at_ms: Option<u64>,
}
465
/// Optional filters for session listings; `Default` (all `None`) matches everything.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    // Case-handling of the prefix match is defined by the query using it.
    pub agent_prefix: Option<String>,
    pub status: Option<SessionStatus>,
    // Only sessions started at or after this epoch ms — confirm inclusivity at the query site.
    pub since_ms: Option<u64>,
}
472
/// One page of session rows plus pagination metadata.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    pub rows: Vec<SessionRecord>,
    // Total matching rows across all pages.
    pub total: usize,
    // Offset for the next page; None when this is the last page.
    pub next_offset: Option<usize>,
}
479
/// Memoized span tree for a single session. `last_event_seq` records the
/// newest event folded in, so stale entries can be detected; the whole cache
/// is dropped on writes (see `invalidate_span_tree_cache`).
#[derive(Clone)]
struct SpanTreeCacheEntry {
    session_id: String,
    last_event_seq: Option<u64>,
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
486
/// Handle to the local SQLite database plus lazily-initialized side channels.
/// `RefCell` gives single-threaded interior mutability — `Store` is not `Sync`.
pub struct Store {
    conn: Connection,
    // Directory containing the DB file; used as the hot-log root.
    root: PathBuf,
    // Append-only hot event log, opened on first append (KAIZEN_HOT_LOG=0 disables).
    hot_log: RefCell<Option<HotLog>>,
    // Search index writer — lazily created; see `append_search_event`.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    // Per-session span-tree memo, invalidated on every event write.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    // Streaming tool-span projector, warmed on read-write open.
    projector: RefCell<Projector>,
}
495
496impl Store {
    /// Raw connection handle for sibling store/sync modules (crate-visible only).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
500
    /// Open read-write: runs migrations, schema reconciliation and projector warm-up.
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }
504
    /// Open read-only: no migrations, no projector warm-up.
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
508
509    pub fn open_query(path: &Path) -> Result<Self> {
510        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
511    }
512
    /// Open the store at `path`, creating parent directories as needed.
    ///
    /// Read-write opens: apply pragmas, replay every (idempotent) migration,
    /// reconcile columns via `ensure_schema_columns` (presumably adds columns
    /// newer than the base DDL — confirm), then warm the projector from
    /// still-running sessions. Read-only opens use SQLite's
    /// `READ_ONLY | NO_MUTEX` flags and skip all mutation steps.
    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = match mode {
            StoreOpenMode::ReadWrite => Connection::open(path),
            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
                path,
                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
            ),
        }
        .with_context(|| format!("open db: {}", path.display()))?;
        apply_pragmas(&conn, mode)?;
        if mode == StoreOpenMode::ReadWrite {
            // Each migration element may contain multiple statements, hence execute_batch.
            for sql in MIGRATIONS {
                conn.execute_batch(sql)?;
            }
            ensure_schema_columns(&conn)?;
        }
        let store = Self {
            conn,
            // The DB's parent directory doubles as the store root (hot log lives there).
            root: path
                .parent()
                .unwrap_or_else(|| Path::new("."))
                .to_path_buf(),
            hot_log: RefCell::new(None),
            search_writer: RefCell::new(None),
            span_tree_cache: RefCell::new(None),
            projector: RefCell::new(Projector::default()),
        };
        if mode == StoreOpenMode::ReadWrite {
            store.warm_projector()?;
        }
        Ok(store)
    }
548
549    fn invalidate_span_tree_cache(&self) {
550        *self.span_tree_cache.borrow_mut() = None;
551    }
552
    /// Replay every event of still-running sessions through the in-memory
    /// projector so later incremental `apply` calls (in `append_event_with_sync`)
    /// continue from consistent state after a process restart.
    ///
    /// Deltas are deliberately discarded (`let _`) — presumably the derived
    /// tables were already persisted when these events were first ingested;
    /// confirm against `apply_projector_events`.
    fn warm_projector(&self) -> Result<()> {
        let ids = self.running_session_ids()?;
        let mut projector = self.projector.borrow_mut();
        for id in ids {
            for event in self.list_events_for_session(&id)? {
                let _ = projector.apply(&event);
            }
        }
        Ok(())
    }
563
    /// Insert or fully overwrite a session row, dual-writing the repo-binding
    /// columns into the `session_repo_binding` mirror table.
    ///
    /// Idempotent: re-ingesting the same session replaces every column with
    /// the incoming values (`ON CONFLICT ... DO UPDATE SET ... excluded.*`).
    /// `status` is stored as its `Debug` rendering (e.g. "Running"), matching
    /// the literal written by `ensure_session_stub`.
    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch,
                repo_file_count, repo_total_loc
             )
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
                ?16, ?17, ?18, ?19, ?20, ?21)
             ON CONFLICT(id) DO UPDATE SET
               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
               status=excluded.status, trace_path=excluded.trace_path,
               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
               branch=excluded.branch, dirty_start=excluded.dirty_start,
               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
               prompt_fingerprint=excluded.prompt_fingerprint,
               parent_session_id=excluded.parent_session_id,
               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
            params![
                s.id,
                s.agent,
                s.model,
                s.workspace,
                s.started_at_ms as i64,
                s.ended_at_ms.map(|v| v as i64),
                format!("{:?}", s.status),
                s.trace_path,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                s.repo_binding_source.clone().unwrap_or_default(),
                s.prompt_fingerprint.as_deref(),
                s.parent_session_id.as_deref(),
                s.agent_version.as_deref(),
                s.os.as_deref(),
                s.arch.as_deref(),
                s.repo_file_count.map(|v| v as i64),
                s.repo_total_loc.map(|v| v as i64),
            ],
        )?;
        // Mirror the binding columns; NOTE(review): the two writes are not in one
        // transaction, so a crash between them can leave the mirror stale — confirm acceptable.
        self.conn.execute(
            "INSERT INTO session_repo_binding (
                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
             ON CONFLICT(session_id) DO UPDATE SET
                start_commit=excluded.start_commit,
                end_commit=excluded.end_commit,
                branch=excluded.branch,
                dirty_start=excluded.dirty_start,
                dirty_end=excluded.dirty_end,
                repo_binding_source=excluded.repo_binding_source",
            params![
                s.id,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                s.repo_binding_source.clone().unwrap_or_default(),
            ],
        )?;
        Ok(())
    }
632
    /// Insert a minimal session row if none exists. Used by hook ingestion when
    /// the first observed event is not `SessionStart` (hooks installed mid-session).
    ///
    /// `INSERT OR IGNORE` makes this a no-op when the session is already known —
    /// an existing (richer) row is never overwritten. The stub is created with
    /// status 'Running', empty trace path, and NULL for everything optional.
    pub fn ensure_session_stub(
        &self,
        id: &str,
        agent: &str,
        workspace: &str,
        started_at_ms: u64,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT OR IGNORE INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
            params![id, agent, workspace, started_at_ms as i64],
        )?;
        Ok(())
    }
653
654    /// Next `seq` for a new event in this session (0 when there are no events yet).
655    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
656        let n: i64 = self.conn.query_row(
657            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
658            [session_id],
659            |r| r.get(0),
660        )?;
661        Ok(n as u64)
662    }
663
    /// Append an event with no sync context — nothing is enqueued to the outbox.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
667
    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
    ///
    /// Pipeline, in order:
    /// 1. Upsert into `events` keyed on (session_id, seq) — re-ingest is idempotent.
    /// 2. Mirror into the hot log, then update derived state: either a full
    ///    legacy reindex, a full projector replay (out-of-order seq), or an
    ///    incremental projector apply + TTL / stop-event flushes.
    /// 3. Best-effort search indexing.
    /// 4. Optional sync enqueue, gated on complete config, a valid team salt,
    ///    sampling, and the session existing in the DB.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Captured BEFORE the insert: if the incoming seq is <= the previous max,
        // this is a rewrite/out-of-order event and the projector must replay the session.
        let last_before = if projector_legacy_mode() {
            None
        } else {
            self.last_event_seq_for_session(&e.session_id)?
        };
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
             )
             ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // kind/source stored as Debug renderings; readers must parse the same way.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // NOTE(review): skips all downstream work when the upsert reports zero
        // changed rows — verify this actually fires for duplicate rows, since
        // SQLite counts `DO UPDATE` executions as changes.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        self.append_hot_event(e)?;
        if projector_legacy_mode() {
            // Legacy path: full derived-index + tool-span rebuild per event.
            index_event_derived(&self.conn, e)?;
            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
            self.invalidate_span_tree_cache();
        } else if last_before.is_some_and(|last| e.seq <= last) {
            // Out-of-order or rewritten event: incremental state is invalid, replay.
            self.replay_projector_session(&e.session_id)?;
        } else {
            // Normal path: apply incrementally, then flush TTL-expired orphans,
            // and flush the whole session when this event ends it.
            let deltas = self.projector.borrow_mut().apply(e);
            self.apply_projector_events(&deltas)?;
            let expired = self
                .projector
                .borrow_mut()
                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
            self.apply_projector_events(&expired)?;
            if is_stop_event(e) {
                let flushed = self
                    .projector
                    .borrow_mut()
                    .flush_session(&e.session_id, e.ts_ms);
                self.apply_projector_events(&flushed)?;
            }
            self.invalidate_span_tree_cache();
        }
        self.append_search_event(e);
        // --- Optional sync enqueue below; every early return still counts as success. ---
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Per-event sampling: drop with probability (1 - sample_rate).
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Redact workspace-relative paths/identifiers before the payload leaves the machine.
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.outbox()?.append(&e.session_id, "events", &row)?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
791
792    fn append_hot_event(&self, e: &Event) -> Result<()> {
793        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
794            return Ok(());
795        }
796        let mut slot = self.hot_log.borrow_mut();
797        if slot.is_none() {
798            *slot = Some(HotLog::open(&self.root)?);
799        }
800        if let Some(log) = slot.as_mut() {
801            log.append(e)?;
802        }
803        Ok(())
804    }
805
806    fn append_search_event(&self, e: &Event) {
807        if let Err(err) = self.try_append_search_event(e) {
808            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
809            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
810        }
811    }
812
813    fn try_append_search_event(&self, e: &Event) -> Result<()> {
814        let Some(session) = self.get_session(&e.session_id)? else {
815            return Ok(());
816        };
817        let workspace = PathBuf::from(&session.workspace);
818        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
819        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
820        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
821            return Ok(());
822        };
823        let mut slot = self.search_writer.borrow_mut();
824        if slot.is_none() {
825            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
826        }
827        slot.as_mut().expect("writer").add(&doc)
828    }
829
830    pub fn flush_search(&self) -> Result<()> {
831        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
832            writer.commit()?;
833        }
834        Ok(())
835    }
836
    /// Open a fresh handle to the redb-backed sync outbox under the store root.
    fn outbox(&self) -> Result<Outbox> {
        Outbox::open(&self.root)
    }
840
841    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
842        if projector_legacy_mode() {
843            rebuild_tool_spans_for_session(&self.conn, session_id)?;
844            self.invalidate_span_tree_cache();
845            return Ok(());
846        }
847        let deltas = self
848            .projector
849            .borrow_mut()
850            .flush_session(session_id, now_ms);
851        if self.apply_projector_events(&deltas)? {
852            self.invalidate_span_tree_cache();
853        }
854        Ok(())
855    }
856
    /// Rebuild all projector-derived rows for one session from scratch.
    ///
    /// Called when an event arrives out of order (its seq is not past the last
    /// seen one), making incremental projector state untrustworthy: clear the
    /// session's spans, reset the projector, and re-apply every stored event
    /// in seq order. If the session is already Done, flush to close any spans
    /// still open, using the last stored event's timestamp as "now" (0 when
    /// the session has no events).
    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
        clear_session_spans(&self.conn, session_id)?;
        self.projector.borrow_mut().reset_session(session_id);
        let events = self.list_events_for_session(session_id)?;
        let mut changed = false;
        for event in &events {
            let deltas = self.projector.borrow_mut().apply(event);
            changed |= self.apply_projector_events(&deltas)?;
        }
        if self
            .get_session(session_id)?
            .is_some_and(|session| session.status == SessionStatus::Done)
        {
            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
            let deltas = self
                .projector
                .borrow_mut()
                .flush_session(session_id, now_ms);
            changed |= self.apply_projector_events(&deltas)?;
        }
        // Only invalidate the cache when something was actually written.
        if changed {
            self.invalidate_span_tree_cache();
        }
        Ok(())
    }
882
883    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
884        let mut changed = false;
885        for delta in deltas {
886            match delta {
887                ProjectorEvent::SpanClosed(span, sample) => {
888                    upsert_tool_span_record(&self.conn, span)?;
889                    tracing::debug!(
890                        session_id = %sample.session_id,
891                        span_id = %sample.span_id,
892                        tool = ?sample.tool,
893                        lead_time_ms = ?sample.lead_time_ms,
894                        tokens_in = ?sample.tokens_in,
895                        tokens_out = ?sample.tokens_out,
896                        reasoning_tokens = ?sample.reasoning_tokens,
897                        cost_usd_e6 = ?sample.cost_usd_e6,
898                        paths = ?sample.paths,
899                        "tool span closed"
900                    );
901                    changed = true;
902                }
903                ProjectorEvent::SpanPatched(span) => {
904                    upsert_tool_span_record(&self.conn, span)?;
905                    changed = true;
906                }
907                ProjectorEvent::FileTouched { session, path } => {
908                    self.conn.execute(
909                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
910                        params![session, path],
911                    )?;
912                    changed = true;
913                }
914                ProjectorEvent::SkillUsed { session, skill } => {
915                    self.conn.execute(
916                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
917                        params![session, skill],
918                    )?;
919                    changed = true;
920                }
921                ProjectorEvent::RuleUsed { session, rule } => {
922                    self.conn.execute(
923                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
924                        params![session, rule],
925                    )?;
926                    changed = true;
927                }
928            }
929        }
930        Ok(changed)
931    }
932
933    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
934        let rows = self.outbox()?.list_pending(limit)?;
935        if !rows.is_empty() {
936            return Ok(rows);
937        }
938        let mut stmt = self.conn.prepare(
939            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
940        )?;
941        let rows = stmt.query_map(params![limit as i64], |row| {
942            Ok((
943                row.get::<_, i64>(0)?,
944                row.get::<_, String>(1)?,
945                row.get::<_, String>(2)?,
946            ))
947        })?;
948        let mut out = Vec::new();
949        for r in rows {
950            out.push(r?);
951        }
952        Ok(out)
953    }
954
955    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
956        self.outbox()?.delete_ids(ids)?;
957        for id in ids {
958            self.conn
959                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
960        }
961        Ok(())
962    }
963
964    pub fn replace_outbox_rows(
965        &self,
966        owner_id: &str,
967        kind: &str,
968        payloads: &[String],
969    ) -> Result<()> {
970        self.outbox()?.replace(owner_id, kind, payloads)?;
971        self.conn.execute(
972            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
973            params![owner_id, kind],
974        )?;
975        for payload in payloads {
976            self.conn.execute(
977                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
978                params![owner_id, kind, payload],
979            )?;
980        }
981        Ok(())
982    }
983
984    pub fn outbox_pending_count(&self) -> Result<u64> {
985        let redb = self.outbox()?.pending_count()?;
986        if redb > 0 {
987            return Ok(redb);
988        }
989        let c: i64 =
990            self.conn
991                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
992                    r.get(0)
993                })?;
994        Ok(c as u64)
995    }
996
997    pub fn set_sync_state_ok(&self) -> Result<()> {
998        let now = now_ms().to_string();
999        self.conn.execute(
1000            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
1001             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1002            params![now],
1003        )?;
1004        self.conn.execute(
1005            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
1006             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1007            [],
1008        )?;
1009        self.conn
1010            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
1011        Ok(())
1012    }
1013
1014    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
1015        let prev: i64 = self
1016            .conn
1017            .query_row(
1018                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1019                [],
1020                |r| {
1021                    let s: String = r.get(0)?;
1022                    Ok(s.parse::<i64>().unwrap_or(0))
1023                },
1024            )
1025            .optional()?
1026            .unwrap_or(0);
1027        let next = prev.saturating_add(1);
1028        self.conn.execute(
1029            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
1030             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1031            params![msg],
1032        )?;
1033        self.conn.execute(
1034            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
1035             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1036            params![next.to_string()],
1037        )?;
1038        Ok(())
1039    }
1040
1041    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
1042        let pending_outbox = self.outbox_pending_count()?;
1043        let last_success_ms = self
1044            .conn
1045            .query_row(
1046                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
1047                [],
1048                |r| r.get::<_, String>(0),
1049            )
1050            .optional()?
1051            .and_then(|s| s.parse().ok());
1052        let last_error = self
1053            .conn
1054            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1055                r.get::<_, String>(0)
1056            })
1057            .optional()?;
1058        let consecutive_failures = self
1059            .conn
1060            .query_row(
1061                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1062                [],
1063                |r| r.get::<_, String>(0),
1064            )
1065            .optional()?
1066            .and_then(|s| s.parse().ok())
1067            .unwrap_or(0);
1068        Ok(SyncStatusSnapshot {
1069            pending_outbox,
1070            last_success_ms,
1071            last_error,
1072            consecutive_failures,
1073        })
1074    }
1075
1076    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1077        let row: Option<String> = self
1078            .conn
1079            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1080                r.get::<_, String>(0)
1081            })
1082            .optional()?;
1083        Ok(row.and_then(|s| s.parse().ok()))
1084    }
1085
1086    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1087        self.conn.execute(
1088            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1089             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1090            params![key, v.to_string()],
1091        )?;
1092        Ok(())
1093    }
1094
1095    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
1096    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1097        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1098        let old_ids = old_session_ids(&tx, cutoff_ms)?;
1099        let sessions_to_remove: i64 = tx.query_row(
1100            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1101            params![cutoff_ms],
1102            |r| r.get(0),
1103        )?;
1104        let events_to_remove: i64 = tx.query_row(
1105            "SELECT COUNT(*) FROM events WHERE session_id IN \
1106             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1107            params![cutoff_ms],
1108            |r| r.get(0),
1109        )?;
1110
1111        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1112        tx.execute(
1113            &format!(
1114                "DELETE FROM tool_span_paths WHERE span_id IN \
1115                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1116            ),
1117            params![cutoff_ms],
1118        )?;
1119        tx.execute(
1120            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1121            params![cutoff_ms],
1122        )?;
1123        tx.execute(
1124            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1125            params![cutoff_ms],
1126        )?;
1127        tx.execute(
1128            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1129            params![cutoff_ms],
1130        )?;
1131        tx.execute(
1132            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1133            params![cutoff_ms],
1134        )?;
1135        tx.execute(
1136            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1137            params![cutoff_ms],
1138        )?;
1139        tx.execute(
1140            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1141            params![cutoff_ms],
1142        )?;
1143        tx.execute(
1144            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1145            params![cutoff_ms],
1146        )?;
1147        tx.execute(
1148            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1149            params![cutoff_ms],
1150        )?;
1151        tx.execute(
1152            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1153            params![cutoff_ms],
1154        )?;
1155        tx.execute(
1156            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1157            params![cutoff_ms],
1158        )?;
1159        tx.execute(
1160            "DELETE FROM sessions WHERE started_at_ms < ?1",
1161            params![cutoff_ms],
1162        )?;
1163        tx.commit()?;
1164        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1165            let _ = writer.commit();
1166        }
1167        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1168            tracing::warn!("search prune skipped: {err:#}");
1169            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1170        }
1171        self.invalidate_span_tree_cache();
1172        Ok(PruneStats {
1173            sessions_removed: sessions_to_remove as u64,
1174            events_removed: events_to_remove as u64,
1175        })
1176    }
1177
    /// Reclaim file space after large deletes (exclusive lock; can be slow).
    pub fn vacuum(&self) -> Result<()> {
        // VACUUM rewrites the entire database file; call sparingly.
        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
        Ok(())
    }
1183
1184    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1185        Ok(self
1186            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1187            .rows)
1188    }
1189
1190    pub fn list_sessions_page(
1191        &self,
1192        workspace: &str,
1193        offset: usize,
1194        limit: usize,
1195        filter: SessionFilter,
1196    ) -> Result<SessionPage> {
1197        let (where_sql, args) = session_filter_sql(workspace, &filter);
1198        let total = self.query_session_page_count(&where_sql, &args)?;
1199        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1200        let next = offset.saturating_add(rows.len());
1201        Ok(SessionPage {
1202            rows,
1203            total,
1204            next_offset: (next < total).then_some(next),
1205        })
1206    }
1207
1208    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1209        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1210        let total: i64 = self
1211            .conn
1212            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1213        Ok(total as usize)
1214    }
1215
1216    fn query_session_page_rows(
1217        &self,
1218        where_sql: &str,
1219        args: &[Value],
1220        offset: usize,
1221        limit: usize,
1222    ) -> Result<Vec<SessionRecord>> {
1223        let sql = format!(
1224            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1225        );
1226        let mut values = args.to_vec();
1227        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1228        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1229        let mut stmt = self.conn.prepare(&sql)?;
1230        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1231        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1232    }
1233
1234    pub fn list_sessions_started_after(
1235        &self,
1236        workspace: &str,
1237        after_started_at_ms: u64,
1238    ) -> Result<Vec<SessionRecord>> {
1239        let mut stmt = self.conn.prepare(
1240            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1241                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1242                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1243                    repo_file_count, repo_total_loc
1244             FROM sessions
1245             WHERE workspace = ?1 AND started_at_ms > ?2
1246             ORDER BY started_at_ms DESC, id ASC",
1247        )?;
1248        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1249        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1250    }
1251
1252    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1253        if ids.is_empty() {
1254            return Ok(Vec::new());
1255        }
1256        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1257        let sql =
1258            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1259        let mut stmt = self.conn.prepare(&sql)?;
1260        let params: Vec<&dyn rusqlite::ToSql> =
1261            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1262        let rows = stmt.query_map(params.as_slice(), |r| {
1263            let status: String = r.get(1)?;
1264            Ok(SessionStatusRow {
1265                id: r.get(0)?,
1266                status: status_from_str(&status),
1267                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1268            })
1269        })?;
1270        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1271    }
1272
1273    fn running_session_ids(&self) -> Result<Vec<String>> {
1274        let mut stmt = self
1275            .conn
1276            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1277        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1278        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1279    }
1280
1281    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1282        let session_count: i64 = self.conn.query_row(
1283            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1284            params![workspace],
1285            |r| r.get(0),
1286        )?;
1287
1288        let total_cost: i64 = self.conn.query_row(
1289            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1290             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1291            params![workspace],
1292            |r| r.get(0),
1293        )?;
1294
1295        let mut stmt = self.conn.prepare(
1296            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1297        )?;
1298        let by_agent: Vec<(String, u64)> = stmt
1299            .query_map(params![workspace], |r| {
1300                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1301            })?
1302            .filter_map(|r| r.ok())
1303            .collect();
1304
1305        let mut stmt = self.conn.prepare(
1306            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1307        )?;
1308        let by_model: Vec<(String, u64)> = stmt
1309            .query_map(params![workspace], |r| {
1310                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1311            })?
1312            .filter_map(|r| r.ok())
1313            .collect();
1314
1315        let mut stmt = self.conn.prepare(
1316            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1317             WHERE s.workspace = ?1 AND tool IS NOT NULL
1318             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1319        )?;
1320        let top_tools: Vec<(String, u64)> = stmt
1321            .query_map(params![workspace], |r| {
1322                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1323            })?
1324            .filter_map(|r| r.ok())
1325            .collect();
1326
1327        Ok(SummaryStats {
1328            session_count: session_count as u64,
1329            total_cost_usd_e6: total_cost,
1330            by_agent,
1331            by_model,
1332            top_tools,
1333        })
1334    }
1335
    /// All events for one session in seq order (an unbounded page from seq 0).
    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
        self.list_events_page(session_id, 0, i64::MAX as usize)
    }
1339
1340    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1341        let mut stmt = self.conn.prepare(
1342            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1343                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1344                    stop_reason, latency_ms, ttft_ms, retry_count,
1345                    context_used_tokens, context_max_tokens,
1346                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1347             FROM events WHERE session_id = ?1 AND seq = ?2",
1348        )?;
1349        stmt.query_row(params![session_id, seq as i64], event_row)
1350            .optional()
1351            .map_err(Into::into)
1352    }
1353
1354    pub fn search_tool_events(
1355        &self,
1356        workspace: &str,
1357        tool: &str,
1358        since_ms: Option<u64>,
1359        agent: Option<&str>,
1360        limit: usize,
1361    ) -> Result<Vec<(String, Event)>> {
1362        let mut stmt = self.conn.prepare(
1363            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1364                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1365                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1366                    e.context_used_tokens, e.context_max_tokens,
1367                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
1368                    s.agent
1369             FROM events e JOIN sessions s ON s.id = e.session_id
1370             WHERE e.tool = ?2
1371               AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
1372               AND (?3 IS NULL OR e.ts_ms >= ?3)
1373               AND (?4 IS NULL OR s.agent = ?4)
1374             ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
1375             LIMIT ?5",
1376        )?;
1377        let since = since_ms.map(|v| v as i64);
1378        let rows = stmt.query_map(
1379            params![workspace, tool, since, agent, limit as i64],
1380            search_tool_event_row,
1381        )?;
1382        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1383    }
1384
1385    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1386        let mut out = Vec::new();
1387        for session in self.list_sessions(workspace)? {
1388            for event in self.list_events_for_session(&session.id)? {
1389                out.push((session.clone(), event));
1390            }
1391        }
1392        out.sort_by(|a, b| {
1393            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1394        });
1395        Ok(out)
1396    }
1397
1398    pub fn list_events_page(
1399        &self,
1400        session_id: &str,
1401        after_seq: u64,
1402        limit: usize,
1403    ) -> Result<Vec<Event>> {
1404        let mut stmt = self.conn.prepare(
1405            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1406                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1407                    stop_reason, latency_ms, ttft_ms, retry_count,
1408                    context_used_tokens, context_max_tokens,
1409                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1410             FROM events
1411             WHERE session_id = ?1 AND seq >= ?2
1412             ORDER BY seq ASC LIMIT ?3",
1413        )?;
1414        let rows = stmt.query_map(
1415            params![
1416                session_id,
1417                after_seq as i64,
1418                limit.min(i64::MAX as usize) as i64
1419            ],
1420            event_row,
1421        )?;
1422        let mut events = Vec::new();
1423        for row in rows {
1424            events.push(row?);
1425        }
1426        Ok(events)
1427    }
1428
1429    /// Update only status for existing session.
1430    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1431        self.conn.execute(
1432            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1433            params![format!("{:?}", status), id],
1434        )?;
1435        Ok(())
1436    }
1437
1438    /// Workspace activity dashboard — feeds `cmd_insights`.
1439    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1440        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1441        Ok(InsightsStats {
1442            total_sessions: count_q(
1443                &self.conn,
1444                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1445                workspace,
1446            )?,
1447            running_sessions: count_q(
1448                &self.conn,
1449                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1450                workspace,
1451            )?,
1452            total_events: count_q(
1453                &self.conn,
1454                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1455                workspace,
1456            )?,
1457            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1458            recent: recent_sessions_3(&self.conn, workspace)?,
1459            top_tools: top_tools_5(&self.conn, workspace)?,
1460            total_cost_usd_e6,
1461            sessions_with_cost,
1462        })
1463    }
1464
1465    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1466    pub fn retro_events_in_window(
1467        &self,
1468        workspace: &str,
1469        start_ms: u64,
1470        end_ms: u64,
1471    ) -> Result<Vec<(SessionRecord, Event)>> {
1472        let mut stmt = self.conn.prepare(
1473            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1474                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1475                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1476                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1477                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1478                    s.repo_file_count, s.repo_total_loc,
1479                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1480                    e.context_used_tokens, e.context_max_tokens,
1481                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1482             FROM events e
1483             JOIN sessions s ON s.id = e.session_id
1484             WHERE s.workspace = ?1
1485               AND (
1486                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1487                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1488               )
1489             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1490        )?;
1491        let rows = stmt.query_map(
1492            params![
1493                workspace,
1494                start_ms as i64,
1495                end_ms as i64,
1496                SYNTHETIC_TS_CEILING_MS,
1497            ],
1498            |row| {
1499                let payload_str: String = row.get(12)?;
1500                let status_str: String = row.get(19)?;
1501                Ok((
1502                    SessionRecord {
1503                        id: row.get(13)?,
1504                        agent: row.get(14)?,
1505                        model: row.get(15)?,
1506                        workspace: row.get(16)?,
1507                        started_at_ms: row.get::<_, i64>(17)? as u64,
1508                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1509                        status: status_from_str(&status_str),
1510                        trace_path: row.get(20)?,
1511                        start_commit: row.get(21)?,
1512                        end_commit: row.get(22)?,
1513                        branch: row.get(23)?,
1514                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1515                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1516                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1517                        prompt_fingerprint: row.get(27)?,
1518                        parent_session_id: row.get(28)?,
1519                        agent_version: row.get(29)?,
1520                        os: row.get(30)?,
1521                        arch: row.get(31)?,
1522                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1523                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1524                    },
1525                    Event {
1526                        session_id: row.get(0)?,
1527                        seq: row.get::<_, i64>(1)? as u64,
1528                        ts_ms: row.get::<_, i64>(2)? as u64,
1529                        ts_exact: row.get::<_, i64>(3)? != 0,
1530                        kind: kind_from_str(&row.get::<_, String>(4)?),
1531                        source: source_from_str(&row.get::<_, String>(5)?),
1532                        tool: row.get(6)?,
1533                        tool_call_id: row.get(7)?,
1534                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1535                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1536                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1537                        cost_usd_e6: row.get(11)?,
1538                        payload: serde_json::from_str(&payload_str)
1539                            .unwrap_or(serde_json::Value::Null),
1540                        stop_reason: row.get(34)?,
1541                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1542                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1543                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1544                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1545                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1546                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1547                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1548                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1549                    },
1550                ))
1551            },
1552        )?;
1553
1554        let mut out = Vec::new();
1555        for r in rows {
1556            out.push(r?);
1557        }
1558        Ok(out)
1559    }
1560
    /// Per-session metric values for sessions active in `[start_ms, end_ms]`.
    ///
    /// Each `Metric` variant maps to one SQL aggregate over `events`, grouped by
    /// session; the returned `f64` is the aliased `value` column (index 21,
    /// immediately after the 21 session columns in `session_cols`). Window
    /// semantics match the other `*_in_window` queries here: an event counts if
    /// its `ts_ms` lies in range, or — for synthetic timestamps below
    /// `SYNTHETIC_TS_CEILING_MS` — if the session's `started_at_ms` lies in range.
    ///
    /// Unlike the `filter_map(.ok())` readers in this file, row-mapping errors
    /// here are propagated to the caller via the final `collect`.
    pub fn experiment_metric_values_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
        metric: crate::experiment::types::Metric,
    ) -> Result<Vec<(SessionRecord, f64)>> {
        use crate::experiment::types::Metric;
        // Same column list/order as SESSION_SELECT, aliased to `s`, so the rows
        // can be decoded by `session_row`.
        let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
            s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
            s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
            s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
        // Shared activity-window predicate (?1 workspace, ?2/?3 window, ?4 synthetic ceiling).
        let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
            OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
        let sql = match metric {
            // Total in+out+reasoning tokens per session (missing counts treated as 0).
            Metric::TokensPerSession => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Summed micro-dollars converted to USD.
            Metric::CostPerSession => format!(
                "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Binary per-session success: 0.0 if any Error event in the window, else 1.0.
            Metric::SuccessRate => format!(
                "SELECT {session_cols},
                    CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Wall-clock minutes; only ended sessions, and only those with window activity.
            Metric::DurationMinutes => format!(
                "SELECT {session_cols},
                    (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
                 FROM sessions s
                 WHERE s.workspace = ?1
                   AND s.ended_at_ms IS NOT NULL
                   AND EXISTS (
                     SELECT 1 FROM events e
                     WHERE e.session_id = s.id
                       AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
                   )"
            ),
            // Distinct touched paths; LEFT JOIN so sessions with none still count as 0.
            Metric::FilesPerSession => format!(
                "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
                 FROM sessions s
                 JOIN events e ON e.session_id = s.id
                 LEFT JOIN files_touched ft ON ft.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // 1 - errors/messages, with errors capped at the message count so the
            // value never drops below 0; sessions with no Message rows are excluded.
            Metric::SuccessRateByPrompt => format!(
                "SELECT {session_cols},
                    1.0 - (MIN(
                      SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
                      SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
                    ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // USD cost divided by Message-event count; no-Message sessions excluded.
            Metric::CostByPrompt => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
                    SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // Immediately repeated tool calls: LAG() pairs each ToolCall with the
            // previous one in the same session; count the equal pairs.
            Metric::ToolLoops => format!(
                "WITH calls AS (
                   SELECT e.session_id, e.tool,
                     LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
                   FROM events e JOIN sessions s ON s.id = e.session_id
                   WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
                 )
                 SELECT {session_cols},
                    SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN calls ON calls.session_id = s.id
                 GROUP BY s.id"
            ),
        };
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                SYNTHETIC_TS_CEILING_MS,
            ],
            // Column 21 is the aggregate `value`; SQLite INTEGER sums coerce to f64.
            |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
        )?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
1662
1663    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1664    pub fn files_touched_in_window(
1665        &self,
1666        workspace: &str,
1667        start_ms: u64,
1668        end_ms: u64,
1669    ) -> Result<Vec<(String, String)>> {
1670        let mut stmt = self.conn.prepare(
1671            "SELECT DISTINCT ft.session_id, ft.path
1672             FROM files_touched ft
1673             JOIN sessions s ON s.id = ft.session_id
1674             WHERE s.workspace = ?1
1675               AND EXISTS (
1676                 SELECT 1 FROM events e
1677                 JOIN sessions ss ON ss.id = e.session_id
1678                 WHERE e.session_id = ft.session_id
1679                   AND (
1680                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1681                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1682                   )
1683               )
1684             ORDER BY ft.session_id, ft.path",
1685        )?;
1686        let out: Vec<(String, String)> = stmt
1687            .query_map(
1688                params![
1689                    workspace,
1690                    start_ms as i64,
1691                    end_ms as i64,
1692                    SYNTHETIC_TS_CEILING_MS,
1693                ],
1694                |r| Ok((r.get(0)?, r.get(1)?)),
1695            )?
1696            .filter_map(|r| r.ok())
1697            .collect();
1698        Ok(out)
1699    }
1700
1701    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1702    /// (any session with an indexed skill row; join events optional — use row existence).
1703    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1704        let mut stmt = self.conn.prepare(
1705            "SELECT DISTINCT su.skill
1706             FROM skills_used su
1707             JOIN sessions s ON s.id = su.session_id
1708             WHERE s.workspace = ?1
1709               AND EXISTS (
1710                 SELECT 1 FROM events e
1711                 JOIN sessions ss ON ss.id = e.session_id
1712                 WHERE e.session_id = su.session_id
1713                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1714               )
1715             ORDER BY su.skill",
1716        )?;
1717        let out: Vec<String> = stmt
1718            .query_map(
1719                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1720                |r| r.get::<_, String>(0),
1721            )?
1722            .filter_map(|r| r.ok())
1723            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1724            .collect();
1725        Ok(out)
1726    }
1727
1728    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1729    pub fn skills_used_in_window(
1730        &self,
1731        workspace: &str,
1732        start_ms: u64,
1733        end_ms: u64,
1734    ) -> Result<Vec<(String, String)>> {
1735        let mut stmt = self.conn.prepare(
1736            "SELECT DISTINCT su.session_id, su.skill
1737             FROM skills_used su
1738             JOIN sessions s ON s.id = su.session_id
1739             WHERE s.workspace = ?1
1740               AND EXISTS (
1741                 SELECT 1 FROM events e
1742                 JOIN sessions ss ON ss.id = e.session_id
1743                 WHERE e.session_id = su.session_id
1744                   AND (
1745                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1746                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1747                   )
1748               )
1749             ORDER BY su.session_id, su.skill",
1750        )?;
1751        let out: Vec<(String, String)> = stmt
1752            .query_map(
1753                params![
1754                    workspace,
1755                    start_ms as i64,
1756                    end_ms as i64,
1757                    SYNTHETIC_TS_CEILING_MS,
1758                ],
1759                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1760            )?
1761            .filter_map(|r| r.ok())
1762            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1763            .collect();
1764        Ok(out)
1765    }
1766
1767    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1768    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1769        let mut stmt = self.conn.prepare(
1770            "SELECT DISTINCT ru.rule
1771             FROM rules_used ru
1772             JOIN sessions s ON s.id = ru.session_id
1773             WHERE s.workspace = ?1
1774               AND EXISTS (
1775                 SELECT 1 FROM events e
1776                 JOIN sessions ss ON ss.id = e.session_id
1777                 WHERE e.session_id = ru.session_id
1778                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1779               )
1780             ORDER BY ru.rule",
1781        )?;
1782        let out: Vec<String> = stmt
1783            .query_map(
1784                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1785                |r| r.get::<_, String>(0),
1786            )?
1787            .filter_map(|r| r.ok())
1788            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1789            .collect();
1790        Ok(out)
1791    }
1792
1793    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1794    pub fn rules_used_in_window(
1795        &self,
1796        workspace: &str,
1797        start_ms: u64,
1798        end_ms: u64,
1799    ) -> Result<Vec<(String, String)>> {
1800        let mut stmt = self.conn.prepare(
1801            "SELECT DISTINCT ru.session_id, ru.rule
1802             FROM rules_used ru
1803             JOIN sessions s ON s.id = ru.session_id
1804             WHERE s.workspace = ?1
1805               AND EXISTS (
1806                 SELECT 1 FROM events e
1807                 JOIN sessions ss ON ss.id = e.session_id
1808                 WHERE e.session_id = ru.session_id
1809                   AND (
1810                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1811                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1812                   )
1813               )
1814             ORDER BY ru.session_id, ru.rule",
1815        )?;
1816        let out: Vec<(String, String)> = stmt
1817            .query_map(
1818                params![
1819                    workspace,
1820                    start_ms as i64,
1821                    end_ms as i64,
1822                    SYNTHETIC_TS_CEILING_MS,
1823                ],
1824                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1825            )?
1826            .filter_map(|r| r.ok())
1827            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1828            .collect();
1829        Ok(out)
1830    }
1831
1832    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1833    pub fn sessions_active_in_window(
1834        &self,
1835        workspace: &str,
1836        start_ms: u64,
1837        end_ms: u64,
1838    ) -> Result<HashSet<String>> {
1839        let mut stmt = self.conn.prepare(
1840            "SELECT DISTINCT s.id
1841             FROM sessions s
1842             WHERE s.workspace = ?1
1843               AND EXISTS (
1844                 SELECT 1 FROM events e
1845                 WHERE e.session_id = s.id
1846                   AND (
1847                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1848                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1849                   )
1850               )",
1851        )?;
1852        let out: HashSet<String> = stmt
1853            .query_map(
1854                params![
1855                    workspace,
1856                    start_ms as i64,
1857                    end_ms as i64,
1858                    SYNTHETIC_TS_CEILING_MS,
1859                ],
1860                |r| r.get(0),
1861            )?
1862            .filter_map(|r| r.ok())
1863            .collect();
1864        Ok(out)
1865    }
1866
1867    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1868    pub fn session_costs_usd_e6_in_window(
1869        &self,
1870        workspace: &str,
1871        start_ms: u64,
1872        end_ms: u64,
1873    ) -> Result<HashMap<String, i64>> {
1874        let mut stmt = self.conn.prepare(
1875            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1876             FROM events e
1877             JOIN sessions s ON s.id = e.session_id
1878             WHERE s.workspace = ?1
1879               AND (
1880                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1881                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1882               )
1883             GROUP BY e.session_id",
1884        )?;
1885        let rows: Vec<(String, i64)> = stmt
1886            .query_map(
1887                params![
1888                    workspace,
1889                    start_ms as i64,
1890                    end_ms as i64,
1891                    SYNTHETIC_TS_CEILING_MS,
1892                ],
1893                |r| Ok((r.get(0)?, r.get(1)?)),
1894            )?
1895            .filter_map(|r| r.ok())
1896            .collect();
1897        Ok(rows.into_iter().collect())
1898    }
1899
1900    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1901    pub fn guidance_report(
1902        &self,
1903        workspace: &str,
1904        window_start_ms: u64,
1905        window_end_ms: u64,
1906        skill_slugs_on_disk: &HashSet<String>,
1907        rule_slugs_on_disk: &HashSet<String>,
1908    ) -> Result<GuidanceReport> {
1909        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1910        let denom = active.len() as u64;
1911        let costs =
1912            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1913
1914        let workspace_avg_cost_per_session_usd = if denom > 0 {
1915            let total_e6: i64 = active
1916                .iter()
1917                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1918                .sum();
1919            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1920        } else {
1921            None
1922        };
1923
1924        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1925        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1926            skill_sessions.entry(skill).or_default().insert(sid);
1927        }
1928        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1929        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1930            rule_sessions.entry(rule).or_default().insert(sid);
1931        }
1932
1933        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1934
1935        let mut push_row =
1936            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1937                let sessions = sids.len() as u64;
1938                let sessions_pct = if denom > 0 {
1939                    sessions as f64 * 100.0 / denom as f64
1940                } else {
1941                    0.0
1942                };
1943                let total_cost_usd_e6: i64 = sids
1944                    .iter()
1945                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1946                    .sum();
1947                let avg_cost_per_session_usd = if sessions > 0 {
1948                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1949                } else {
1950                    None
1951                };
1952                let vs_workspace_avg_cost_per_session_usd =
1953                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1954                        (Some(avg), Some(w)) => Some(avg - w),
1955                        _ => None,
1956                    };
1957                rows.push(GuidancePerfRow {
1958                    kind,
1959                    id,
1960                    sessions,
1961                    sessions_pct,
1962                    total_cost_usd_e6,
1963                    avg_cost_per_session_usd,
1964                    vs_workspace_avg_cost_per_session_usd,
1965                    on_disk,
1966                });
1967            };
1968
1969        let mut seen_skills: HashSet<String> = HashSet::new();
1970        for (id, sids) in &skill_sessions {
1971            seen_skills.insert(id.clone());
1972            push_row(
1973                GuidanceKind::Skill,
1974                id.clone(),
1975                sids,
1976                skill_slugs_on_disk.contains(id),
1977            );
1978        }
1979        for slug in skill_slugs_on_disk {
1980            if seen_skills.contains(slug) {
1981                continue;
1982            }
1983            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1984        }
1985
1986        let mut seen_rules: HashSet<String> = HashSet::new();
1987        for (id, sids) in &rule_sessions {
1988            seen_rules.insert(id.clone());
1989            push_row(
1990                GuidanceKind::Rule,
1991                id.clone(),
1992                sids,
1993                rule_slugs_on_disk.contains(id),
1994            );
1995        }
1996        for slug in rule_slugs_on_disk {
1997            if seen_rules.contains(slug) {
1998                continue;
1999            }
2000            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
2001        }
2002
2003        rows.sort_by(|a, b| {
2004            b.sessions
2005                .cmp(&a.sessions)
2006                .then_with(|| a.kind.cmp(&b.kind))
2007                .then_with(|| a.id.cmp(&b.id))
2008        });
2009
2010        Ok(GuidanceReport {
2011            workspace: workspace.to_string(),
2012            window_start_ms,
2013            window_end_ms,
2014            sessions_in_window: denom,
2015            workspace_avg_cost_per_session_usd,
2016            rows,
2017        })
2018    }
2019
2020    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
2021        let mut stmt = self.conn.prepare(
2022            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
2023                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
2024                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
2025                    repo_file_count, repo_total_loc
2026             FROM sessions WHERE id = ?1",
2027        )?;
2028        let mut rows = stmt.query_map(params![id], |row| {
2029            Ok((
2030                row.get::<_, String>(0)?,
2031                row.get::<_, String>(1)?,
2032                row.get::<_, Option<String>>(2)?,
2033                row.get::<_, String>(3)?,
2034                row.get::<_, i64>(4)?,
2035                row.get::<_, Option<i64>>(5)?,
2036                row.get::<_, String>(6)?,
2037                row.get::<_, String>(7)?,
2038                row.get::<_, Option<String>>(8)?,
2039                row.get::<_, Option<String>>(9)?,
2040                row.get::<_, Option<String>>(10)?,
2041                row.get::<_, Option<i64>>(11)?,
2042                row.get::<_, Option<i64>>(12)?,
2043                row.get::<_, String>(13)?,
2044                row.get::<_, Option<String>>(14)?,
2045                row.get::<_, Option<String>>(15)?,
2046                row.get::<_, Option<String>>(16)?,
2047                row.get::<_, Option<String>>(17)?,
2048                row.get::<_, Option<String>>(18)?,
2049                row.get::<_, Option<i64>>(19)?,
2050                row.get::<_, Option<i64>>(20)?,
2051            ))
2052        })?;
2053
2054        if let Some(row) = rows.next() {
2055            let (
2056                id,
2057                agent,
2058                model,
2059                workspace,
2060                started,
2061                ended,
2062                status_str,
2063                trace,
2064                start_commit,
2065                end_commit,
2066                branch,
2067                dirty_start,
2068                dirty_end,
2069                source,
2070                prompt_fingerprint,
2071                parent_session_id,
2072                agent_version,
2073                os,
2074                arch,
2075                repo_file_count,
2076                repo_total_loc,
2077            ) = row?;
2078            Ok(Some(SessionRecord {
2079                id,
2080                agent,
2081                model,
2082                workspace,
2083                started_at_ms: started as u64,
2084                ended_at_ms: ended.map(|v| v as u64),
2085                status: status_from_str(&status_str),
2086                trace_path: trace,
2087                start_commit,
2088                end_commit,
2089                branch,
2090                dirty_start: dirty_start.map(i64_to_bool),
2091                dirty_end: dirty_end.map(i64_to_bool),
2092                repo_binding_source: empty_to_none(source),
2093                prompt_fingerprint,
2094                parent_session_id,
2095                agent_version,
2096                os,
2097                arch,
2098                repo_file_count: repo_file_count.map(|v| v as u32),
2099                repo_total_loc: repo_total_loc.map(|v| v as u64),
2100            }))
2101        } else {
2102            Ok(None)
2103        }
2104    }
2105
2106    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2107        let mut stmt = self.conn.prepare(
2108            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2109                    indexed_at_ms, dirty, graph_path
2110             FROM repo_snapshots WHERE workspace = ?1
2111             ORDER BY indexed_at_ms DESC LIMIT 1",
2112        )?;
2113        let mut rows = stmt.query_map(params![workspace], |row| {
2114            Ok(RepoSnapshotRecord {
2115                id: row.get(0)?,
2116                workspace: row.get(1)?,
2117                head_commit: row.get(2)?,
2118                dirty_fingerprint: row.get(3)?,
2119                analyzer_version: row.get(4)?,
2120                indexed_at_ms: row.get::<_, i64>(5)? as u64,
2121                dirty: row.get::<_, i64>(6)? != 0,
2122                graph_path: row.get(7)?,
2123            })
2124        })?;
2125        Ok(rows.next().transpose()?)
2126    }
2127
2128    pub fn save_repo_snapshot(
2129        &self,
2130        snapshot: &RepoSnapshotRecord,
2131        facts: &[FileFact],
2132        edges: &[RepoEdge],
2133    ) -> Result<()> {
2134        self.conn.execute(
2135            "INSERT INTO repo_snapshots (
2136                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2137                indexed_at_ms, dirty, graph_path
2138             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2139             ON CONFLICT(id) DO UPDATE SET
2140                workspace=excluded.workspace,
2141                head_commit=excluded.head_commit,
2142                dirty_fingerprint=excluded.dirty_fingerprint,
2143                analyzer_version=excluded.analyzer_version,
2144                indexed_at_ms=excluded.indexed_at_ms,
2145                dirty=excluded.dirty,
2146                graph_path=excluded.graph_path",
2147            params![
2148                snapshot.id,
2149                snapshot.workspace,
2150                snapshot.head_commit,
2151                snapshot.dirty_fingerprint,
2152                snapshot.analyzer_version,
2153                snapshot.indexed_at_ms as i64,
2154                bool_to_i64(snapshot.dirty),
2155                snapshot.graph_path,
2156            ],
2157        )?;
2158        self.conn.execute(
2159            "DELETE FROM file_facts WHERE snapshot_id = ?1",
2160            params![snapshot.id],
2161        )?;
2162        self.conn.execute(
2163            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2164            params![snapshot.id],
2165        )?;
2166        for fact in facts {
2167            self.conn.execute(
2168                "INSERT INTO file_facts (
2169                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2170                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2171                    churn_30d, churn_90d, authors_90d, last_changed_ms
2172                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2173                params![
2174                    fact.snapshot_id,
2175                    fact.path,
2176                    fact.language,
2177                    fact.bytes as i64,
2178                    fact.loc as i64,
2179                    fact.sloc as i64,
2180                    fact.complexity_total as i64,
2181                    fact.max_fn_complexity as i64,
2182                    fact.symbol_count as i64,
2183                    fact.import_count as i64,
2184                    fact.fan_in as i64,
2185                    fact.fan_out as i64,
2186                    fact.churn_30d as i64,
2187                    fact.churn_90d as i64,
2188                    fact.authors_90d as i64,
2189                    fact.last_changed_ms.map(|v| v as i64),
2190                ],
2191            )?;
2192        }
2193        for edge in edges {
2194            self.conn.execute(
2195                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2196                 VALUES (?1, ?2, ?3, ?4, ?5)
2197                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2198                 DO UPDATE SET weight = weight + excluded.weight",
2199                params![
2200                    snapshot.id,
2201                    edge.from_path,
2202                    edge.to_path,
2203                    edge.kind,
2204                    edge.weight as i64,
2205                ],
2206            )?;
2207        }
2208        Ok(())
2209    }
2210
2211    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2212        let mut stmt = self.conn.prepare(
2213            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2214                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2215                    churn_30d, churn_90d, authors_90d, last_changed_ms
2216             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2217        )?;
2218        let rows = stmt.query_map(params![snapshot_id], |row| {
2219            Ok(FileFact {
2220                snapshot_id: row.get(0)?,
2221                path: row.get(1)?,
2222                language: row.get(2)?,
2223                bytes: row.get::<_, i64>(3)? as u64,
2224                loc: row.get::<_, i64>(4)? as u32,
2225                sloc: row.get::<_, i64>(5)? as u32,
2226                complexity_total: row.get::<_, i64>(6)? as u32,
2227                max_fn_complexity: row.get::<_, i64>(7)? as u32,
2228                symbol_count: row.get::<_, i64>(8)? as u32,
2229                import_count: row.get::<_, i64>(9)? as u32,
2230                fan_in: row.get::<_, i64>(10)? as u32,
2231                fan_out: row.get::<_, i64>(11)? as u32,
2232                churn_30d: row.get::<_, i64>(12)? as u32,
2233                churn_90d: row.get::<_, i64>(13)? as u32,
2234                authors_90d: row.get::<_, i64>(14)? as u32,
2235                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2236            })
2237        })?;
2238        Ok(rows.filter_map(|row| row.ok()).collect())
2239    }
2240
2241    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2242        let mut stmt = self.conn.prepare(
2243            "SELECT from_id, to_id, kind, weight
2244             FROM repo_edges WHERE snapshot_id = ?1
2245             ORDER BY kind, from_id, to_id",
2246        )?;
2247        let rows = stmt.query_map(params![snapshot_id], |row| {
2248            Ok(RepoEdge {
2249                from_path: row.get(0)?,
2250                to_path: row.get(1)?,
2251                kind: row.get(2)?,
2252                weight: row.get::<_, i64>(3)? as u32,
2253            })
2254        })?;
2255        Ok(rows.filter_map(|row| row.ok()).collect())
2256    }
2257
    /// Top files ranked by `churn_30d * complexity_total` ("hot" = changed often and complex).
    pub fn hottest_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * complexity_total")
    }
2261
    /// Top files ranked by 30-day churn alone.
    pub fn most_changed_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "churn_30d")
    }
2265
    /// Top files ranked by total complexity alone.
    pub fn most_complex_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "complexity_total")
    }
2269
    /// Top files ranked by `churn_30d * authors_90d * complexity_total` (churny,
    /// many-authored, complex files — the classic risk product).
    pub fn highest_risk_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * authors_90d * complexity_total")
    }
2273
    /// Shared ranking query: top 10 `file_facts` rows for `snapshot_id`, ordered by
    /// the SQL expression `value_sql` descending (path ascending as tiebreaker).
    ///
    /// `value_sql` is interpolated into the statement with `format!`, not bound as a
    /// parameter — it MUST be a trusted, compile-time expression. All callers in this
    /// file pass string literals; never pass user-controlled input here.
    fn ranked_files_for_snapshot(
        &self,
        snapshot_id: &str,
        value_sql: &str,
    ) -> Result<Vec<RankedFile>> {
        let sql = format!(
            "SELECT path, {value_sql}, complexity_total, churn_30d
             FROM file_facts WHERE snapshot_id = ?1
             ORDER BY {value_sql} DESC, path ASC LIMIT 10"
        );
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt.query_map(params![snapshot_id], ranked_file_row)?;
        // Row decode errors are propagated (unlike the skip-on-error list queries).
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
2288
2289    pub fn pain_hotspots_for_snapshot(
2290        &self,
2291        snapshot_id: &str,
2292        workspace: &str,
2293        start_ms: u64,
2294        end_ms: u64,
2295    ) -> Result<Vec<RankedFile>> {
2296        let mut stmt = self.conn.prepare(PAIN_HOTSPOTS_SQL)?;
2297        let rows = stmt.query_map(
2298            params![snapshot_id, workspace, start_ms as i64, end_ms as i64],
2299            ranked_file_row,
2300        )?;
2301        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2302    }
2303
2304    pub fn tool_rank_rows_in_window(
2305        &self,
2306        workspace: &str,
2307        start_ms: u64,
2308        end_ms: u64,
2309    ) -> Result<Vec<RankedTool>> {
2310        let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
2311        let rows = stmt.query_map(
2312            params![workspace, start_ms as i64, end_ms as i64],
2313            ranked_tool_row,
2314        )?;
2315        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2316    }
2317
    /// Tool spans for a workspace whose timestamp falls in `[start_ms, end_ms]`,
    /// newest first.
    ///
    /// The UNION's first arm matches spans on `started_at_ms`; the second arm
    /// catches spans with no start time (`started_at_ms IS NULL`) by matching on
    /// `ended_at_ms` instead, so status-only rows are not lost. Rows that fail to
    /// decode are skipped.
    pub fn tool_spans_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<ToolSpanView>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                    reasoning_tokens, cost_usd_e6, paths_json,
                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
             FROM (
                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                        ts.started_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms >= ?2
                   AND ts.started_at_ms <= ?3
                 UNION ALL
                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                        ts.ended_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms IS NULL
                   AND ts.ended_at_ms >= ?2
                   AND ts.ended_at_ms <= ?3
             )
             ORDER BY sort_ms DESC",
        )?;
        // Missing span ids become "" and missing tool names become "unknown"
        // so the view type never carries Options for those two fields.
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
            let paths_json: String = row.get(8)?;
            Ok(ToolSpanView {
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                // Unparsable paths_json degrades to an empty path list.
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
2376
    /// Hierarchical span tree for one session, with a single-entry cache.
    ///
    /// The cache is validated against the session's highest event `seq`: if the
    /// cached entry is for the same session and the event log has not grown, the
    /// cached nodes are returned (cloned) without re-querying `tool_spans`.
    pub fn session_span_tree(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
        let last_event_seq = self.last_event_seq_for_session(session_id)?;
        // Cache hit: same session and no new events since the cached build.
        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
            && entry.session_id == session_id
            && entry.last_event_seq == last_event_seq
        {
            return Ok(entry.nodes.clone());
        }
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                    reasoning_tokens, cost_usd_e6, paths_json,
                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
             FROM tool_spans
             WHERE session_id = ?1
             ORDER BY depth ASC, started_at_ms ASC",
        )?;
        let rows = stmt.query_map(params![session_id], |row| {
            let paths_json: String = row.get(8)?;
            Ok(crate::metrics::types::ToolSpanView {
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        // Decode failures are skipped; the tree is built from the rows that parse.
        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
        let nodes = crate::store::span_tree::build_tree(spans);
        // Single-entry cache: the new build replaces whatever was cached before.
        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
            session_id: session_id.to_string(),
            last_event_seq,
            nodes: nodes.clone(),
        });
        Ok(nodes)
    }
2425
2426    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2427        let seq = self
2428            .conn
2429            .query_row(
2430                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2431                params![session_id],
2432                |r| r.get::<_, Option<i64>>(0),
2433            )?
2434            .map(|v| v as u64);
2435        Ok(seq)
2436    }
2437
    /// Sync-shaped tool spans whose session falls in `[start_ms, end_ms]`. Mirrors
    /// `retro_events_in_window` for the spans table so `kaizen telemetry push` can ship
    /// `IngestExportBatch::ToolSpans` next to the events batch. Window matches on
    /// `started_at_ms` first, falling back to `ended_at_ms` for spans that never started a
    /// timer (status-only rows). Workspace filter joins through `sessions.workspace`.
    pub fn tool_spans_sync_rows_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<ToolSpanSyncRow>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
                    lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
             FROM (
                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
                        ts.started_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms IS NOT NULL
                   AND ts.started_at_ms >= ?2
                   AND ts.started_at_ms <= ?3
                 UNION ALL
                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
                        ts.ended_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms IS NULL
                   AND ts.ended_at_ms IS NOT NULL
                   AND ts.ended_at_ms >= ?2
                   AND ts.ended_at_ms <= ?3
             )
             ORDER BY sort_ms ASC, span_id ASC",
        )?;
        // Column 12 is paths_json in both UNION arms; everything else maps 1:1
        // onto the sync row with the usual i64 -> u64/u32 widenings.
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
            let paths_json: String = row.get(12)?;
            Ok(ToolSpanSyncRow {
                span_id: row.get(0)?,
                session_id: row.get(1)?,
                tool: row.get(2)?,
                tool_call_id: row.get(3)?,
                status: row.get(4)?,
                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
                cost_usd_e6: row.get(11)?,
                // Unparsable paths_json degrades to an empty path list.
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
            })
        })?;
        // Undecodable rows are dropped rather than aborting the export batch.
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
2498
2499    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2500        let mut stmt = self.conn.prepare(
2501            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2502                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2503             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2504        )?;
2505        let rows = stmt.query_map(params![session_id], |row| {
2506            let paths_json: String = row.get(12)?;
2507            Ok(ToolSpanSyncRow {
2508                span_id: row.get(0)?,
2509                session_id: row.get(1)?,
2510                tool: row.get(2)?,
2511                tool_call_id: row.get(3)?,
2512                status: row.get(4)?,
2513                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2514                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2515                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2516                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2517                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2518                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2519                cost_usd_e6: row.get(11)?,
2520                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2521            })
2522        })?;
2523        Ok(rows.filter_map(|row| row.ok()).collect())
2524    }
2525
2526    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2527        self.conn.execute(
2528            "INSERT OR REPLACE INTO session_evals
2529             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2530             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2531            rusqlite::params![
2532                eval.id,
2533                eval.session_id,
2534                eval.judge_model,
2535                eval.rubric_id,
2536                eval.score,
2537                eval.rationale,
2538                eval.flagged as i64,
2539                eval.created_at_ms as i64,
2540            ],
2541        )?;
2542        Ok(())
2543    }
2544
2545    pub fn list_evals_in_window(
2546        &self,
2547        start_ms: u64,
2548        end_ms: u64,
2549    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2550        let mut stmt = self.conn.prepare(
2551            "SELECT id, session_id, judge_model, rubric_id, score,
2552                    rationale, flagged, created_at_ms
2553             FROM session_evals
2554             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2555             ORDER BY created_at_ms ASC",
2556        )?;
2557        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2558            Ok(crate::eval::types::EvalRow {
2559                id: r.get(0)?,
2560                session_id: r.get(1)?,
2561                judge_model: r.get(2)?,
2562                rubric_id: r.get(3)?,
2563                score: r.get(4)?,
2564                rationale: r.get(5)?,
2565                flagged: r.get::<_, i64>(6)? != 0,
2566                created_at_ms: r.get::<_, i64>(7)? as u64,
2567            })
2568        })?;
2569        rows.collect()
2570    }
2571
2572    pub fn list_evals_for_session(
2573        &self,
2574        session_id: &str,
2575    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2576        let mut stmt = self.conn.prepare(
2577            "SELECT id, session_id, judge_model, rubric_id, score,
2578                    rationale, flagged, created_at_ms
2579             FROM session_evals
2580             WHERE session_id = ?1
2581             ORDER BY created_at_ms DESC",
2582        )?;
2583        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2584            Ok(crate::eval::types::EvalRow {
2585                id: r.get(0)?,
2586                session_id: r.get(1)?,
2587                judge_model: r.get(2)?,
2588                rubric_id: r.get(3)?,
2589                score: r.get(4)?,
2590                rationale: r.get(5)?,
2591                flagged: r.get::<_, i64>(6)? != 0,
2592                created_at_ms: r.get::<_, i64>(7)? as u64,
2593            })
2594        })?;
2595        rows.collect()
2596    }
2597
2598    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2599        use crate::feedback::types::FeedbackLabel;
2600        self.conn.execute(
2601            "INSERT OR REPLACE INTO session_feedback
2602             (id, session_id, score, label, note, created_at_ms)
2603             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2604            rusqlite::params![
2605                r.id,
2606                r.session_id,
2607                r.score.as_ref().map(|s| s.0 as i64),
2608                r.label.as_ref().map(FeedbackLabel::to_db_str),
2609                r.note,
2610                r.created_at_ms as i64,
2611            ],
2612        )?;
2613        let payload = serde_json::to_string(r).unwrap_or_default();
2614        self.conn.execute(
2615            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2616             VALUES (?1, 'session_feedback', ?2, 0)",
2617            rusqlite::params![r.session_id, payload],
2618        )?;
2619        Ok(())
2620    }
2621
2622    pub fn list_feedback_in_window(
2623        &self,
2624        start_ms: u64,
2625        end_ms: u64,
2626    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2627        let mut stmt = self.conn.prepare(
2628            "SELECT id, session_id, score, label, note, created_at_ms
2629             FROM session_feedback
2630             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2631             ORDER BY created_at_ms ASC",
2632        )?;
2633        let rows = stmt.query_map(
2634            rusqlite::params![start_ms as i64, end_ms as i64],
2635            feedback_row,
2636        )?;
2637        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2638    }
2639
2640    pub fn feedback_for_sessions(
2641        &self,
2642        ids: &[String],
2643    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2644        if ids.is_empty() {
2645            return Ok(std::collections::HashMap::new());
2646        }
2647        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2648        let sql = format!(
2649            "SELECT id, session_id, score, label, note, created_at_ms
2650             FROM session_feedback WHERE session_id IN ({placeholders})
2651             ORDER BY created_at_ms DESC"
2652        );
2653        let mut stmt = self.conn.prepare(&sql)?;
2654        let params: Vec<&dyn rusqlite::ToSql> =
2655            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2656        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2657        let mut map = std::collections::HashMap::new();
2658        for row in rows {
2659            let r = row?;
2660            map.entry(r.session_id.clone()).or_insert(r);
2661        }
2662        Ok(map)
2663    }
2664
    /// Insert or update the measured outcome row for a session.
    ///
    /// Upserts on `session_id`; every metric column is overwritten with the
    /// incoming value (including NULLs), so a re-measure fully replaces the
    /// previous measurement.
    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
        self.conn.execute(
            "INSERT INTO session_outcomes (
                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
            ON CONFLICT(session_id) DO UPDATE SET
                test_passed=excluded.test_passed,
                test_failed=excluded.test_failed,
                test_skipped=excluded.test_skipped,
                build_ok=excluded.build_ok,
                lint_errors=excluded.lint_errors,
                revert_lines_14d=excluded.revert_lines_14d,
                pr_open=excluded.pr_open,
                ci_ok=excluded.ci_ok,
                measured_at_ms=excluded.measured_at_ms,
                measure_error=excluded.measure_error",
            params![
                row.session_id,
                row.test_passed,
                row.test_failed,
                row.test_skipped,
                // Tri-state bools are stored as nullable integers.
                row.build_ok.map(bool_to_i64),
                row.lint_errors,
                row.revert_lines_14d,
                row.pr_open,
                row.ci_ok.map(bool_to_i64),
                row.measured_at_ms as i64,
                row.measure_error.as_deref(),
            ],
        )?;
        Ok(())
    }
2698
2699    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2700        let mut stmt = self.conn.prepare(
2701            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2702                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2703             FROM session_outcomes WHERE session_id = ?1",
2704        )?;
2705        let row = stmt
2706            .query_row(params![session_id], outcome_row)
2707            .optional()?;
2708        Ok(row)
2709    }
2710
2711    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2712    pub fn list_session_outcomes_in_window(
2713        &self,
2714        workspace: &str,
2715        start_ms: u64,
2716        end_ms: u64,
2717    ) -> Result<Vec<SessionOutcomeRow>> {
2718        let mut stmt = self.conn.prepare(
2719            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2720                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2721             FROM session_outcomes o
2722             JOIN sessions s ON s.id = o.session_id
2723             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2724             ORDER BY o.measured_at_ms ASC",
2725        )?;
2726        let rows = stmt.query_map(
2727            params![workspace, start_ms as i64, end_ms as i64],
2728            outcome_row,
2729        )?;
2730        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2731    }
2732
2733    pub fn append_session_sample(
2734        &self,
2735        session_id: &str,
2736        ts_ms: u64,
2737        pid: u32,
2738        cpu_percent: Option<f64>,
2739        rss_bytes: Option<u64>,
2740    ) -> Result<()> {
2741        self.conn.execute(
2742            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2743             VALUES (?1, ?2, ?3, ?4, ?5)",
2744            params![
2745                session_id,
2746                ts_ms as i64,
2747                pid as i64,
2748                cpu_percent,
2749                rss_bytes.map(|b| b as i64)
2750            ],
2751        )?;
2752        Ok(())
2753    }
2754
2755    /// Per-session maxima for retro heuristics.
2756    pub fn list_session_sample_aggs_in_window(
2757        &self,
2758        workspace: &str,
2759        start_ms: u64,
2760        end_ms: u64,
2761    ) -> Result<Vec<SessionSampleAgg>> {
2762        let mut stmt = self.conn.prepare(
2763            "SELECT ss.session_id, COUNT(*) AS n,
2764                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2765             FROM session_samples ss
2766             JOIN sessions s ON s.id = ss.session_id
2767             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2768             GROUP BY ss.session_id",
2769        )?;
2770        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2771            let sid: String = r.get(0)?;
2772            let n: i64 = r.get(1)?;
2773            let max_cpu: Option<f64> = r.get(2)?;
2774            let max_rss: Option<i64> = r.get(3)?;
2775            Ok(SessionSampleAgg {
2776                session_id: sid,
2777                sample_count: n as u64,
2778                max_cpu_percent: max_cpu.unwrap_or(0.0),
2779                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2780            })
2781        })?;
2782        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2783    }
2784
2785    pub fn list_sessions_for_eval(
2786        &self,
2787        since_ms: u64,
2788        min_cost_usd: f64,
2789    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2790        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2791        let mut stmt = self.conn.prepare(
2792            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2793                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2794                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2795                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2796             FROM sessions s
2797             WHERE s.started_at_ms >= ?1
2798               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2799               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2800             ORDER BY s.started_at_ms DESC",
2801        )?;
2802        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2803            Ok((
2804                r.get::<_, String>(0)?,
2805                r.get::<_, String>(1)?,
2806                r.get::<_, Option<String>>(2)?,
2807                r.get::<_, String>(3)?,
2808                r.get::<_, i64>(4)?,
2809                r.get::<_, Option<i64>>(5)?,
2810                r.get::<_, String>(6)?,
2811                r.get::<_, String>(7)?,
2812                r.get::<_, Option<String>>(8)?,
2813                r.get::<_, Option<String>>(9)?,
2814                r.get::<_, Option<String>>(10)?,
2815                r.get::<_, Option<i64>>(11)?,
2816                r.get::<_, Option<i64>>(12)?,
2817                r.get::<_, Option<String>>(13)?,
2818                r.get::<_, Option<String>>(14)?,
2819                r.get::<_, Option<String>>(15)?,
2820                r.get::<_, Option<String>>(16)?,
2821                r.get::<_, Option<String>>(17)?,
2822                r.get::<_, Option<String>>(18)?,
2823                r.get::<_, Option<i64>>(19)?,
2824                r.get::<_, Option<i64>>(20)?,
2825            ))
2826        })?;
2827        let mut out = Vec::new();
2828        for row in rows {
2829            let (
2830                id,
2831                agent,
2832                model,
2833                workspace,
2834                started,
2835                ended,
2836                status_str,
2837                trace,
2838                start_commit,
2839                end_commit,
2840                branch,
2841                dirty_start,
2842                dirty_end,
2843                source,
2844                prompt_fingerprint,
2845                parent_session_id,
2846                agent_version,
2847                os,
2848                arch,
2849                repo_file_count,
2850                repo_total_loc,
2851            ) = row?;
2852            out.push(crate::core::event::SessionRecord {
2853                id,
2854                agent,
2855                model,
2856                workspace,
2857                started_at_ms: started as u64,
2858                ended_at_ms: ended.map(|v| v as u64),
2859                status: status_from_str(&status_str),
2860                trace_path: trace,
2861                start_commit,
2862                end_commit,
2863                branch,
2864                dirty_start: dirty_start.map(i64_to_bool),
2865                dirty_end: dirty_end.map(i64_to_bool),
2866                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2867                prompt_fingerprint,
2868                parent_session_id,
2869                agent_version,
2870                os,
2871                arch,
2872                repo_file_count: repo_file_count.map(|v| v as u32),
2873                repo_total_loc: repo_total_loc.map(|v| v as u64),
2874            });
2875        }
2876        Ok(out)
2877    }
2878
2879    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2880        self.conn.execute(
2881            "INSERT OR IGNORE INTO prompt_snapshots
2882             (fingerprint, captured_at_ms, files_json, total_bytes)
2883             VALUES (?1, ?2, ?3, ?4)",
2884            params![
2885                snap.fingerprint,
2886                snap.captured_at_ms as i64,
2887                snap.files_json,
2888                snap.total_bytes as i64
2889            ],
2890        )?;
2891        Ok(())
2892    }
2893
2894    pub fn get_prompt_snapshot(
2895        &self,
2896        fingerprint: &str,
2897    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2898        self.conn
2899            .query_row(
2900                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2901                 FROM prompt_snapshots WHERE fingerprint = ?1",
2902                params![fingerprint],
2903                |r| {
2904                    Ok(crate::prompt::PromptSnapshot {
2905                        fingerprint: r.get(0)?,
2906                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2907                        files_json: r.get(2)?,
2908                        total_bytes: r.get::<_, i64>(3)? as u64,
2909                    })
2910                },
2911            )
2912            .optional()
2913            .map_err(Into::into)
2914    }
2915
2916    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2917        let mut stmt = self.conn.prepare(
2918            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2919             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2920        )?;
2921        let rows = stmt.query_map([], |r| {
2922            Ok(crate::prompt::PromptSnapshot {
2923                fingerprint: r.get(0)?,
2924                captured_at_ms: r.get::<_, i64>(1)? as u64,
2925                files_json: r.get(2)?,
2926                total_bytes: r.get::<_, i64>(3)? as u64,
2927            })
2928        })?;
2929        Ok(rows.filter_map(|r| r.ok()).collect())
2930    }
2931
    /// Sessions with a non-null prompt_fingerprint in the given window.
    ///
    /// Returns `(session_id, fingerprint)` pairs; undecodable rows are skipped.
    /// NOTE(review): the window here is half-open (`started_at_ms < end_ms`),
    /// unlike the inclusive `<=` used by most other window queries in this
    /// file — confirm this asymmetry is intentional.
    pub fn sessions_with_prompt_fingerprint(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(String, String)>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, prompt_fingerprint FROM sessions
             WHERE workspace = ?1
               AND started_at_ms >= ?2 AND started_at_ms < ?3
               AND prompt_fingerprint IS NOT NULL",
        )?;
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
        })?;
        Ok(rows.filter_map(|r| r.ok()).collect())
    }
2950}
2951
impl Drop for Store {
    fn drop(&mut self) {
        // Best-effort flush of any pending search-index writes. The error is
        // deliberately discarded: Drop has no channel to report failures, and
        // panicking in drop would abort during unwinding.
        if let Some(writer) = self.search_writer.get_mut().as_mut() {
            let _ = writer.commit();
        }
    }
}
2959
/// Current wall-clock time as milliseconds since the Unix epoch.
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}
2966
2967fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2968    let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2969    let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2970    Ok(rows.filter_map(|r| r.ok()).collect())
2971}
2972
2973fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2974    raw.and_then(|s| s.trim().parse::<u64>().ok())
2975        .unwrap_or(DEFAULT_MMAP_MB)
2976        .saturating_mul(1024)
2977        .saturating_mul(1024)
2978        .min(i64::MAX as u64) as i64
2979}
2980
2981fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2982    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2983    conn.execute_batch(&format!(
2984        "
2985        PRAGMA journal_mode=WAL;
2986        PRAGMA busy_timeout=5000;
2987        PRAGMA synchronous=NORMAL;
2988        PRAGMA cache_size=-65536;
2989        PRAGMA mmap_size={mmap_size};
2990        PRAGMA temp_store=MEMORY;
2991        PRAGMA wal_autocheckpoint=1000;
2992        "
2993    ))?;
2994    if mode == StoreOpenMode::ReadOnlyQuery {
2995        conn.execute_batch("PRAGMA query_only=ON;")?;
2996    }
2997    Ok(())
2998}
2999
3000fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
3001    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
3002}
3003
/// Decode one row produced by the `SESSION_SELECT` column list into a
/// `SessionRecord`. The positional indices here must stay in lockstep with
/// `SESSION_SELECT` (and with `recent_sessions_3`'s inline SELECT).
fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
    let status_str: String = row.get(6)?;
    Ok(SessionRecord {
        id: row.get(0)?,
        agent: row.get(1)?,
        model: row.get(2)?,
        workspace: row.get(3)?,
        started_at_ms: row.get::<_, i64>(4)? as u64,
        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
        status: status_from_str(&status_str),
        trace_path: row.get(7)?,
        start_commit: row.get(8)?,
        end_commit: row.get(9)?,
        branch: row.get(10)?,
        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
        // Stored as NOT NULL DEFAULT '' in the schema; '' means "unset".
        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
        prompt_fingerprint: row.get(14)?,
        parent_session_id: row.get(15)?,
        agent_version: row.get(16)?,
        os: row.get(17)?,
        arch: row.get(18)?,
        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
    })
}
3030
3031fn ranked_file_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedFile> {
3032    Ok(RankedFile {
3033        path: row.get(0)?,
3034        value: row.get::<_, i64>(1)? as u64,
3035        complexity_total: row.get::<_, i64>(2)? as u32,
3036        churn_30d: row.get::<_, i64>(3)? as u32,
3037    })
3038}
3039
3040fn ranked_tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedTool> {
3041    Ok(RankedTool {
3042        tool: row.get(0)?,
3043        calls: row.get::<_, i64>(1)? as u64,
3044        p50_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
3045        p95_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
3046        total_tokens: row.get::<_, i64>(4)? as u64,
3047        total_reasoning_tokens: row.get::<_, i64>(5)? as u64,
3048    })
3049}
3050
/// Decode one 22-column event row into an `Event`. The positional indices
/// are the contract with every SELECT that feeds this function; keep them in
/// sync when adding columns.
fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
    let payload_str: String = row.get(12)?;
    Ok(Event {
        session_id: row.get(0)?,
        seq: row.get::<_, i64>(1)? as u64,
        ts_ms: row.get::<_, i64>(2)? as u64,
        ts_exact: row.get::<_, i64>(3)? != 0,
        kind: kind_from_str(&row.get::<_, String>(4)?),
        source: source_from_str(&row.get::<_, String>(5)?),
        tool: row.get(6)?,
        tool_call_id: row.get(7)?,
        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
        cost_usd_e6: row.get(11)?,
        // Corrupt/legacy payload JSON degrades to Null instead of erroring,
        // so one bad row cannot poison a whole listing.
        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
        stop_reason: row.get(13)?,
        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
    })
}
3078
3079fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> {
3080    Ok((row.get(22)?, event_row(row)?))
3081}
3082
3083fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
3084    let mut clauses = vec!["workspace = ?".to_string()];
3085    let mut args = vec![Value::Text(workspace.to_string())];
3086    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
3087        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
3088        args.push(Value::Text(format!("{}%", escape_like(prefix))));
3089    }
3090    if let Some(status) = &filter.status {
3091        clauses.push("status = ?".to_string());
3092        args.push(Value::Text(format!("{status:?}")));
3093    }
3094    if let Some(since_ms) = filter.since_ms {
3095        clauses.push("started_at_ms >= ?".to_string());
3096        args.push(Value::Integer(since_ms as i64));
3097    }
3098    (format!("WHERE {}", clauses.join(" AND ")), args)
3099}
3100
/// Lowercase `raw` and backslash-escape the SQL LIKE metacharacters
/// (`\`, `%`, `_`) for use with `... LIKE ? ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut out = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '%' => out.push_str("\\%"),
            '_' => out.push_str("\\_"),
            other => out.push(other),
        }
    }
    out
}
3107
3108fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
3109    let cost: i64 = conn.query_row(
3110        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
3111        params![workspace], |r| r.get(0),
3112    )?;
3113    let with_cost: i64 = conn.query_row(
3114        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
3115        params![workspace], |r| r.get(0),
3116    )?;
3117    Ok((cost, with_cost as u64))
3118}
3119
3120fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
3121    let build_raw: Option<i64> = r.get(4)?;
3122    let ci_raw: Option<i64> = r.get(8)?;
3123    Ok(SessionOutcomeRow {
3124        session_id: r.get(0)?,
3125        test_passed: r.get(1)?,
3126        test_failed: r.get(2)?,
3127        test_skipped: r.get(3)?,
3128        build_ok: build_raw.map(|v| v != 0),
3129        lint_errors: r.get(5)?,
3130        revert_lines_14d: r.get(6)?,
3131        pr_open: r.get(7)?,
3132        ci_ok: ci_raw.map(|v| v != 0),
3133        measured_at_ms: r.get::<_, i64>(9)? as u64,
3134        measure_error: r.get(10)?,
3135    })
3136}
3137
3138fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
3139    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
3140    let score = r
3141        .get::<_, Option<i64>>(2)?
3142        .and_then(|v| FeedbackScore::new(v as u8));
3143    let label = r
3144        .get::<_, Option<String>>(3)?
3145        .and_then(|s| FeedbackLabel::from_str_opt(&s));
3146    Ok(FeedbackRecord {
3147        id: r.get(0)?,
3148        session_id: r.get(1)?,
3149        score,
3150        label,
3151        note: r.get(4)?,
3152        created_at_ms: r.get::<_, i64>(5)? as u64,
3153    })
3154}
3155
/// Three-letter weekday label for a Unix epoch day index.
/// Day 0 (1970-01-01) was a Thursday, hence the +4 offset into the table.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    NAMES[((day_idx + 4) % 7) as usize]
}
3159
3160fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
3161    let week_ago = now.saturating_sub(7 * 86_400_000);
3162    let mut stmt = conn
3163        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
3164    let days: Vec<u64> = stmt
3165        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
3166        .filter_map(|r| r.ok())
3167        .map(|v| v as u64 / 86_400_000)
3168        .collect();
3169    let today = now / 86_400_000;
3170    Ok((0u64..7)
3171        .map(|i| {
3172            let d = today.saturating_sub(6 - i);
3173            (
3174                day_label(d).to_string(),
3175                days.iter().filter(|&&x| x == d).count() as u64,
3176            )
3177        })
3178        .collect())
3179}
3180
3181fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3182    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3183               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3184               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3185               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3186               COUNT(e.id) FROM sessions s \
3187               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3188               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3189    let mut stmt = conn.prepare(sql)?;
3190    let out: Vec<(SessionRecord, u64)> = stmt
3191        .query_map(params![workspace], |r| {
3192            let st: String = r.get(6)?;
3193            Ok((
3194                SessionRecord {
3195                    id: r.get(0)?,
3196                    agent: r.get(1)?,
3197                    model: r.get(2)?,
3198                    workspace: r.get(3)?,
3199                    started_at_ms: r.get::<_, i64>(4)? as u64,
3200                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3201                    status: status_from_str(&st),
3202                    trace_path: r.get(7)?,
3203                    start_commit: r.get(8)?,
3204                    end_commit: r.get(9)?,
3205                    branch: r.get(10)?,
3206                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3207                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3208                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3209                    prompt_fingerprint: r.get(14)?,
3210                    parent_session_id: r.get(15)?,
3211                    agent_version: r.get(16)?,
3212                    os: r.get(17)?,
3213                    arch: r.get(18)?,
3214                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3215                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3216                },
3217                r.get::<_, i64>(21)? as u64,
3218            ))
3219        })?
3220        .filter_map(|r| r.ok())
3221        .collect();
3222    Ok(out)
3223}
3224
3225fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3226    let mut stmt = conn.prepare(
3227        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3228         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3229    )?;
3230    let out: Vec<(String, u64)> = stmt
3231        .query_map(params![workspace], |r| {
3232            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3233        })?
3234        .filter_map(|r| r.ok())
3235        .collect();
3236    Ok(out)
3237}
3238
/// Map a stored status string back to `SessionStatus`. Unknown or legacy
/// values fall back to `Done` (terminal state) rather than erroring.
fn status_from_str(s: &str) -> SessionStatus {
    match s {
        "Running" => SessionStatus::Running,
        "Waiting" => SessionStatus::Waiting,
        "Idle" => SessionStatus::Idle,
        _ => SessionStatus::Done,
    }
}
3247
/// True when the `KAIZEN_PROJECTOR` env var selects the legacy projector.
fn projector_legacy_mode() -> bool {
    matches!(std::env::var("KAIZEN_PROJECTOR").as_deref(), Ok("legacy"))
}
3251
3252fn is_stop_event(e: &Event) -> bool {
3253    if !matches!(e.kind, EventKind::Hook) {
3254        return false;
3255    }
3256    e.payload
3257        .get("event")
3258        .and_then(|v| v.as_str())
3259        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3260        == Some("Stop")
3261}
3262
/// Map a stored event-kind string back to `EventKind`. Unknown values fall
/// back to `Hook` rather than erroring on legacy rows.
fn kind_from_str(s: &str) -> EventKind {
    match s {
        "ToolCall" => EventKind::ToolCall,
        "ToolResult" => EventKind::ToolResult,
        "Message" => EventKind::Message,
        "Error" => EventKind::Error,
        "Cost" => EventKind::Cost,
        "Hook" => EventKind::Hook,
        "Lifecycle" => EventKind::Lifecycle,
        _ => EventKind::Hook,
    }
}
3275
/// Map a stored event-source string back to `EventSource`; unknown values
/// fall back to `Proxy`.
fn source_from_str(s: &str) -> EventSource {
    match s {
        "Tail" => EventSource::Tail,
        "Hook" => EventSource::Hook,
        _ => EventSource::Proxy,
    }
}
3283
/// Idempotent column-level migrations: add every column introduced after the
/// base schema, via `ensure_column` (no-op when the column already exists),
/// then create the tool-span tree indexes. Safe to run on every open.
fn ensure_schema_columns(conn: &Connection) -> Result<()> {
    // sessions: repo binding captured at session start/end.
    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
    ensure_column(conn, "sessions", "branch", "TEXT")?;
    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
    ensure_column(
        conn,
        "sessions",
        "repo_binding_source",
        "TEXT NOT NULL DEFAULT ''",
    )?;
    // events: timing/token/context columns added over time.
    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
    ensure_column(conn, "events", "stop_reason", "TEXT")?;
    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
    ensure_column(conn, "events", "retry_count", "INTEGER")?;
    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
    ensure_column(
        conn,
        "sync_outbox",
        "kind",
        "TEXT NOT NULL DEFAULT 'events'",
    )?;
    ensure_column(
        conn,
        "experiments",
        "state",
        "TEXT NOT NULL DEFAULT 'Draft'",
    )?;
    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
    ensure_column(conn, "sessions", "os", "TEXT")?;
    ensure_column(conn, "sessions", "arch", "TEXT")?;
    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
    // tool_spans: parent/child span tree with aggregated subtree totals.
    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
    conn.execute_batch(
        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
    )?;
    Ok(())
}
3338
3339fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3340    if has_column(conn, table, column)? {
3341        return Ok(());
3342    }
3343    conn.execute(
3344        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3345        [],
3346    )?;
3347    Ok(())
3348}
3349
3350fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3351    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3352    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3353    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3354}
3355
/// Encode a bool as the SQLite INTEGER convention (1/0).
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3359
/// Decode a SQLite INTEGER boolean: any non-zero value is true.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3363
/// Treat the empty string as "unset": `""` becomes `None`, anything else
/// is passed through as `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|v| !v.is_empty())
}
3367
3368#[cfg(test)]
3369mod tests {
3370    use super::*;
3371    use serde_json::json;
3372    use std::collections::HashSet;
3373    use tempfile::TempDir;
3374
    /// Minimal session fixture: agent "cursor" in workspace "/ws", started
    /// at t=1000 ms, already Done, with every optional field unset.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }
3400
    /// Minimal ToolCall event fixture for `session_id`: timestamp derived
    /// from `seq` (1000 + seq*100), tool "read_file", call id "call_{seq}".
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }
3427
    // A fresh store must come up in WAL journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }
3438
    // Open must apply the phase-0 pragma set: synchronous=NORMAL (1),
    // 64 MiB cache, temp_store=MEMORY (2), and WAL autocheckpoint at 1000;
    // also pins the MB→bytes mmap conversion.
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }
3465
    // Read-only opens must set PRAGMA query_only so writes are rejected.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }
3478
    // Each phase-0 index must be created exactly once by Store::open.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }
3501
    // Round-trip: an upserted session is readable back by id with its status.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }
3513
    // Appending events must not affect session listing for the workspace.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }
3527
    // Pagination orders by started_at_ms DESC (id as tiebreaker), reports the
    // total row count, and exposes the next offset.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }
3558
    // All three SessionFilter criteria (case-insensitive agent prefix,
    // status, since) must combine to select only the matching session.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }
3588
    // list_sessions_started_after returns only sessions newer than the
    // cutoff; update_session_status + session_statuses round-trip the change.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }
3612
    // An empty workspace yields zeroed summary stats rather than an error.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }
3621
    // Summary stats must count sessions and group them by agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }
3634
    // Events come back in seq order for a session.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }
3647
    // The event page cursor is inclusive: paging resumes at last seq + 1.
    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }
3663
    // Re-appending the same (session_id, seq) must be a silent no-op.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Duplicate — should be silently ignored
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }
3675
    // The span-tree cache must populate on read (even for empty results) and
    // invalidate on every event append.
    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }
3700
    // Window matching prefers started_at_ms and falls back to ended_at_ms
    // only when the start timestamp is NULL.
    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }
3727
3728    #[test]
3729    fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
3730        let dir = TempDir::new().unwrap();
3731        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3732        store.upsert_session(&make_session("s1")).unwrap();
3733        for (id, started, ended) in [
3734            ("inside_started", Some(150_i64), None),
3735            ("inside_ended_only", None, Some(220_i64)),
3736            ("after_window", Some(400_i64), None),
3737            ("before_window", None, Some(50_i64)),
3738        ] {
3739            store
3740                .conn
3741                .execute(
3742                    "INSERT INTO tool_spans
3743                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3744                     VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
3745                    params![id, started, ended],
3746                )
3747                .unwrap();
3748        }
3749        let rows = store
3750            .tool_spans_sync_rows_in_window("/ws", 100, 300)
3751            .unwrap();
3752        let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
3753        assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
3754        assert!(rows.iter().all(|r| r.session_id == "s1"));
3755    }
3756
3757    #[test]
3758    fn upsert_idempotent() {
3759        let dir = TempDir::new().unwrap();
3760        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3761        let mut s = make_session("s3");
3762        store.upsert_session(&s).unwrap();
3763        s.status = SessionStatus::Running;
3764        store.upsert_session(&s).unwrap();
3765
3766        let got = store.get_session("s3").unwrap().unwrap();
3767        assert_eq!(got.status, SessionStatus::Running);
3768    }
3769
3770    #[test]
3771    fn append_event_indexes_path_from_payload() {
3772        let dir = TempDir::new().unwrap();
3773        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3774        store.upsert_session(&make_session("sx")).unwrap();
3775        let mut ev = make_event("sx", 0);
3776        ev.payload = json!({"input": {"path": "src/lib.rs"}});
3777        store.append_event(&ev).unwrap();
3778        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
3779        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
3780    }
3781
3782    #[test]
3783    fn update_session_status_changes_status() {
3784        let dir = TempDir::new().unwrap();
3785        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3786        store.upsert_session(&make_session("s6")).unwrap();
3787        store
3788            .update_session_status("s6", SessionStatus::Running)
3789            .unwrap();
3790        let got = store.get_session("s6").unwrap().unwrap();
3791        assert_eq!(got.status, SessionStatus::Running);
3792    }
3793
3794    #[test]
3795    fn prune_sessions_removes_old_rows_and_keeps_recent() {
3796        let dir = TempDir::new().unwrap();
3797        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3798        let mut old = make_session("old");
3799        old.started_at_ms = 1_000;
3800        let mut new = make_session("new");
3801        new.started_at_ms = 9_000_000_000_000;
3802        store.upsert_session(&old).unwrap();
3803        store.upsert_session(&new).unwrap();
3804        store.append_event(&make_event("old", 0)).unwrap();
3805
3806        let stats = store.prune_sessions_started_before(5_000).unwrap();
3807        assert_eq!(
3808            stats,
3809            PruneStats {
3810                sessions_removed: 1,
3811                events_removed: 1,
3812            }
3813        );
3814        assert!(store.get_session("old").unwrap().is_none());
3815        assert!(store.get_session("new").unwrap().is_some());
3816        let sessions = store.list_sessions("/ws").unwrap();
3817        assert_eq!(sessions.len(), 1);
3818        assert_eq!(sessions[0].id, "new");
3819    }
3820
3821    #[test]
3822    fn append_event_indexes_rules_from_payload() {
3823        let dir = TempDir::new().unwrap();
3824        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3825        store.upsert_session(&make_session("sr")).unwrap();
3826        let mut ev = make_event("sr", 0);
3827        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
3828        store.append_event(&ev).unwrap();
3829        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
3830        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
3831    }
3832
3833    #[test]
3834    fn guidance_report_counts_skill_and_rule_sessions() {
3835        let dir = TempDir::new().unwrap();
3836        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3837        store.upsert_session(&make_session("sx")).unwrap();
3838        let mut ev = make_event("sx", 0);
3839        ev.payload =
3840            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
3841        ev.cost_usd_e6 = Some(500_000);
3842        store.append_event(&ev).unwrap();
3843
3844        let mut skill_slugs = HashSet::new();
3845        skill_slugs.insert("tdd".into());
3846        let mut rule_slugs = HashSet::new();
3847        rule_slugs.insert("style".into());
3848
3849        let rep = store
3850            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
3851            .unwrap();
3852        assert_eq!(rep.sessions_in_window, 1);
3853        let tdd = rep
3854            .rows
3855            .iter()
3856            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
3857            .unwrap();
3858        assert_eq!(tdd.sessions, 1);
3859        assert!(tdd.on_disk);
3860        let style = rep
3861            .rows
3862            .iter()
3863            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
3864            .unwrap();
3865        assert_eq!(style.sessions, 1);
3866        assert!(style.on_disk);
3867    }
3868
3869    #[test]
3870    fn prune_sessions_removes_rules_used_rows() {
3871        let dir = TempDir::new().unwrap();
3872        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3873        let mut old = make_session("old_r");
3874        old.started_at_ms = 1_000;
3875        store.upsert_session(&old).unwrap();
3876        let mut ev = make_event("old_r", 0);
3877        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
3878        store.append_event(&ev).unwrap();
3879
3880        store.prune_sessions_started_before(5_000).unwrap();
3881        let n: i64 = store
3882            .conn
3883            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
3884            .unwrap();
3885        assert_eq!(n, 0);
3886    }
3887}