Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8    FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
/// For scale: read as Unix-epoch milliseconds, 1_000_000_000_000 falls in September 2001.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map budget, in megabytes going by the name.
/// NOTE(review): presumably consumed by `apply_pragmas` (defined outside this part of the
/// file) when configuring the connection — confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// `SELECT … FROM sessions` prefix listing the 21 session columns in the same order that
/// `Store::upsert_session` writes them.
/// The statement carries no `WHERE` / `ORDER BY`; NOTE(review): presumably callers append
/// their own clauses and rely on this fixed column order when decoding rows — confirm.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspot" files for one repo snapshot.
///
/// Parameters:
/// * `?1` — `file_facts.snapshot_id` to rank.
/// * `?2` — workspace; only sessions in this workspace contribute to the count.
/// * `?3`, `?4` — inclusive time window, matched against a span's `started_at_ms`, or
///   against `ended_at_ms` when the start is NULL.
///
/// Result columns: `path`, `value`, `complexity_total`, `churn_30d`, where
/// `value = COUNT(s.id) * complexity_total`. Because every join is a `LEFT JOIN`, files
/// with no matching span still appear with `value = 0`. Rows are ordered by `value`
/// descending, then `path` ascending.
const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
       AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
         OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool call counts, token totals and latency percentiles for a workspace window.
///
/// Parameters:
/// * `?1` — workspace.
/// * `?2`, `?3` — inclusive time window, matched against a span's `started_at_ms`, or
///   against `ended_at_ms` when the start is NULL.
///
/// Result columns: `tool`, `calls`, `p50_ms`, `p95_ms`, `total_tokens`,
/// `total_reasoning_tokens`. A NULL tool is reported as `'unknown'`.
///
/// Percentiles are taken over spans with a non-NULL `lead_time_ms`, sorted ascending per
/// tool: the row picked is number `((n - 1) * p) / 100 + 1` (integer division). Tools with
/// no latency samples get NULL percentiles via the final `LEFT JOIN`.
/// The outer query has no `ORDER BY`, so row order is unspecified.
const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
      SELECT COALESCE(ts.tool, 'unknown') AS tool,
             ts.lead_time_ms,
             COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
             COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
      FROM tool_spans ts
      JOIN sessions s ON s.id = ts.session_id
      WHERE s.workspace = ?1
        AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
          OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
      SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
             SUM(reasoning_tokens) AS total_reasoning_tokens
      FROM scoped GROUP BY tool
    ),
    lat AS (
      SELECT tool, lead_time_ms,
             ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
             COUNT(*) OVER (PARTITION BY tool) AS n
      FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
      SELECT tool,
             MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
             MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
      FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
85
/// Schema DDL, executed in array order with `execute_batch` on every read-write open
/// (see `Store::open_with_mode`).
///
/// Every statement is written as `CREATE … IF NOT EXISTS` or `INSERT OR IGNORE`, so
/// running the list against an already-initialised database is expected to be a no-op.
/// Some entries hold several `;`-separated statements, which is why `execute_batch` is
/// required rather than `execute`.
///
/// Columns used elsewhere in this file but absent from these statements (for example
/// `sessions.start_commit` or `events.ts_exact`) are not created here.
/// NOTE(review): presumably `ensure_schema_columns` adds them — confirm.
const MIGRATIONS: &[&str] = &[
    // Core capture tables: sessions and their events.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the `ON CONFLICT(session_id, seq)` upsert used when appending events.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Generic key/value state (see the `SYNC_STATE_*` key constants).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // Derived span tables.
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS trace_spans (
        span_id TEXT PRIMARY KEY,
        trace_id TEXT NOT NULL,
        parent_span_id TEXT,
        session_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        name TEXT NOT NULL,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        duration_ms INTEGER,
        model TEXT,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        context_used_tokens INTEGER,
        context_max_tokens INTEGER,
        payload TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE INDEX IF NOT EXISTS trace_spans_session_idx ON trace_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_trace_idx ON trace_spans(trace_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_started_idx ON trace_spans(started_at_ms)",
    // Repo binding per session; `Store::upsert_session` writes this alongside `sessions`.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repo analysis snapshots and their per-file facts / edges.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single permitted row (the CHECK above restricts `id` to 1).
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement entry: table plus its indexes in one batch.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    // Multi-statement entry: table plus its indexes in one batch.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE TABLE IF NOT EXISTS cases (
        id TEXT PRIMARY KEY,
        source_key TEXT NOT NULL UNIQUE,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        reason TEXT NOT NULL,
        label TEXT,
        status TEXT NOT NULL CHECK(status IN ('open','archived')),
        prompt_fingerprint TEXT,
        metadata_json TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS case_refs (
        case_id TEXT NOT NULL REFERENCES cases(id) ON DELETE CASCADE,
        ref_kind TEXT NOT NULL,
        ref_key TEXT NOT NULL,
        PRIMARY KEY (case_id, ref_kind, ref_key)
    )",
    "CREATE TABLE IF NOT EXISTS rules (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        filter TEXT NOT NULL,
        action_json TEXT NOT NULL,
        enabled INTEGER NOT NULL DEFAULT 1,
        created_at_ms INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS review_items (
        id TEXT PRIMARY KEY,
        source_key TEXT NOT NULL UNIQUE,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        title TEXT NOT NULL,
        status TEXT NOT NULL CHECK(status IN ('open','resolved','dismissed')),
        created_at_ms INTEGER NOT NULL,
        resolved_at_ms INTEGER
    )",
    "CREATE TABLE IF NOT EXISTS alert_events (
        id TEXT PRIMARY KEY,
        source_key TEXT NOT NULL UNIQUE,
        name TEXT NOT NULL,
        severity TEXT NOT NULL CHECK(severity IN ('info','warning','critical')),
        message TEXT NOT NULL,
        session_id TEXT REFERENCES sessions(id) ON DELETE SET NULL,
        created_at_ms INTEGER NOT NULL
    )",
    // Secondary indexes added after the tables above.
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS cases_status_idx ON cases(status, created_at_ms)",
    "CREATE INDEX IF NOT EXISTS review_items_status_idx ON review_items(status, created_at_ms)",
    "CREATE INDEX IF NOT EXISTS alert_events_name_idx ON alert_events(name, created_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS events_tool_ts_session_seq_idx ON events(tool, ts_ms DESC, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_span_paths_path_idx ON tool_span_paths(path, span_id)",
    // NOTE(review): indexes the same column as `session_feedback_session` above — confirm
    // whether both are needed before touching either.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
404
405/// Per-workspace activity dashboard stats.
406#[derive(Clone)]
407pub struct InsightsStats {
408    pub total_sessions: u64,
409    pub running_sessions: u64,
410    pub total_events: u64,
411    /// (day label e.g. "Mon", count) last 7 days oldest first
412    pub sessions_by_day: Vec<(String, u64)>,
413    /// Recent sessions DESC by started_at, max 3; paired with event count
414    pub recent: Vec<(SessionRecord, u64)>,
415    /// Top tools by event count, max 5
416    pub top_tools: Vec<(String, u64)>,
417    pub total_cost_usd_e6: i64,
418    pub sessions_with_cost: u64,
419}
420
/// Sync daemon / outbox status for `kaizen sync status`.
///
/// Derives `Debug` and `Clone` so the snapshot can be logged, compared in tests and
/// passed around without re-querying; all fields are plain standard-library types.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Number of outbox entries not yet sent (by name; confirm against the producer).
    pub pending_outbox: u64,
    /// Timestamp of the last successful sync in milliseconds, `None` if there is none.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync error, if one is recorded.
    pub last_error: Option<String>,
    /// Count of sync failures in a row (by name; confirm when it resets).
    pub consecutive_failures: u32,
}
428
429/// Aggregate stats across sessions + events for a workspace.
430#[derive(serde::Serialize)]
431pub struct SummaryStats {
432    pub session_count: u64,
433    pub total_cost_usd_e6: i64,
434    pub by_agent: Vec<(String, u64)>,
435    pub by_model: Vec<(String, u64)>,
436    pub top_tools: Vec<(String, u64)>,
437}
438
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serialized in lowercase (`"skill"` / `"rule"`) via `rename_all`. The derived `Ord`
/// follows declaration order, so `Skill` sorts before `Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// The row describes a skill.
    Skill,
    /// The row describes a rule.
    Rule,
}
446
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row is a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    /// Number of sessions in which the item was observed.
    pub sessions: u64,
    /// Share of sessions that referenced the item.
    /// NOTE(review): confirm the denominator and whether the scale is 0–1 or 0–100.
    pub sessions_pct: f64,
    /// Summed cost of those sessions. The `_e6` suffix suggests USD × 10^6 — confirm.
    pub total_cost_usd_e6: i64,
    /// Average cost per session in USD; `None` when it cannot be computed.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison against the workspace-wide average cost per session.
    /// NOTE(review): confirm whether this is a difference or a ratio.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// NOTE(review): presumably true when a matching skill/rule file exists locally — confirm.
    pub on_disk: bool,
}
459
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report was computed for.
    pub workspace: String,
    /// Start of the reporting window, in milliseconds.
    pub window_start_ms: u64,
    /// End of the reporting window, in milliseconds.
    pub window_end_ms: u64,
    /// Number of sessions that fall inside the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD; `None` when it cannot be computed.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per observed skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
470
/// Result of [`Store::prune_sessions_started_before`].
///
/// `Default` yields zero for both counters.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of sessions removed by the prune.
    pub sessions_removed: u64,
    /// Number of events removed by the prune.
    pub events_removed: u64,
}
477
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// Field names match the table's columns one-to-one. Every measurement is optional
/// because the corresponding column is nullable; only `session_id` and `measured_at_ms`
/// are required.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Primary key of the row: the session the outcome belongs to.
    pub session_id: String,
    /// `test_passed` column.
    pub test_passed: Option<i64>,
    /// `test_failed` column.
    pub test_failed: Option<i64>,
    /// `test_skipped` column.
    pub test_skipped: Option<i64>,
    /// `build_ok` column; stored as an INTEGER in SQLite.
    pub build_ok: Option<bool>,
    /// `lint_errors` column.
    pub lint_errors: Option<i64>,
    /// `revert_lines_14d` column.
    pub revert_lines_14d: Option<i64>,
    /// `pr_open` column.
    pub pr_open: Option<i64>,
    /// `ci_ok` column; stored as an INTEGER in SQLite.
    pub ci_ok: Option<bool>,
    /// `measured_at_ms` column (NOT NULL).
    pub measured_at_ms: u64,
    /// `measure_error` column; free-form text, `None` when no error was recorded.
    pub measure_error: Option<String>,
}
493
/// Aggregated process samples for retro (Tier D).
///
/// NOTE(review): presumably computed from `session_samples` rows (`cpu_percent`,
/// `rss_bytes`) grouped per session — confirm against the producing query.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the aggregate belongs to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage among the samples.
    pub max_cpu_percent: f64,
    /// Highest resident set size among the samples, in bytes.
    pub max_rss_bytes: u64,
}
502
/// `sync_state` keys for agent rescan throttling and auto-prune.
/// Each constant is a value for the `k` column of the `sync_state` key/value table.
/// Key under which the last agent scan time is stored (milliseconds, by name).
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key under which the last auto-prune time is stored (milliseconds, by name).
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// Key for the search "dirty" marker.
/// NOTE(review): presumably the time the search index was last marked stale — confirm.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
507
/// One tool span in the shape used for sync.
///
/// The scalar fields match the columns of the `tool_spans` table; `paths` holds the
/// span's file paths as a list (the table keeps them in `paths_json`).
/// Derives `Debug` and `Clone` so rows can be logged and copied; all fields are plain
/// standard-library types.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    /// Primary key of the span.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Tool call identifier, if known.
    pub tool_call_id: Option<String>,
    /// Span status text, stored as-is.
    pub status: String,
    /// Start time in milliseconds, if recorded.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, if recorded.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, if recorded.
    pub lead_time_ms: Option<u64>,
    /// Input token count, if recorded.
    pub tokens_in: Option<u32>,
    /// Output token count, if recorded.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, if recorded.
    pub reasoning_tokens: Option<u32>,
    /// Cost. The `_e6` suffix suggests USD × 10^6 (micro-dollars) — confirm.
    pub cost_usd_e6: Option<i64>,
    /// File paths associated with the span.
    pub paths: Vec<String>,
}
523
/// Crate-internal row describing which optional measurements one captured record carries.
/// NOTE(review): presumably one row per event, feeding a capture-quality report — confirm
/// against the query that builds it.
pub(crate) struct CaptureQualityRow {
    /// Source label of the record.
    pub source: String,
    /// Whether token counts are present.
    pub has_tokens: bool,
    /// Whether a latency value is present.
    pub has_latency: bool,
    /// Whether context-window figures are present.
    pub has_context: bool,
}
530
/// Crate-internal row describing one trace span for quality reporting.
pub(crate) struct TraceSpanQualityRow {
    /// Span kind as text.
    pub kind: String,
    /// Whether the span is considered an orphan.
    /// NOTE(review): the orphan criterion is defined by the producing query — confirm.
    pub is_orphan: bool,
}
535
/// How [`Store::open_with_mode`] opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Open with `Connection::open`, then run `MIGRATIONS`, `ensure_schema_columns` and
    /// the projector warm-up.
    ReadWrite,
    /// Open with `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX`; migrations,
    /// `ensure_schema_columns` and the projector warm-up are skipped.
    ReadOnlyQuery,
}
541
/// Slim view of a session: just its id, status and end time.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session id.
    pub id: String,
    /// Current status of the session.
    pub status: SessionStatus,
    /// End time in milliseconds; `None` when no end has been recorded.
    pub ended_at_ms: Option<u64>,
}
548
/// Optional criteria for listing sessions. `Default` leaves every criterion unset.
/// NOTE(review): the exact matching rules live in the query that consumes this filter,
/// which is not visible in this part of the file.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Restrict to agents whose name starts with this prefix.
    /// NOTE(review): the `lower(agent)` index suggests case-insensitive matching — confirm.
    pub agent_prefix: Option<String>,
    /// Restrict to sessions with this status.
    pub status: Option<SessionStatus>,
    /// Lower time bound in milliseconds.
    /// NOTE(review): presumably compared with `started_at_ms` — confirm.
    pub since_ms: Option<u64>,
}
555
/// One page of a paginated session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total count. NOTE(review): presumably all rows matching the filter, not just this
    /// page — confirm.
    pub total: usize,
    /// Offset to request the following page; presumably `None` on the last page — confirm.
    pub next_offset: Option<usize>,
}
562
/// Cached span tree for a single session (the store keeps at most one entry).
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached tree belongs to.
    session_id: String,
    /// NOTE(review): presumably the session's newest event `seq` when the tree was built,
    /// used to detect staleness — confirm against the cache reader.
    last_event_seq: Option<u64>,
    /// Cached span nodes for the session.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
569
/// Synchronous SQLite-backed store.
///
/// Mutable helpers are wrapped in `RefCell` so that methods taking `&self` can update them.
pub struct Store {
    /// Underlying SQLite connection.
    conn: Connection,
    /// Parent directory of the database file (`.` when the path has no parent).
    root: PathBuf,
    /// Hot log handle; starts as `None` when the store is opened.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search writer; starts as `None` when the store is opened.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// Single-entry span tree cache; cleared by `invalidate_span_tree_cache`.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector; replayed from running sessions on a read-write open.
    projector: RefCell<Projector>,
}
578
579impl Store {
    /// Borrow the underlying SQLite connection (crate-internal).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
583
    /// Open the store at `path` in [`StoreOpenMode::ReadWrite`].
    ///
    /// See [`Store::open_with_mode`] for what a read-write open does.
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }
587
    /// Open the store at `path` in [`StoreOpenMode::ReadOnlyQuery`].
    ///
    /// Currently identical to [`Store::open_query`].
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
591
    /// Open the store at `path` in [`StoreOpenMode::ReadOnlyQuery`].
    ///
    /// Currently identical to [`Store::open_read_only`].
    pub fn open_query(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
595
596    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
597        if let Some(parent) = path.parent() {
598            std::fs::create_dir_all(parent)?;
599        }
600        let conn = match mode {
601            StoreOpenMode::ReadWrite => Connection::open(path),
602            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
603                path,
604                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
605            ),
606        }
607        .with_context(|| format!("open db: {}", path.display()))?;
608        apply_pragmas(&conn, mode)?;
609        if mode == StoreOpenMode::ReadWrite {
610            for sql in MIGRATIONS {
611                conn.execute_batch(sql)?;
612            }
613            ensure_schema_columns(&conn)?;
614        }
615        let store = Self {
616            conn,
617            root: path
618                .parent()
619                .unwrap_or_else(|| Path::new("."))
620                .to_path_buf(),
621            hot_log: RefCell::new(None),
622            search_writer: RefCell::new(None),
623            span_tree_cache: RefCell::new(None),
624            projector: RefCell::new(Projector::default()),
625        };
626        if mode == StoreOpenMode::ReadWrite {
627            store.warm_projector()?;
628        }
629        Ok(store)
630    }
631
    /// Drop the cached span tree, if any, by resetting the cache slot to `None`.
    fn invalidate_span_tree_cache(&self) {
        *self.span_tree_cache.borrow_mut() = None;
    }
635
636    fn warm_projector(&self) -> Result<()> {
637        let ids = self.running_session_ids()?;
638        let mut projector = self.projector.borrow_mut();
639        for id in ids {
640            for event in self.list_events_for_session(&id)? {
641                let _ = projector.apply(&event);
642            }
643        }
644        Ok(())
645    }
646
647    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
648        self.conn.execute(
649            "INSERT INTO sessions (
650                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
651                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
652                prompt_fingerprint, parent_session_id, agent_version, os, arch,
653                repo_file_count, repo_total_loc
654             )
655             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
656                ?16, ?17, ?18, ?19, ?20, ?21)
657             ON CONFLICT(id) DO UPDATE SET
658               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
659               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
660               status=excluded.status, trace_path=excluded.trace_path,
661               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
662               branch=excluded.branch, dirty_start=excluded.dirty_start,
663               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
664               prompt_fingerprint=excluded.prompt_fingerprint,
665               parent_session_id=excluded.parent_session_id,
666               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
667               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
668            params![
669                s.id,
670                s.agent,
671                s.model,
672                s.workspace,
673                s.started_at_ms as i64,
674                s.ended_at_ms.map(|v| v as i64),
675                format!("{:?}", s.status),
676                s.trace_path,
677                s.start_commit,
678                s.end_commit,
679                s.branch,
680                s.dirty_start.map(bool_to_i64),
681                s.dirty_end.map(bool_to_i64),
682                s.repo_binding_source.clone().unwrap_or_default(),
683                s.prompt_fingerprint.as_deref(),
684                s.parent_session_id.as_deref(),
685                s.agent_version.as_deref(),
686                s.os.as_deref(),
687                s.arch.as_deref(),
688                s.repo_file_count.map(|v| v as i64),
689                s.repo_total_loc.map(|v| v as i64),
690            ],
691        )?;
692        self.conn.execute(
693            "INSERT INTO session_repo_binding (
694                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
695             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
696             ON CONFLICT(session_id) DO UPDATE SET
697                start_commit=excluded.start_commit,
698                end_commit=excluded.end_commit,
699                branch=excluded.branch,
700                dirty_start=excluded.dirty_start,
701                dirty_end=excluded.dirty_end,
702                repo_binding_source=excluded.repo_binding_source",
703            params![
704                s.id,
705                s.start_commit,
706                s.end_commit,
707                s.branch,
708                s.dirty_start.map(bool_to_i64),
709                s.dirty_end.map(bool_to_i64),
710                s.repo_binding_source.clone().unwrap_or_default(),
711            ],
712        )?;
713        Ok(())
714    }
715
716    /// Insert a minimal session row if none exists. Used by hook ingestion when
717    /// the first observed event is not `SessionStart` (hooks installed mid-session).
718    pub fn ensure_session_stub(
719        &self,
720        id: &str,
721        agent: &str,
722        workspace: &str,
723        started_at_ms: u64,
724    ) -> Result<()> {
725        self.conn.execute(
726            "INSERT OR IGNORE INTO sessions (
727                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
728                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
729                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
730             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
731                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
732            params![id, agent, workspace, started_at_ms as i64],
733        )?;
734        Ok(())
735    }
736
737    /// Next `seq` for a new event in this session (0 when there are no events yet).
738    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
739        let n: i64 = self.conn.query_row(
740            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
741            [session_id],
742            |r| r.get(0),
743        )?;
744        Ok(n as u64)
745    }
746
    /// Append `e` without a sync context; shorthand for
    /// `append_event_with_sync(e, None)`.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
750
751    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
752    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
753        let last_before = if projector_legacy_mode() {
754            None
755        } else {
756            self.last_event_seq_for_session(&e.session_id)?
757        };
758        let payload = serde_json::to_string(&e.payload)?;
759        self.conn.execute(
760            "INSERT INTO events (
761                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
762                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
763                stop_reason, latency_ms, ttft_ms, retry_count,
764                context_used_tokens, context_max_tokens,
765                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
766             ) VALUES (
767                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
768                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
769             )
770             ON CONFLICT(session_id, seq) DO UPDATE SET
771                ts_ms = excluded.ts_ms,
772                ts_exact = excluded.ts_exact,
773                kind = excluded.kind,
774                source = excluded.source,
775                tool = excluded.tool,
776                tool_call_id = excluded.tool_call_id,
777                tokens_in = excluded.tokens_in,
778                tokens_out = excluded.tokens_out,
779                reasoning_tokens = excluded.reasoning_tokens,
780                cost_usd_e6 = excluded.cost_usd_e6,
781                payload = excluded.payload,
782                stop_reason = excluded.stop_reason,
783                latency_ms = excluded.latency_ms,
784                ttft_ms = excluded.ttft_ms,
785                retry_count = excluded.retry_count,
786                context_used_tokens = excluded.context_used_tokens,
787                context_max_tokens = excluded.context_max_tokens,
788                cache_creation_tokens = excluded.cache_creation_tokens,
789                cache_read_tokens = excluded.cache_read_tokens,
790                system_prompt_tokens = excluded.system_prompt_tokens",
791            params![
792                e.session_id,
793                e.seq as i64,
794                e.ts_ms as i64,
795                bool_to_i64(e.ts_exact),
796                format!("{:?}", e.kind),
797                format!("{:?}", e.source),
798                e.tool,
799                e.tool_call_id,
800                e.tokens_in.map(|v| v as i64),
801                e.tokens_out.map(|v| v as i64),
802                e.reasoning_tokens.map(|v| v as i64),
803                e.cost_usd_e6,
804                payload,
805                e.stop_reason,
806                e.latency_ms.map(|v| v as i64),
807                e.ttft_ms.map(|v| v as i64),
808                e.retry_count.map(|v| v as i64),
809                e.context_used_tokens.map(|v| v as i64),
810                e.context_max_tokens.map(|v| v as i64),
811                e.cache_creation_tokens.map(|v| v as i64),
812                e.cache_read_tokens.map(|v| v as i64),
813                e.system_prompt_tokens.map(|v| v as i64),
814            ],
815        )?;
816        if self.conn.changes() == 0 {
817            return Ok(());
818        }
819        self.append_hot_event(e)?;
820        if projector_legacy_mode() {
821            index_event_derived(&self.conn, e)?;
822            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
823            self.invalidate_span_tree_cache();
824        } else if last_before.is_some_and(|last| e.seq <= last) {
825            self.replay_projector_session(&e.session_id)?;
826        } else {
827            let deltas = self.projector.borrow_mut().apply(e);
828            self.apply_projector_events(&deltas)?;
829            let expired = self
830                .projector
831                .borrow_mut()
832                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
833            self.apply_projector_events(&expired)?;
834            if is_stop_event(e) {
835                let flushed = self
836                    .projector
837                    .borrow_mut()
838                    .flush_session(&e.session_id, e.ts_ms);
839                self.apply_projector_events(&flushed)?;
840            }
841            self.invalidate_span_tree_cache();
842        }
843        self.append_search_event(e);
844        let Some(ctx) = ctx else {
845            return Ok(());
846        };
847        let sync = &ctx.sync;
848        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
849            return Ok(());
850        }
851        let Some(salt) = try_team_salt(sync) else {
852            tracing::warn!(
853                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
854            );
855            return Ok(());
856        };
857        if sync.sample_rate < 1.0 {
858            let u: f64 = rand::random();
859            if u > sync.sample_rate {
860                return Ok(());
861            }
862        }
863        let Some(session) = self.get_session(&e.session_id)? else {
864            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
865            return Ok(());
866        };
867        let mut outbound = outbound_event_from_row(e, &session, &salt);
868        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
869        let row = serde_json::to_string(&outbound)?;
870        self.outbox()?.append(&e.session_id, "events", &row)?;
871        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
872        Ok(())
873    }
874
875    fn append_hot_event(&self, e: &Event) -> Result<()> {
876        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
877            return Ok(());
878        }
879        let mut slot = self.hot_log.borrow_mut();
880        if slot.is_none() {
881            *slot = Some(HotLog::open(&self.root)?);
882        }
883        if let Some(log) = slot.as_mut() {
884            log.append(e)?;
885        }
886        Ok(())
887    }
888
889    fn append_search_event(&self, e: &Event) {
890        if let Err(err) = self.try_append_search_event(e) {
891            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
892            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
893        }
894    }
895
896    fn try_append_search_event(&self, e: &Event) -> Result<()> {
897        let Some(session) = self.get_session(&e.session_id)? else {
898            return Ok(());
899        };
900        let workspace = PathBuf::from(&session.workspace);
901        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
902        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
903        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
904            return Ok(());
905        };
906        let mut slot = self.search_writer.borrow_mut();
907        if slot.is_none() {
908            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
909        }
910        slot.as_mut().expect("writer").add(&doc)
911    }
912
913    pub fn flush_search(&self) -> Result<()> {
914        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
915            writer.commit()?;
916        }
917        Ok(())
918    }
919
920    fn outbox(&self) -> Result<Outbox> {
921        Outbox::open(&self.root)
922    }
923
924    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
925        if projector_legacy_mode() {
926            rebuild_tool_spans_for_session(&self.conn, session_id)?;
927            self.invalidate_span_tree_cache();
928            return Ok(());
929        }
930        let deltas = self
931            .projector
932            .borrow_mut()
933            .flush_session(session_id, now_ms);
934        if self.apply_projector_events(&deltas)? {
935            self.invalidate_span_tree_cache();
936        }
937        Ok(())
938    }
939
940    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
941        clear_session_spans(&self.conn, session_id)?;
942        self.projector.borrow_mut().reset_session(session_id);
943        let events = self.list_events_for_session(session_id)?;
944        let mut changed = false;
945        for event in &events {
946            let deltas = self.projector.borrow_mut().apply(event);
947            changed |= self.apply_projector_events(&deltas)?;
948        }
949        if self
950            .get_session(session_id)?
951            .is_some_and(|session| session.status == SessionStatus::Done)
952        {
953            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
954            let deltas = self
955                .projector
956                .borrow_mut()
957                .flush_session(session_id, now_ms);
958            changed |= self.apply_projector_events(&deltas)?;
959        }
960        if changed {
961            self.invalidate_span_tree_cache();
962        }
963        Ok(())
964    }
965
966    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
967        let mut changed = false;
968        for delta in deltas {
969            match delta {
970                ProjectorEvent::SpanClosed(span, sample) => {
971                    upsert_tool_span_record(&self.conn, span)?;
972                    tracing::debug!(
973                        session_id = %sample.session_id,
974                        span_id = %sample.span_id,
975                        tool = ?sample.tool,
976                        lead_time_ms = ?sample.lead_time_ms,
977                        tokens_in = ?sample.tokens_in,
978                        tokens_out = ?sample.tokens_out,
979                        reasoning_tokens = ?sample.reasoning_tokens,
980                        cost_usd_e6 = ?sample.cost_usd_e6,
981                        paths = ?sample.paths,
982                        "tool span closed"
983                    );
984                    changed = true;
985                }
986                ProjectorEvent::SpanPatched(span) => {
987                    upsert_tool_span_record(&self.conn, span)?;
988                    changed = true;
989                }
990                ProjectorEvent::FileTouched { session, path } => {
991                    self.conn.execute(
992                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
993                        params![session, path],
994                    )?;
995                    changed = true;
996                }
997                ProjectorEvent::SkillUsed { session, skill } => {
998                    self.conn.execute(
999                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
1000                        params![session, skill],
1001                    )?;
1002                    changed = true;
1003                }
1004                ProjectorEvent::RuleUsed { session, rule } => {
1005                    self.conn.execute(
1006                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
1007                        params![session, rule],
1008                    )?;
1009                    changed = true;
1010                }
1011            }
1012        }
1013        Ok(changed)
1014    }
1015
1016    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
1017        let rows = self.outbox()?.list_pending(limit)?;
1018        if !rows.is_empty() {
1019            return Ok(rows);
1020        }
1021        let mut stmt = self.conn.prepare(
1022            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
1023        )?;
1024        let rows = stmt.query_map(params![limit as i64], |row| {
1025            Ok((
1026                row.get::<_, i64>(0)?,
1027                row.get::<_, String>(1)?,
1028                row.get::<_, String>(2)?,
1029            ))
1030        })?;
1031        let mut out = Vec::new();
1032        for r in rows {
1033            out.push(r?);
1034        }
1035        Ok(out)
1036    }
1037
1038    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
1039        self.outbox()?.delete_ids(ids)?;
1040        for id in ids {
1041            self.conn
1042                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
1043        }
1044        Ok(())
1045    }
1046
1047    pub fn replace_outbox_rows(
1048        &self,
1049        owner_id: &str,
1050        kind: &str,
1051        payloads: &[String],
1052    ) -> Result<()> {
1053        self.outbox()?.replace(owner_id, kind, payloads)?;
1054        self.conn.execute(
1055            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
1056            params![owner_id, kind],
1057        )?;
1058        for payload in payloads {
1059            self.conn.execute(
1060                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
1061                params![owner_id, kind, payload],
1062            )?;
1063        }
1064        Ok(())
1065    }
1066
1067    pub fn outbox_pending_count(&self) -> Result<u64> {
1068        let redb = self.outbox()?.pending_count()?;
1069        if redb > 0 {
1070            return Ok(redb);
1071        }
1072        let c: i64 =
1073            self.conn
1074                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
1075                    r.get(0)
1076                })?;
1077        Ok(c as u64)
1078    }
1079
1080    pub fn set_sync_state_ok(&self) -> Result<()> {
1081        let now = now_ms().to_string();
1082        self.conn.execute(
1083            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
1084             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1085            params![now],
1086        )?;
1087        self.conn.execute(
1088            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
1089             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1090            [],
1091        )?;
1092        self.conn
1093            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
1094        Ok(())
1095    }
1096
1097    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
1098        let prev: i64 = self
1099            .conn
1100            .query_row(
1101                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1102                [],
1103                |r| {
1104                    let s: String = r.get(0)?;
1105                    Ok(s.parse::<i64>().unwrap_or(0))
1106                },
1107            )
1108            .optional()?
1109            .unwrap_or(0);
1110        let next = prev.saturating_add(1);
1111        self.conn.execute(
1112            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
1113             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1114            params![msg],
1115        )?;
1116        self.conn.execute(
1117            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
1118             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1119            params![next.to_string()],
1120        )?;
1121        Ok(())
1122    }
1123
1124    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
1125        let pending_outbox = self.outbox_pending_count()?;
1126        let last_success_ms = self
1127            .conn
1128            .query_row(
1129                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
1130                [],
1131                |r| r.get::<_, String>(0),
1132            )
1133            .optional()?
1134            .and_then(|s| s.parse().ok());
1135        let last_error = self
1136            .conn
1137            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1138                r.get::<_, String>(0)
1139            })
1140            .optional()?;
1141        let consecutive_failures = self
1142            .conn
1143            .query_row(
1144                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1145                [],
1146                |r| r.get::<_, String>(0),
1147            )
1148            .optional()?
1149            .and_then(|s| s.parse().ok())
1150            .unwrap_or(0);
1151        Ok(SyncStatusSnapshot {
1152            pending_outbox,
1153            last_success_ms,
1154            last_error,
1155            consecutive_failures,
1156        })
1157    }
1158
1159    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1160        let row: Option<String> = self
1161            .conn
1162            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1163                r.get::<_, String>(0)
1164            })
1165            .optional()?;
1166        Ok(row.and_then(|s| s.parse().ok()))
1167    }
1168
1169    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1170        self.conn.execute(
1171            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1172             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1173            params![key, v.to_string()],
1174        )?;
1175        Ok(())
1176    }
1177
1178    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
1179    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1180        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1181        let old_ids = old_session_ids(&tx, cutoff_ms)?;
1182        let sessions_to_remove: i64 = tx.query_row(
1183            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1184            params![cutoff_ms],
1185            |r| r.get(0),
1186        )?;
1187        let events_to_remove: i64 = tx.query_row(
1188            "SELECT COUNT(*) FROM events WHERE session_id IN \
1189             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1190            params![cutoff_ms],
1191            |r| r.get(0),
1192        )?;
1193
1194        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1195        tx.execute(
1196            &format!(
1197                "DELETE FROM tool_span_paths WHERE span_id IN \
1198                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1199            ),
1200            params![cutoff_ms],
1201        )?;
1202        tx.execute(
1203            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1204            params![cutoff_ms],
1205        )?;
1206        tx.execute(
1207            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1208            params![cutoff_ms],
1209        )?;
1210        tx.execute(
1211            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1212            params![cutoff_ms],
1213        )?;
1214        tx.execute(
1215            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1216            params![cutoff_ms],
1217        )?;
1218        tx.execute(
1219            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1220            params![cutoff_ms],
1221        )?;
1222        tx.execute(
1223            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1224            params![cutoff_ms],
1225        )?;
1226        tx.execute(
1227            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1228            params![cutoff_ms],
1229        )?;
1230        tx.execute(
1231            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1232            params![cutoff_ms],
1233        )?;
1234        tx.execute(
1235            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1236            params![cutoff_ms],
1237        )?;
1238        tx.execute(
1239            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1240            params![cutoff_ms],
1241        )?;
1242        tx.execute(
1243            "DELETE FROM sessions WHERE started_at_ms < ?1",
1244            params![cutoff_ms],
1245        )?;
1246        tx.commit()?;
1247        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1248            let _ = writer.commit();
1249        }
1250        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1251            tracing::warn!("search prune skipped: {err:#}");
1252            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1253        }
1254        self.invalidate_span_tree_cache();
1255        Ok(PruneStats {
1256            sessions_removed: sessions_to_remove as u64,
1257            events_removed: events_to_remove as u64,
1258        })
1259    }
1260
1261    /// Reclaim file space after large deletes (exclusive lock; can be slow).
1262    pub fn vacuum(&self) -> Result<()> {
1263        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1264        Ok(())
1265    }
1266
1267    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1268        Ok(self
1269            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1270            .rows)
1271    }
1272
1273    pub fn list_sessions_page(
1274        &self,
1275        workspace: &str,
1276        offset: usize,
1277        limit: usize,
1278        filter: SessionFilter,
1279    ) -> Result<SessionPage> {
1280        let (where_sql, args) = session_filter_sql(workspace, &filter);
1281        let total = self.query_session_page_count(&where_sql, &args)?;
1282        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1283        let next = offset.saturating_add(rows.len());
1284        Ok(SessionPage {
1285            rows,
1286            total,
1287            next_offset: (next < total).then_some(next),
1288        })
1289    }
1290
1291    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1292        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1293        let total: i64 = self
1294            .conn
1295            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1296        Ok(total as usize)
1297    }
1298
1299    fn query_session_page_rows(
1300        &self,
1301        where_sql: &str,
1302        args: &[Value],
1303        offset: usize,
1304        limit: usize,
1305    ) -> Result<Vec<SessionRecord>> {
1306        let sql = format!(
1307            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1308        );
1309        let mut values = args.to_vec();
1310        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1311        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1312        let mut stmt = self.conn.prepare(&sql)?;
1313        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1314        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1315    }
1316
1317    pub fn list_sessions_started_after(
1318        &self,
1319        workspace: &str,
1320        after_started_at_ms: u64,
1321    ) -> Result<Vec<SessionRecord>> {
1322        let mut stmt = self.conn.prepare(
1323            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1324                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1325                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1326                    repo_file_count, repo_total_loc
1327             FROM sessions
1328             WHERE workspace = ?1 AND started_at_ms > ?2
1329             ORDER BY started_at_ms DESC, id ASC",
1330        )?;
1331        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1332        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1333    }
1334
1335    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1336        if ids.is_empty() {
1337            return Ok(Vec::new());
1338        }
1339        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1340        let sql =
1341            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1342        let mut stmt = self.conn.prepare(&sql)?;
1343        let params: Vec<&dyn rusqlite::ToSql> =
1344            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1345        let rows = stmt.query_map(params.as_slice(), |r| {
1346            let status: String = r.get(1)?;
1347            Ok(SessionStatusRow {
1348                id: r.get(0)?,
1349                status: status_from_str(&status),
1350                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1351            })
1352        })?;
1353        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1354    }
1355
1356    fn running_session_ids(&self) -> Result<Vec<String>> {
1357        let mut stmt = self
1358            .conn
1359            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1360        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1361        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1362    }
1363
1364    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1365        let session_count: i64 = self.conn.query_row(
1366            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1367            params![workspace],
1368            |r| r.get(0),
1369        )?;
1370
1371        let total_cost: i64 = self.conn.query_row(
1372            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1373             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1374            params![workspace],
1375            |r| r.get(0),
1376        )?;
1377
1378        let mut stmt = self.conn.prepare(
1379            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1380        )?;
1381        let by_agent: Vec<(String, u64)> = stmt
1382            .query_map(params![workspace], |r| {
1383                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1384            })?
1385            .filter_map(|r| r.ok())
1386            .collect();
1387
1388        let mut stmt = self.conn.prepare(
1389            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1390        )?;
1391        let by_model: Vec<(String, u64)> = stmt
1392            .query_map(params![workspace], |r| {
1393                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1394            })?
1395            .filter_map(|r| r.ok())
1396            .collect();
1397
1398        let mut stmt = self.conn.prepare(
1399            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1400             WHERE s.workspace = ?1 AND tool IS NOT NULL
1401             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1402        )?;
1403        let top_tools: Vec<(String, u64)> = stmt
1404            .query_map(params![workspace], |r| {
1405                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1406            })?
1407            .filter_map(|r| r.ok())
1408            .collect();
1409
1410        Ok(SummaryStats {
1411            session_count: session_count as u64,
1412            total_cost_usd_e6: total_cost,
1413            by_agent,
1414            by_model,
1415            top_tools,
1416        })
1417    }
1418
1419    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1420        self.list_events_page(session_id, 0, i64::MAX as usize)
1421    }
1422
1423    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1424        let mut stmt = self.conn.prepare(
1425            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1426                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1427                    stop_reason, latency_ms, ttft_ms, retry_count,
1428                    context_used_tokens, context_max_tokens,
1429                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1430             FROM events WHERE session_id = ?1 AND seq = ?2",
1431        )?;
1432        stmt.query_row(params![session_id, seq as i64], event_row)
1433            .optional()
1434            .map_err(Into::into)
1435    }
1436
1437    pub fn search_tool_events(
1438        &self,
1439        workspace: &str,
1440        tool: &str,
1441        since_ms: Option<u64>,
1442        agent: Option<&str>,
1443        limit: usize,
1444    ) -> Result<Vec<(String, Event)>> {
1445        let mut stmt = self.conn.prepare(
1446            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1447                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1448                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1449                    e.context_used_tokens, e.context_max_tokens,
1450                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
1451                    s.agent
1452             FROM events e JOIN sessions s ON s.id = e.session_id
1453             WHERE e.tool = ?2
1454               AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
1455               AND (?3 IS NULL OR e.ts_ms >= ?3)
1456               AND (?4 IS NULL OR s.agent = ?4)
1457             ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
1458             LIMIT ?5",
1459        )?;
1460        let since = since_ms.map(|v| v as i64);
1461        let rows = stmt.query_map(
1462            params![workspace, tool, since, agent, limit as i64],
1463            search_tool_event_row,
1464        )?;
1465        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1466    }
1467
1468    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1469        let mut out = Vec::new();
1470        for session in self.list_sessions(workspace)? {
1471            for event in self.list_events_for_session(&session.id)? {
1472                out.push((session.clone(), event));
1473            }
1474        }
1475        out.sort_by(|a, b| {
1476            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1477        });
1478        Ok(out)
1479    }
1480
1481    pub fn list_events_page(
1482        &self,
1483        session_id: &str,
1484        after_seq: u64,
1485        limit: usize,
1486    ) -> Result<Vec<Event>> {
1487        let mut stmt = self.conn.prepare(
1488            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1489                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1490                    stop_reason, latency_ms, ttft_ms, retry_count,
1491                    context_used_tokens, context_max_tokens,
1492                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1493             FROM events
1494             WHERE session_id = ?1 AND seq >= ?2
1495             ORDER BY seq ASC LIMIT ?3",
1496        )?;
1497        let rows = stmt.query_map(
1498            params![
1499                session_id,
1500                after_seq as i64,
1501                limit.min(i64::MAX as usize) as i64
1502            ],
1503            event_row,
1504        )?;
1505        let mut events = Vec::new();
1506        for row in rows {
1507            events.push(row?);
1508        }
1509        Ok(events)
1510    }
1511
1512    /// Update only status for existing session.
1513    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1514        self.conn.execute(
1515            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1516            params![format!("{:?}", status), id],
1517        )?;
1518        Ok(())
1519    }
1520
1521    /// Workspace activity dashboard — feeds `cmd_insights`.
1522    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1523        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1524        Ok(InsightsStats {
1525            total_sessions: count_q(
1526                &self.conn,
1527                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1528                workspace,
1529            )?,
1530            running_sessions: count_q(
1531                &self.conn,
1532                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1533                workspace,
1534            )?,
1535            total_events: count_q(
1536                &self.conn,
1537                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1538                workspace,
1539            )?,
1540            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1541            recent: recent_sessions_3(&self.conn, workspace)?,
1542            top_tools: top_tools_5(&self.conn, workspace)?,
1543            total_cost_usd_e6,
1544            sessions_with_cost,
1545        })
1546    }
1547
1548    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1549    pub fn retro_events_in_window(
1550        &self,
1551        workspace: &str,
1552        start_ms: u64,
1553        end_ms: u64,
1554    ) -> Result<Vec<(SessionRecord, Event)>> {
1555        let mut stmt = self.conn.prepare(
1556            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1557                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1558                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1559                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1560                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1561                    s.repo_file_count, s.repo_total_loc,
1562                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1563                    e.context_used_tokens, e.context_max_tokens,
1564                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1565             FROM events e
1566             JOIN sessions s ON s.id = e.session_id
1567             WHERE s.workspace = ?1
1568               AND (
1569                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1570                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1571               )
1572             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1573        )?;
1574        let rows = stmt.query_map(
1575            params![
1576                workspace,
1577                start_ms as i64,
1578                end_ms as i64,
1579                SYNTHETIC_TS_CEILING_MS,
1580            ],
1581            |row| {
1582                let payload_str: String = row.get(12)?;
1583                let status_str: String = row.get(19)?;
1584                Ok((
1585                    SessionRecord {
1586                        id: row.get(13)?,
1587                        agent: row.get(14)?,
1588                        model: row.get(15)?,
1589                        workspace: row.get(16)?,
1590                        started_at_ms: row.get::<_, i64>(17)? as u64,
1591                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1592                        status: status_from_str(&status_str),
1593                        trace_path: row.get(20)?,
1594                        start_commit: row.get(21)?,
1595                        end_commit: row.get(22)?,
1596                        branch: row.get(23)?,
1597                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1598                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1599                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1600                        prompt_fingerprint: row.get(27)?,
1601                        parent_session_id: row.get(28)?,
1602                        agent_version: row.get(29)?,
1603                        os: row.get(30)?,
1604                        arch: row.get(31)?,
1605                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1606                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1607                    },
1608                    Event {
1609                        session_id: row.get(0)?,
1610                        seq: row.get::<_, i64>(1)? as u64,
1611                        ts_ms: row.get::<_, i64>(2)? as u64,
1612                        ts_exact: row.get::<_, i64>(3)? != 0,
1613                        kind: kind_from_str(&row.get::<_, String>(4)?),
1614                        source: source_from_str(&row.get::<_, String>(5)?),
1615                        tool: row.get(6)?,
1616                        tool_call_id: row.get(7)?,
1617                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1618                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1619                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1620                        cost_usd_e6: row.get(11)?,
1621                        payload: serde_json::from_str(&payload_str)
1622                            .unwrap_or(serde_json::Value::Null),
1623                        stop_reason: row.get(34)?,
1624                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1625                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1626                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1627                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1628                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1629                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1630                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1631                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1632                    },
1633                ))
1634            },
1635        )?;
1636
1637        let mut out = Vec::new();
1638        for r in rows {
1639            out.push(r?);
1640        }
1641        Ok(out)
1642    }
1643
    /// Computes one `metric` value per session of `workspace` with activity in
    /// `[start_ms, end_ms]`, returning each value next to its session record.
    ///
    /// An event is inside the window when its `ts_ms` lies within the bounds, or when its
    /// `ts_ms` is below `SYNTHETIC_TS_CEILING_MS` and the session's `started_at_ms` lies within
    /// the bounds. Every query binds the same four parameters: workspace, window start, window
    /// end, and the synthetic-timestamp ceiling.
    ///
    /// None of the queries has an `ORDER BY`, so callers should not rely on row order.
    ///
    /// # Errors
    /// Fails if the statement cannot be prepared or if any row cannot be decoded.
    pub fn experiment_metric_values_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
        metric: crate::experiment::types::Metric,
    ) -> Result<Vec<(SessionRecord, f64)>> {
        use crate::experiment::types::Metric;
        // 21 session columns (indices 0..=20), in the order `session_row` is given below; the
        // metric value is therefore always column 21.
        let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
            s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
            s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
            s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
        // Shared WHERE fragment: workspace match plus the window rule described above.
        let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
            OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
        let sql = match metric {
            // Sum of input, output and reasoning tokens over in-window events (NULL as 0).
            Metric::TokensPerSession => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Sum of `cost_usd_e6` over in-window events, divided by 1e6.
            Metric::CostPerSession => format!(
                "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // 0.0 when any in-window event has kind 'Error', otherwise 1.0.
            Metric::SuccessRate => format!(
                "SELECT {session_cols},
                    CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Session length in minutes; only sessions that have ended and have at least one
            // in-window event. The window rule is spelled out inline inside the EXISTS.
            Metric::DurationMinutes => format!(
                "SELECT {session_cols},
                    (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
                 FROM sessions s
                 WHERE s.workspace = ?1
                   AND s.ended_at_ms IS NOT NULL
                   AND EXISTS (
                     SELECT 1 FROM events e
                     WHERE e.session_id = s.id
                       AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
                   )"
            ),
            // Number of distinct `files_touched` paths for the session. The LEFT JOIN keeps
            // sessions with no touched files (value 0); DISTINCT absorbs the events x files
            // row multiplication.
            Metric::FilesPerSession => format!(
                "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
                 FROM sessions s
                 JOIN events e ON e.session_id = s.id
                 LEFT JOIN files_touched ft ON ft.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // 1 - min(errors, messages) / messages; sessions without any 'Message' event are
            // dropped by the HAVING clause, which also prevents division by zero.
            Metric::SuccessRateByPrompt => format!(
                "SELECT {session_cols},
                    1.0 - (MIN(
                      SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
                      SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
                    ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // Cost in USD divided by the number of 'Message' events; sessions with none are
            // dropped by the HAVING clause.
            Metric::CostByPrompt => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
                    SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // Count of in-window 'ToolCall' events whose tool equals that of the previous
            // tool call in the same session (ordered by ts_ms, seq). Sessions with no such
            // tool calls produce no row because of the inner join on `calls`.
            Metric::ToolLoops => format!(
                "WITH calls AS (
                   SELECT e.session_id, e.tool,
                     LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
                   FROM events e JOIN sessions s ON s.id = e.session_id
                   WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
                 )
                 SELECT {session_cols},
                    SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN calls ON calls.session_id = s.id
                 GROUP BY s.id"
            ),
        };
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                SYNTHETIC_TS_CEILING_MS,
            ],
            // `session_row` decodes columns 0..=20; the metric value follows at index 21.
            |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
        )?;
        // The first row error aborts the whole result.
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
1745
1746    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1747    pub fn files_touched_in_window(
1748        &self,
1749        workspace: &str,
1750        start_ms: u64,
1751        end_ms: u64,
1752    ) -> Result<Vec<(String, String)>> {
1753        let mut stmt = self.conn.prepare(
1754            "SELECT DISTINCT ft.session_id, ft.path
1755             FROM files_touched ft
1756             JOIN sessions s ON s.id = ft.session_id
1757             WHERE s.workspace = ?1
1758               AND EXISTS (
1759                 SELECT 1 FROM events e
1760                 JOIN sessions ss ON ss.id = e.session_id
1761                 WHERE e.session_id = ft.session_id
1762                   AND (
1763                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1764                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1765                   )
1766               )
1767             ORDER BY ft.session_id, ft.path",
1768        )?;
1769        let out: Vec<(String, String)> = stmt
1770            .query_map(
1771                params![
1772                    workspace,
1773                    start_ms as i64,
1774                    end_ms as i64,
1775                    SYNTHETIC_TS_CEILING_MS,
1776                ],
1777                |r| Ok((r.get(0)?, r.get(1)?)),
1778            )?
1779            .filter_map(|r| r.ok())
1780            .collect();
1781        Ok(out)
1782    }
1783
1784    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1785    /// (any session with an indexed skill row; join events optional — use row existence).
1786    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1787        let mut stmt = self.conn.prepare(
1788            "SELECT DISTINCT su.skill
1789             FROM skills_used su
1790             JOIN sessions s ON s.id = su.session_id
1791             WHERE s.workspace = ?1
1792               AND EXISTS (
1793                 SELECT 1 FROM events e
1794                 JOIN sessions ss ON ss.id = e.session_id
1795                 WHERE e.session_id = su.session_id
1796                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1797               )
1798             ORDER BY su.skill",
1799        )?;
1800        let out: Vec<String> = stmt
1801            .query_map(
1802                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1803                |r| r.get::<_, String>(0),
1804            )?
1805            .filter_map(|r| r.ok())
1806            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1807            .collect();
1808        Ok(out)
1809    }
1810
1811    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1812    pub fn skills_used_in_window(
1813        &self,
1814        workspace: &str,
1815        start_ms: u64,
1816        end_ms: u64,
1817    ) -> Result<Vec<(String, String)>> {
1818        let mut stmt = self.conn.prepare(
1819            "SELECT DISTINCT su.session_id, su.skill
1820             FROM skills_used su
1821             JOIN sessions s ON s.id = su.session_id
1822             WHERE s.workspace = ?1
1823               AND EXISTS (
1824                 SELECT 1 FROM events e
1825                 JOIN sessions ss ON ss.id = e.session_id
1826                 WHERE e.session_id = su.session_id
1827                   AND (
1828                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1829                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1830                   )
1831               )
1832             ORDER BY su.session_id, su.skill",
1833        )?;
1834        let out: Vec<(String, String)> = stmt
1835            .query_map(
1836                params![
1837                    workspace,
1838                    start_ms as i64,
1839                    end_ms as i64,
1840                    SYNTHETIC_TS_CEILING_MS,
1841                ],
1842                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1843            )?
1844            .filter_map(|r| r.ok())
1845            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1846            .collect();
1847        Ok(out)
1848    }
1849
1850    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1851    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1852        let mut stmt = self.conn.prepare(
1853            "SELECT DISTINCT ru.rule
1854             FROM rules_used ru
1855             JOIN sessions s ON s.id = ru.session_id
1856             WHERE s.workspace = ?1
1857               AND EXISTS (
1858                 SELECT 1 FROM events e
1859                 JOIN sessions ss ON ss.id = e.session_id
1860                 WHERE e.session_id = ru.session_id
1861                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1862               )
1863             ORDER BY ru.rule",
1864        )?;
1865        let out: Vec<String> = stmt
1866            .query_map(
1867                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1868                |r| r.get::<_, String>(0),
1869            )?
1870            .filter_map(|r| r.ok())
1871            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1872            .collect();
1873        Ok(out)
1874    }
1875
1876    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1877    pub fn rules_used_in_window(
1878        &self,
1879        workspace: &str,
1880        start_ms: u64,
1881        end_ms: u64,
1882    ) -> Result<Vec<(String, String)>> {
1883        let mut stmt = self.conn.prepare(
1884            "SELECT DISTINCT ru.session_id, ru.rule
1885             FROM rules_used ru
1886             JOIN sessions s ON s.id = ru.session_id
1887             WHERE s.workspace = ?1
1888               AND EXISTS (
1889                 SELECT 1 FROM events e
1890                 JOIN sessions ss ON ss.id = e.session_id
1891                 WHERE e.session_id = ru.session_id
1892                   AND (
1893                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1894                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1895                   )
1896               )
1897             ORDER BY ru.session_id, ru.rule",
1898        )?;
1899        let out: Vec<(String, String)> = stmt
1900            .query_map(
1901                params![
1902                    workspace,
1903                    start_ms as i64,
1904                    end_ms as i64,
1905                    SYNTHETIC_TS_CEILING_MS,
1906                ],
1907                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1908            )?
1909            .filter_map(|r| r.ok())
1910            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1911            .collect();
1912        Ok(out)
1913    }
1914
1915    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1916    pub fn sessions_active_in_window(
1917        &self,
1918        workspace: &str,
1919        start_ms: u64,
1920        end_ms: u64,
1921    ) -> Result<HashSet<String>> {
1922        let mut stmt = self.conn.prepare(
1923            "SELECT DISTINCT s.id
1924             FROM sessions s
1925             WHERE s.workspace = ?1
1926               AND EXISTS (
1927                 SELECT 1 FROM events e
1928                 WHERE e.session_id = s.id
1929                   AND (
1930                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1931                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1932                   )
1933               )",
1934        )?;
1935        let out: HashSet<String> = stmt
1936            .query_map(
1937                params![
1938                    workspace,
1939                    start_ms as i64,
1940                    end_ms as i64,
1941                    SYNTHETIC_TS_CEILING_MS,
1942                ],
1943                |r| r.get(0),
1944            )?
1945            .filter_map(|r| r.ok())
1946            .collect();
1947        Ok(out)
1948    }
1949
1950    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1951    pub fn session_costs_usd_e6_in_window(
1952        &self,
1953        workspace: &str,
1954        start_ms: u64,
1955        end_ms: u64,
1956    ) -> Result<HashMap<String, i64>> {
1957        let mut stmt = self.conn.prepare(
1958            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1959             FROM events e
1960             JOIN sessions s ON s.id = e.session_id
1961             WHERE s.workspace = ?1
1962               AND (
1963                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1964                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1965               )
1966             GROUP BY e.session_id",
1967        )?;
1968        let rows: Vec<(String, i64)> = stmt
1969            .query_map(
1970                params![
1971                    workspace,
1972                    start_ms as i64,
1973                    end_ms as i64,
1974                    SYNTHETIC_TS_CEILING_MS,
1975                ],
1976                |r| Ok((r.get(0)?, r.get(1)?)),
1977            )?
1978            .filter_map(|r| r.ok())
1979            .collect();
1980        Ok(rows.into_iter().collect())
1981    }
1982
1983    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1984    pub fn guidance_report(
1985        &self,
1986        workspace: &str,
1987        window_start_ms: u64,
1988        window_end_ms: u64,
1989        skill_slugs_on_disk: &HashSet<String>,
1990        rule_slugs_on_disk: &HashSet<String>,
1991    ) -> Result<GuidanceReport> {
1992        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1993        let denom = active.len() as u64;
1994        let costs =
1995            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1996
1997        let workspace_avg_cost_per_session_usd = if denom > 0 {
1998            let total_e6: i64 = active
1999                .iter()
2000                .map(|sid| costs.get(sid).copied().unwrap_or(0))
2001                .sum();
2002            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
2003        } else {
2004            None
2005        };
2006
2007        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
2008        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
2009            skill_sessions.entry(skill).or_default().insert(sid);
2010        }
2011        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
2012        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
2013            rule_sessions.entry(rule).or_default().insert(sid);
2014        }
2015
2016        let mut rows: Vec<GuidancePerfRow> = Vec::new();
2017
2018        let mut push_row =
2019            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
2020                let sessions = sids.len() as u64;
2021                let sessions_pct = if denom > 0 {
2022                    sessions as f64 * 100.0 / denom as f64
2023                } else {
2024                    0.0
2025                };
2026                let total_cost_usd_e6: i64 = sids
2027                    .iter()
2028                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
2029                    .sum();
2030                let avg_cost_per_session_usd = if sessions > 0 {
2031                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
2032                } else {
2033                    None
2034                };
2035                let vs_workspace_avg_cost_per_session_usd =
2036                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
2037                        (Some(avg), Some(w)) => Some(avg - w),
2038                        _ => None,
2039                    };
2040                rows.push(GuidancePerfRow {
2041                    kind,
2042                    id,
2043                    sessions,
2044                    sessions_pct,
2045                    total_cost_usd_e6,
2046                    avg_cost_per_session_usd,
2047                    vs_workspace_avg_cost_per_session_usd,
2048                    on_disk,
2049                });
2050            };
2051
2052        let mut seen_skills: HashSet<String> = HashSet::new();
2053        for (id, sids) in &skill_sessions {
2054            seen_skills.insert(id.clone());
2055            push_row(
2056                GuidanceKind::Skill,
2057                id.clone(),
2058                sids,
2059                skill_slugs_on_disk.contains(id),
2060            );
2061        }
2062        for slug in skill_slugs_on_disk {
2063            if seen_skills.contains(slug) {
2064                continue;
2065            }
2066            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
2067        }
2068
2069        let mut seen_rules: HashSet<String> = HashSet::new();
2070        for (id, sids) in &rule_sessions {
2071            seen_rules.insert(id.clone());
2072            push_row(
2073                GuidanceKind::Rule,
2074                id.clone(),
2075                sids,
2076                rule_slugs_on_disk.contains(id),
2077            );
2078        }
2079        for slug in rule_slugs_on_disk {
2080            if seen_rules.contains(slug) {
2081                continue;
2082            }
2083            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
2084        }
2085
2086        rows.sort_by(|a, b| {
2087            b.sessions
2088                .cmp(&a.sessions)
2089                .then_with(|| a.kind.cmp(&b.kind))
2090                .then_with(|| a.id.cmp(&b.id))
2091        });
2092
2093        Ok(GuidanceReport {
2094            workspace: workspace.to_string(),
2095            window_start_ms,
2096            window_end_ms,
2097            sessions_in_window: denom,
2098            workspace_avg_cost_per_session_usd,
2099            rows,
2100        })
2101    }
2102
2103    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
2104        let mut stmt = self.conn.prepare(
2105            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
2106                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
2107                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
2108                    repo_file_count, repo_total_loc
2109             FROM sessions WHERE id = ?1",
2110        )?;
2111        let mut rows = stmt.query_map(params![id], |row| {
2112            Ok((
2113                row.get::<_, String>(0)?,
2114                row.get::<_, String>(1)?,
2115                row.get::<_, Option<String>>(2)?,
2116                row.get::<_, String>(3)?,
2117                row.get::<_, i64>(4)?,
2118                row.get::<_, Option<i64>>(5)?,
2119                row.get::<_, String>(6)?,
2120                row.get::<_, String>(7)?,
2121                row.get::<_, Option<String>>(8)?,
2122                row.get::<_, Option<String>>(9)?,
2123                row.get::<_, Option<String>>(10)?,
2124                row.get::<_, Option<i64>>(11)?,
2125                row.get::<_, Option<i64>>(12)?,
2126                row.get::<_, String>(13)?,
2127                row.get::<_, Option<String>>(14)?,
2128                row.get::<_, Option<String>>(15)?,
2129                row.get::<_, Option<String>>(16)?,
2130                row.get::<_, Option<String>>(17)?,
2131                row.get::<_, Option<String>>(18)?,
2132                row.get::<_, Option<i64>>(19)?,
2133                row.get::<_, Option<i64>>(20)?,
2134            ))
2135        })?;
2136
2137        if let Some(row) = rows.next() {
2138            let (
2139                id,
2140                agent,
2141                model,
2142                workspace,
2143                started,
2144                ended,
2145                status_str,
2146                trace,
2147                start_commit,
2148                end_commit,
2149                branch,
2150                dirty_start,
2151                dirty_end,
2152                source,
2153                prompt_fingerprint,
2154                parent_session_id,
2155                agent_version,
2156                os,
2157                arch,
2158                repo_file_count,
2159                repo_total_loc,
2160            ) = row?;
2161            Ok(Some(SessionRecord {
2162                id,
2163                agent,
2164                model,
2165                workspace,
2166                started_at_ms: started as u64,
2167                ended_at_ms: ended.map(|v| v as u64),
2168                status: status_from_str(&status_str),
2169                trace_path: trace,
2170                start_commit,
2171                end_commit,
2172                branch,
2173                dirty_start: dirty_start.map(i64_to_bool),
2174                dirty_end: dirty_end.map(i64_to_bool),
2175                repo_binding_source: empty_to_none(source),
2176                prompt_fingerprint,
2177                parent_session_id,
2178                agent_version,
2179                os,
2180                arch,
2181                repo_file_count: repo_file_count.map(|v| v as u32),
2182                repo_total_loc: repo_total_loc.map(|v| v as u64),
2183            }))
2184        } else {
2185            Ok(None)
2186        }
2187    }
2188
2189    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2190        let mut stmt = self.conn.prepare(
2191            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2192                    indexed_at_ms, dirty, graph_path
2193             FROM repo_snapshots WHERE workspace = ?1
2194             ORDER BY indexed_at_ms DESC LIMIT 1",
2195        )?;
2196        let mut rows = stmt.query_map(params![workspace], |row| {
2197            Ok(RepoSnapshotRecord {
2198                id: row.get(0)?,
2199                workspace: row.get(1)?,
2200                head_commit: row.get(2)?,
2201                dirty_fingerprint: row.get(3)?,
2202                analyzer_version: row.get(4)?,
2203                indexed_at_ms: row.get::<_, i64>(5)? as u64,
2204                dirty: row.get::<_, i64>(6)? != 0,
2205                graph_path: row.get(7)?,
2206            })
2207        })?;
2208        Ok(rows.next().transpose()?)
2209    }
2210
2211    pub fn save_repo_snapshot(
2212        &self,
2213        snapshot: &RepoSnapshotRecord,
2214        facts: &[FileFact],
2215        edges: &[RepoEdge],
2216    ) -> Result<()> {
2217        self.conn.execute(
2218            "INSERT INTO repo_snapshots (
2219                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2220                indexed_at_ms, dirty, graph_path
2221             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2222             ON CONFLICT(id) DO UPDATE SET
2223                workspace=excluded.workspace,
2224                head_commit=excluded.head_commit,
2225                dirty_fingerprint=excluded.dirty_fingerprint,
2226                analyzer_version=excluded.analyzer_version,
2227                indexed_at_ms=excluded.indexed_at_ms,
2228                dirty=excluded.dirty,
2229                graph_path=excluded.graph_path",
2230            params![
2231                snapshot.id,
2232                snapshot.workspace,
2233                snapshot.head_commit,
2234                snapshot.dirty_fingerprint,
2235                snapshot.analyzer_version,
2236                snapshot.indexed_at_ms as i64,
2237                bool_to_i64(snapshot.dirty),
2238                snapshot.graph_path,
2239            ],
2240        )?;
2241        self.conn.execute(
2242            "DELETE FROM file_facts WHERE snapshot_id = ?1",
2243            params![snapshot.id],
2244        )?;
2245        self.conn.execute(
2246            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2247            params![snapshot.id],
2248        )?;
2249        for fact in facts {
2250            self.conn.execute(
2251                "INSERT INTO file_facts (
2252                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2253                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2254                    churn_30d, churn_90d, authors_90d, last_changed_ms
2255                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2256                params![
2257                    fact.snapshot_id,
2258                    fact.path,
2259                    fact.language,
2260                    fact.bytes as i64,
2261                    fact.loc as i64,
2262                    fact.sloc as i64,
2263                    fact.complexity_total as i64,
2264                    fact.max_fn_complexity as i64,
2265                    fact.symbol_count as i64,
2266                    fact.import_count as i64,
2267                    fact.fan_in as i64,
2268                    fact.fan_out as i64,
2269                    fact.churn_30d as i64,
2270                    fact.churn_90d as i64,
2271                    fact.authors_90d as i64,
2272                    fact.last_changed_ms.map(|v| v as i64),
2273                ],
2274            )?;
2275        }
2276        for edge in edges {
2277            self.conn.execute(
2278                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2279                 VALUES (?1, ?2, ?3, ?4, ?5)
2280                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2281                 DO UPDATE SET weight = weight + excluded.weight",
2282                params![
2283                    snapshot.id,
2284                    edge.from_path,
2285                    edge.to_path,
2286                    edge.kind,
2287                    edge.weight as i64,
2288                ],
2289            )?;
2290        }
2291        Ok(())
2292    }
2293
2294    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2295        let mut stmt = self.conn.prepare(
2296            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2297                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2298                    churn_30d, churn_90d, authors_90d, last_changed_ms
2299             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2300        )?;
2301        let rows = stmt.query_map(params![snapshot_id], |row| {
2302            Ok(FileFact {
2303                snapshot_id: row.get(0)?,
2304                path: row.get(1)?,
2305                language: row.get(2)?,
2306                bytes: row.get::<_, i64>(3)? as u64,
2307                loc: row.get::<_, i64>(4)? as u32,
2308                sloc: row.get::<_, i64>(5)? as u32,
2309                complexity_total: row.get::<_, i64>(6)? as u32,
2310                max_fn_complexity: row.get::<_, i64>(7)? as u32,
2311                symbol_count: row.get::<_, i64>(8)? as u32,
2312                import_count: row.get::<_, i64>(9)? as u32,
2313                fan_in: row.get::<_, i64>(10)? as u32,
2314                fan_out: row.get::<_, i64>(11)? as u32,
2315                churn_30d: row.get::<_, i64>(12)? as u32,
2316                churn_90d: row.get::<_, i64>(13)? as u32,
2317                authors_90d: row.get::<_, i64>(14)? as u32,
2318                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2319            })
2320        })?;
2321        Ok(rows.filter_map(|row| row.ok()).collect())
2322    }
2323
2324    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2325        let mut stmt = self.conn.prepare(
2326            "SELECT from_id, to_id, kind, weight
2327             FROM repo_edges WHERE snapshot_id = ?1
2328             ORDER BY kind, from_id, to_id",
2329        )?;
2330        let rows = stmt.query_map(params![snapshot_id], |row| {
2331            Ok(RepoEdge {
2332                from_path: row.get(0)?,
2333                to_path: row.get(1)?,
2334                kind: row.get(2)?,
2335                weight: row.get::<_, i64>(3)? as u32,
2336            })
2337        })?;
2338        Ok(rows.filter_map(|row| row.ok()).collect())
2339    }
2340
2341    pub fn hottest_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2342        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * complexity_total")
2343    }
2344
2345    pub fn most_changed_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2346        self.ranked_files_for_snapshot(snapshot_id, "churn_30d")
2347    }
2348
2349    pub fn most_complex_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2350        self.ranked_files_for_snapshot(snapshot_id, "complexity_total")
2351    }
2352
2353    pub fn highest_risk_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2354        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * authors_90d * complexity_total")
2355    }
2356
2357    fn ranked_files_for_snapshot(
2358        &self,
2359        snapshot_id: &str,
2360        value_sql: &str,
2361    ) -> Result<Vec<RankedFile>> {
2362        let sql = format!(
2363            "SELECT path, {value_sql}, complexity_total, churn_30d
2364             FROM file_facts WHERE snapshot_id = ?1
2365             ORDER BY {value_sql} DESC, path ASC LIMIT 10"
2366        );
2367        let mut stmt = self.conn.prepare(&sql)?;
2368        let rows = stmt.query_map(params![snapshot_id], ranked_file_row)?;
2369        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2370    }
2371
2372    pub fn pain_hotspots_for_snapshot(
2373        &self,
2374        snapshot_id: &str,
2375        workspace: &str,
2376        start_ms: u64,
2377        end_ms: u64,
2378    ) -> Result<Vec<RankedFile>> {
2379        let mut stmt = self.conn.prepare(PAIN_HOTSPOTS_SQL)?;
2380        let rows = stmt.query_map(
2381            params![snapshot_id, workspace, start_ms as i64, end_ms as i64],
2382            ranked_file_row,
2383        )?;
2384        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2385    }
2386
2387    pub fn tool_rank_rows_in_window(
2388        &self,
2389        workspace: &str,
2390        start_ms: u64,
2391        end_ms: u64,
2392    ) -> Result<Vec<RankedTool>> {
2393        let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
2394        let rows = stmt.query_map(
2395            params![workspace, start_ms as i64, end_ms as i64],
2396            ranked_tool_row,
2397        )?;
2398        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2399    }
2400
2401    pub fn tool_spans_in_window(
2402        &self,
2403        workspace: &str,
2404        start_ms: u64,
2405        end_ms: u64,
2406    ) -> Result<Vec<ToolSpanView>> {
2407        let mut stmt = self.conn.prepare(
2408            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2409                    reasoning_tokens, cost_usd_e6, paths_json,
2410                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2411             FROM (
2412                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2413                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2414                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2415                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2416                        ts.started_at_ms AS sort_ms
2417                 FROM tool_spans ts
2418                 JOIN sessions s ON s.id = ts.session_id
2419                 WHERE s.workspace = ?1
2420                   AND ts.started_at_ms >= ?2
2421                   AND ts.started_at_ms <= ?3
2422                 UNION ALL
2423                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2424                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2425                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2426                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2427                        ts.ended_at_ms AS sort_ms
2428                 FROM tool_spans ts
2429                 JOIN sessions s ON s.id = ts.session_id
2430                 WHERE s.workspace = ?1
2431                   AND ts.started_at_ms IS NULL
2432                   AND ts.ended_at_ms >= ?2
2433                   AND ts.ended_at_ms <= ?3
2434             )
2435             ORDER BY sort_ms DESC",
2436        )?;
2437        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2438            let paths_json: String = row.get(8)?;
2439            Ok(ToolSpanView {
2440                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2441                tool: row
2442                    .get::<_, Option<String>>(1)?
2443                    .unwrap_or_else(|| "unknown".into()),
2444                status: row.get(2)?,
2445                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2446                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2447                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2448                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2449                cost_usd_e6: row.get(7)?,
2450                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2451                parent_span_id: row.get(9)?,
2452                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2453                subtree_cost_usd_e6: row.get(11)?,
2454                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2455            })
2456        })?;
2457        Ok(rows.filter_map(|row| row.ok()).collect())
2458    }
2459
2460    pub fn session_span_tree(
2461        &self,
2462        session_id: &str,
2463    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2464        let last_event_seq = self.last_event_seq_for_session(session_id)?;
2465        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2466            && entry.session_id == session_id
2467            && entry.last_event_seq == last_event_seq
2468        {
2469            return Ok(entry.nodes.clone());
2470        }
2471        let mut stmt = self.conn.prepare(
2472            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2473                    reasoning_tokens, cost_usd_e6, paths_json,
2474                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2475             FROM tool_spans
2476             WHERE session_id = ?1
2477             ORDER BY depth ASC, started_at_ms ASC",
2478        )?;
2479        let rows = stmt.query_map(params![session_id], |row| {
2480            let paths_json: String = row.get(8)?;
2481            Ok(crate::metrics::types::ToolSpanView {
2482                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2483                tool: row
2484                    .get::<_, Option<String>>(1)?
2485                    .unwrap_or_else(|| "unknown".into()),
2486                status: row.get(2)?,
2487                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2488                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2489                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2490                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2491                cost_usd_e6: row.get(7)?,
2492                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2493                parent_span_id: row.get(9)?,
2494                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2495                subtree_cost_usd_e6: row.get(11)?,
2496                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2497            })
2498        })?;
2499        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2500        let nodes = crate::store::span_tree::build_tree(spans);
2501        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2502            session_id: session_id.to_string(),
2503            last_event_seq,
2504            nodes: nodes.clone(),
2505        });
2506        Ok(nodes)
2507    }
2508
2509    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2510        let seq = self
2511            .conn
2512            .query_row(
2513                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2514                params![session_id],
2515                |r| r.get::<_, Option<i64>>(0),
2516            )?
2517            .map(|v| v as u64);
2518        Ok(seq)
2519    }
2520
2521    /// Sync-shaped tool spans whose session falls in `[start_ms, end_ms]`. Mirrors
2522    /// `retro_events_in_window` for the spans table so `kaizen telemetry push` can ship
2523    /// `IngestExportBatch::ToolSpans` next to the events batch. Window matches on
2524    /// `started_at_ms` first, falling back to `ended_at_ms` for spans that never started a
2525    /// timer (status-only rows). Workspace filter joins through `sessions.workspace`.
2526    pub fn tool_spans_sync_rows_in_window(
2527        &self,
2528        workspace: &str,
2529        start_ms: u64,
2530        end_ms: u64,
2531    ) -> Result<Vec<ToolSpanSyncRow>> {
2532        let mut stmt = self.conn.prepare(
2533            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
2534                    lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2535             FROM (
2536                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2537                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2538                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2539                        ts.started_at_ms AS sort_ms
2540                 FROM tool_spans ts
2541                 JOIN sessions s ON s.id = ts.session_id
2542                 WHERE s.workspace = ?1
2543                   AND ts.started_at_ms IS NOT NULL
2544                   AND ts.started_at_ms >= ?2
2545                   AND ts.started_at_ms <= ?3
2546                 UNION ALL
2547                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2548                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2549                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2550                        ts.ended_at_ms AS sort_ms
2551                 FROM tool_spans ts
2552                 JOIN sessions s ON s.id = ts.session_id
2553                 WHERE s.workspace = ?1
2554                   AND ts.started_at_ms IS NULL
2555                   AND ts.ended_at_ms IS NOT NULL
2556                   AND ts.ended_at_ms >= ?2
2557                   AND ts.ended_at_ms <= ?3
2558             )
2559             ORDER BY sort_ms ASC, span_id ASC",
2560        )?;
2561        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2562            let paths_json: String = row.get(12)?;
2563            Ok(ToolSpanSyncRow {
2564                span_id: row.get(0)?,
2565                session_id: row.get(1)?,
2566                tool: row.get(2)?,
2567                tool_call_id: row.get(3)?,
2568                status: row.get(4)?,
2569                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2570                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2571                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2572                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2573                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2574                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2575                cost_usd_e6: row.get(11)?,
2576                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2577            })
2578        })?;
2579        Ok(rows.filter_map(|row| row.ok()).collect())
2580    }
2581
2582    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2583        let mut stmt = self.conn.prepare(
2584            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2585                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2586             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2587        )?;
2588        let rows = stmt.query_map(params![session_id], |row| {
2589            let paths_json: String = row.get(12)?;
2590            Ok(ToolSpanSyncRow {
2591                span_id: row.get(0)?,
2592                session_id: row.get(1)?,
2593                tool: row.get(2)?,
2594                tool_call_id: row.get(3)?,
2595                status: row.get(4)?,
2596                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2597                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2598                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2599                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2600                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2601                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2602                cost_usd_e6: row.get(11)?,
2603                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2604            })
2605        })?;
2606        Ok(rows.filter_map(|row| row.ok()).collect())
2607    }
2608
2609    pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
2610        self.conn.execute(
2611            "INSERT INTO trace_spans (
2612                span_id, trace_id, parent_span_id, session_id, kind, name, status,
2613                started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2614                reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2615             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
2616             ON CONFLICT(span_id) DO UPDATE SET
2617                trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
2618                session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
2619                status=excluded.status, started_at_ms=excluded.started_at_ms,
2620                ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
2621                model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
2622                tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
2623                cost_usd_e6=excluded.cost_usd_e6,
2624                context_used_tokens=excluded.context_used_tokens,
2625                context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
2626            params![
2627                span.span_id.as_str(),
2628                span.trace_id.as_str(),
2629                span.parent_span_id.as_deref(),
2630                span.session_id.as_str(),
2631                span.kind.as_str(),
2632                span.name.as_str(),
2633                span.status.as_str(),
2634                span.started_at_ms.map(|v| v as i64),
2635                span.ended_at_ms.map(|v| v as i64),
2636                span.duration_ms.map(|v| v as i64),
2637                span.model.as_deref(),
2638                span.tool.as_deref(),
2639                span.tokens_in.map(|v| v as i64),
2640                span.tokens_out.map(|v| v as i64),
2641                span.reasoning_tokens.map(|v| v as i64),
2642                span.cost_usd_e6,
2643                span.context_used_tokens.map(|v| v as i64),
2644                span.context_max_tokens.map(|v| v as i64),
2645                serde_json::to_string(&span.payload)?,
2646            ],
2647        )?;
2648        Ok(())
2649    }
2650
2651    pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
2652        let mut stmt = self.conn.prepare(
2653            "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
2654                    started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2655                    reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2656             FROM trace_spans WHERE session_id = ?1
2657             ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
2658        )?;
2659        let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
2660        Ok(rows.filter_map(|row| row.ok()).collect())
2661    }
2662
2663    pub(crate) fn capture_quality_rows(
2664        &self,
2665        workspace: &str,
2666        start_ms: u64,
2667        end_ms: u64,
2668    ) -> Result<Vec<CaptureQualityRow>> {
2669        let mut stmt = self.conn.prepare(
2670            "SELECT e.source,
2671                    e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
2672                    e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
2673                    e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL
2674             FROM events e JOIN sessions s ON s.id = e.session_id
2675             WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
2676        )?;
2677        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2678            Ok(CaptureQualityRow {
2679                source: row.get(0)?,
2680                has_tokens: row.get::<_, i64>(1)? != 0,
2681                has_latency: row.get::<_, i64>(2)? != 0,
2682                has_context: row.get::<_, i64>(3)? != 0,
2683            })
2684        })?;
2685        Ok(rows.filter_map(|row| row.ok()).collect())
2686    }
2687
2688    pub(crate) fn trace_span_quality_rows(
2689        &self,
2690        workspace: &str,
2691        start_ms: u64,
2692        end_ms: u64,
2693    ) -> Result<Vec<TraceSpanQualityRow>> {
2694        let mut stmt = self.conn.prepare(
2695            "SELECT ts.kind,
2696                    ts.parent_span_id IS NOT NULL
2697                    AND parent.span_id IS NULL
2698                    AND ts.kind NOT IN ('session', 'agent')
2699             FROM trace_spans ts
2700             JOIN sessions s ON s.id = ts.session_id
2701             LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
2702             WHERE s.workspace = ?1
2703               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
2704               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
2705        )?;
2706        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2707            Ok(TraceSpanQualityRow {
2708                kind: row.get(0)?,
2709                is_orphan: row.get::<_, i64>(1)? != 0,
2710            })
2711        })?;
2712        Ok(rows.filter_map(|row| row.ok()).collect())
2713    }
2714
2715    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2716        self.conn.execute(
2717            "INSERT OR REPLACE INTO session_evals
2718             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2719             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2720            rusqlite::params![
2721                eval.id,
2722                eval.session_id,
2723                eval.judge_model,
2724                eval.rubric_id,
2725                eval.score,
2726                eval.rationale,
2727                eval.flagged as i64,
2728                eval.created_at_ms as i64,
2729            ],
2730        )?;
2731        Ok(())
2732    }
2733
2734    pub fn list_evals_in_window(
2735        &self,
2736        start_ms: u64,
2737        end_ms: u64,
2738    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2739        let mut stmt = self.conn.prepare(
2740            "SELECT id, session_id, judge_model, rubric_id, score,
2741                    rationale, flagged, created_at_ms
2742             FROM session_evals
2743             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2744             ORDER BY created_at_ms ASC",
2745        )?;
2746        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2747            Ok(crate::eval::types::EvalRow {
2748                id: r.get(0)?,
2749                session_id: r.get(1)?,
2750                judge_model: r.get(2)?,
2751                rubric_id: r.get(3)?,
2752                score: r.get(4)?,
2753                rationale: r.get(5)?,
2754                flagged: r.get::<_, i64>(6)? != 0,
2755                created_at_ms: r.get::<_, i64>(7)? as u64,
2756            })
2757        })?;
2758        rows.collect()
2759    }
2760
2761    pub fn list_evals_for_session(
2762        &self,
2763        session_id: &str,
2764    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2765        let mut stmt = self.conn.prepare(
2766            "SELECT id, session_id, judge_model, rubric_id, score,
2767                    rationale, flagged, created_at_ms
2768             FROM session_evals
2769             WHERE session_id = ?1
2770             ORDER BY created_at_ms DESC",
2771        )?;
2772        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2773            Ok(crate::eval::types::EvalRow {
2774                id: r.get(0)?,
2775                session_id: r.get(1)?,
2776                judge_model: r.get(2)?,
2777                rubric_id: r.get(3)?,
2778                score: r.get(4)?,
2779                rationale: r.get(5)?,
2780                flagged: r.get::<_, i64>(6)? != 0,
2781                created_at_ms: r.get::<_, i64>(7)? as u64,
2782            })
2783        })?;
2784        rows.collect()
2785    }
2786
2787    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2788        use crate::feedback::types::FeedbackLabel;
2789        self.conn.execute(
2790            "INSERT OR REPLACE INTO session_feedback
2791             (id, session_id, score, label, note, created_at_ms)
2792             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2793            rusqlite::params![
2794                r.id,
2795                r.session_id,
2796                r.score.as_ref().map(|s| s.0 as i64),
2797                r.label.as_ref().map(FeedbackLabel::to_db_str),
2798                r.note,
2799                r.created_at_ms as i64,
2800            ],
2801        )?;
2802        let payload = serde_json::to_string(r).unwrap_or_default();
2803        self.conn.execute(
2804            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2805             VALUES (?1, 'session_feedback', ?2, 0)",
2806            rusqlite::params![r.session_id, payload],
2807        )?;
2808        Ok(())
2809    }
2810
2811    pub fn list_feedback_in_window(
2812        &self,
2813        start_ms: u64,
2814        end_ms: u64,
2815    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2816        let mut stmt = self.conn.prepare(
2817            "SELECT id, session_id, score, label, note, created_at_ms
2818             FROM session_feedback
2819             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2820             ORDER BY created_at_ms ASC",
2821        )?;
2822        let rows = stmt.query_map(
2823            rusqlite::params![start_ms as i64, end_ms as i64],
2824            feedback_row,
2825        )?;
2826        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2827    }
2828
2829    pub fn feedback_for_sessions(
2830        &self,
2831        ids: &[String],
2832    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2833        if ids.is_empty() {
2834            return Ok(std::collections::HashMap::new());
2835        }
2836        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2837        let sql = format!(
2838            "SELECT id, session_id, score, label, note, created_at_ms
2839             FROM session_feedback WHERE session_id IN ({placeholders})
2840             ORDER BY created_at_ms DESC"
2841        );
2842        let mut stmt = self.conn.prepare(&sql)?;
2843        let params: Vec<&dyn rusqlite::ToSql> =
2844            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2845        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2846        let mut map = std::collections::HashMap::new();
2847        for row in rows {
2848            let r = row?;
2849            map.entry(r.session_id.clone()).or_insert(r);
2850        }
2851        Ok(map)
2852    }
2853
2854    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2855        self.conn.execute(
2856            "INSERT INTO session_outcomes (
2857                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2858                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2859            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2860            ON CONFLICT(session_id) DO UPDATE SET
2861                test_passed=excluded.test_passed,
2862                test_failed=excluded.test_failed,
2863                test_skipped=excluded.test_skipped,
2864                build_ok=excluded.build_ok,
2865                lint_errors=excluded.lint_errors,
2866                revert_lines_14d=excluded.revert_lines_14d,
2867                pr_open=excluded.pr_open,
2868                ci_ok=excluded.ci_ok,
2869                measured_at_ms=excluded.measured_at_ms,
2870                measure_error=excluded.measure_error",
2871            params![
2872                row.session_id,
2873                row.test_passed,
2874                row.test_failed,
2875                row.test_skipped,
2876                row.build_ok.map(bool_to_i64),
2877                row.lint_errors,
2878                row.revert_lines_14d,
2879                row.pr_open,
2880                row.ci_ok.map(bool_to_i64),
2881                row.measured_at_ms as i64,
2882                row.measure_error.as_deref(),
2883            ],
2884        )?;
2885        Ok(())
2886    }
2887
2888    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2889        let mut stmt = self.conn.prepare(
2890            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2891                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2892             FROM session_outcomes WHERE session_id = ?1",
2893        )?;
2894        let row = stmt
2895            .query_row(params![session_id], outcome_row)
2896            .optional()?;
2897        Ok(row)
2898    }
2899
2900    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2901    pub fn list_session_outcomes_in_window(
2902        &self,
2903        workspace: &str,
2904        start_ms: u64,
2905        end_ms: u64,
2906    ) -> Result<Vec<SessionOutcomeRow>> {
2907        let mut stmt = self.conn.prepare(
2908            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2909                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2910             FROM session_outcomes o
2911             JOIN sessions s ON s.id = o.session_id
2912             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2913             ORDER BY o.measured_at_ms ASC",
2914        )?;
2915        let rows = stmt.query_map(
2916            params![workspace, start_ms as i64, end_ms as i64],
2917            outcome_row,
2918        )?;
2919        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2920    }
2921
2922    pub fn append_session_sample(
2923        &self,
2924        session_id: &str,
2925        ts_ms: u64,
2926        pid: u32,
2927        cpu_percent: Option<f64>,
2928        rss_bytes: Option<u64>,
2929    ) -> Result<()> {
2930        self.conn.execute(
2931            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2932             VALUES (?1, ?2, ?3, ?4, ?5)",
2933            params![
2934                session_id,
2935                ts_ms as i64,
2936                pid as i64,
2937                cpu_percent,
2938                rss_bytes.map(|b| b as i64)
2939            ],
2940        )?;
2941        Ok(())
2942    }
2943
2944    /// Per-session maxima for retro heuristics.
2945    pub fn list_session_sample_aggs_in_window(
2946        &self,
2947        workspace: &str,
2948        start_ms: u64,
2949        end_ms: u64,
2950    ) -> Result<Vec<SessionSampleAgg>> {
2951        let mut stmt = self.conn.prepare(
2952            "SELECT ss.session_id, COUNT(*) AS n,
2953                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2954             FROM session_samples ss
2955             JOIN sessions s ON s.id = ss.session_id
2956             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2957             GROUP BY ss.session_id",
2958        )?;
2959        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2960            let sid: String = r.get(0)?;
2961            let n: i64 = r.get(1)?;
2962            let max_cpu: Option<f64> = r.get(2)?;
2963            let max_rss: Option<i64> = r.get(3)?;
2964            Ok(SessionSampleAgg {
2965                session_id: sid,
2966                sample_count: n as u64,
2967                max_cpu_percent: max_cpu.unwrap_or(0.0),
2968                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2969            })
2970        })?;
2971        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2972    }
2973
2974    pub fn list_sessions_for_eval(
2975        &self,
2976        since_ms: u64,
2977        min_cost_usd: f64,
2978    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2979        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2980        let mut stmt = self.conn.prepare(
2981            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2982                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2983                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2984                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2985             FROM sessions s
2986             WHERE s.started_at_ms >= ?1
2987               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2988               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2989             ORDER BY s.started_at_ms DESC",
2990        )?;
2991        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2992            Ok((
2993                r.get::<_, String>(0)?,
2994                r.get::<_, String>(1)?,
2995                r.get::<_, Option<String>>(2)?,
2996                r.get::<_, String>(3)?,
2997                r.get::<_, i64>(4)?,
2998                r.get::<_, Option<i64>>(5)?,
2999                r.get::<_, String>(6)?,
3000                r.get::<_, String>(7)?,
3001                r.get::<_, Option<String>>(8)?,
3002                r.get::<_, Option<String>>(9)?,
3003                r.get::<_, Option<String>>(10)?,
3004                r.get::<_, Option<i64>>(11)?,
3005                r.get::<_, Option<i64>>(12)?,
3006                r.get::<_, Option<String>>(13)?,
3007                r.get::<_, Option<String>>(14)?,
3008                r.get::<_, Option<String>>(15)?,
3009                r.get::<_, Option<String>>(16)?,
3010                r.get::<_, Option<String>>(17)?,
3011                r.get::<_, Option<String>>(18)?,
3012                r.get::<_, Option<i64>>(19)?,
3013                r.get::<_, Option<i64>>(20)?,
3014            ))
3015        })?;
3016        let mut out = Vec::new();
3017        for row in rows {
3018            let (
3019                id,
3020                agent,
3021                model,
3022                workspace,
3023                started,
3024                ended,
3025                status_str,
3026                trace,
3027                start_commit,
3028                end_commit,
3029                branch,
3030                dirty_start,
3031                dirty_end,
3032                source,
3033                prompt_fingerprint,
3034                parent_session_id,
3035                agent_version,
3036                os,
3037                arch,
3038                repo_file_count,
3039                repo_total_loc,
3040            ) = row?;
3041            out.push(crate::core::event::SessionRecord {
3042                id,
3043                agent,
3044                model,
3045                workspace,
3046                started_at_ms: started as u64,
3047                ended_at_ms: ended.map(|v| v as u64),
3048                status: status_from_str(&status_str),
3049                trace_path: trace,
3050                start_commit,
3051                end_commit,
3052                branch,
3053                dirty_start: dirty_start.map(i64_to_bool),
3054                dirty_end: dirty_end.map(i64_to_bool),
3055                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
3056                prompt_fingerprint,
3057                parent_session_id,
3058                agent_version,
3059                os,
3060                arch,
3061                repo_file_count: repo_file_count.map(|v| v as u32),
3062                repo_total_loc: repo_total_loc.map(|v| v as u64),
3063            });
3064        }
3065        Ok(out)
3066    }
3067
3068    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
3069        self.conn.execute(
3070            "INSERT OR IGNORE INTO prompt_snapshots
3071             (fingerprint, captured_at_ms, files_json, total_bytes)
3072             VALUES (?1, ?2, ?3, ?4)",
3073            params![
3074                snap.fingerprint,
3075                snap.captured_at_ms as i64,
3076                snap.files_json,
3077                snap.total_bytes as i64
3078            ],
3079        )?;
3080        Ok(())
3081    }
3082
3083    pub fn get_prompt_snapshot(
3084        &self,
3085        fingerprint: &str,
3086    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
3087        self.conn
3088            .query_row(
3089                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3090                 FROM prompt_snapshots WHERE fingerprint = ?1",
3091                params![fingerprint],
3092                |r| {
3093                    Ok(crate::prompt::PromptSnapshot {
3094                        fingerprint: r.get(0)?,
3095                        captured_at_ms: r.get::<_, i64>(1)? as u64,
3096                        files_json: r.get(2)?,
3097                        total_bytes: r.get::<_, i64>(3)? as u64,
3098                    })
3099                },
3100            )
3101            .optional()
3102            .map_err(Into::into)
3103    }
3104
3105    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
3106        let mut stmt = self.conn.prepare(
3107            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3108             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
3109        )?;
3110        let rows = stmt.query_map([], |r| {
3111            Ok(crate::prompt::PromptSnapshot {
3112                fingerprint: r.get(0)?,
3113                captured_at_ms: r.get::<_, i64>(1)? as u64,
3114                files_json: r.get(2)?,
3115                total_bytes: r.get::<_, i64>(3)? as u64,
3116            })
3117        })?;
3118        Ok(rows.filter_map(|r| r.ok()).collect())
3119    }
3120
3121    /// Sessions with a non-null prompt_fingerprint in the given window.
3122    pub fn sessions_with_prompt_fingerprint(
3123        &self,
3124        workspace: &str,
3125        start_ms: u64,
3126        end_ms: u64,
3127    ) -> Result<Vec<(String, String)>> {
3128        let mut stmt = self.conn.prepare(
3129            "SELECT id, prompt_fingerprint FROM sessions
3130             WHERE workspace = ?1
3131               AND started_at_ms >= ?2 AND started_at_ms < ?3
3132               AND prompt_fingerprint IS NOT NULL",
3133        )?;
3134        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
3135            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
3136        })?;
3137        Ok(rows.filter_map(|r| r.ok()).collect())
3138    }
3139}
3140
3141impl Drop for Store {
3142    fn drop(&mut self) {
3143        if let Some(writer) = self.search_writer.get_mut().as_mut() {
3144            let _ = writer.commit();
3145        }
3146    }
3147}
3148
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
3155
3156fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
3157    let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
3158    let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
3159    Ok(rows.filter_map(|r| r.ok()).collect())
3160}
3161
3162fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
3163    raw.and_then(|s| s.trim().parse::<u64>().ok())
3164        .unwrap_or(DEFAULT_MMAP_MB)
3165        .saturating_mul(1024)
3166        .saturating_mul(1024)
3167        .min(i64::MAX as u64) as i64
3168}
3169
3170fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
3171    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
3172    conn.execute_batch(&format!(
3173        "
3174        PRAGMA journal_mode=WAL;
3175        PRAGMA busy_timeout=5000;
3176        PRAGMA synchronous=NORMAL;
3177        PRAGMA cache_size=-65536;
3178        PRAGMA mmap_size={mmap_size};
3179        PRAGMA temp_store=MEMORY;
3180        PRAGMA wal_autocheckpoint=1000;
3181        "
3182    ))?;
3183    if mode == StoreOpenMode::ReadOnlyQuery {
3184        conn.execute_batch("PRAGMA query_only=ON;")?;
3185    }
3186    Ok(())
3187}
3188
3189fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
3190    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
3191}
3192
3193fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
3194    let status_str: String = row.get(6)?;
3195    Ok(SessionRecord {
3196        id: row.get(0)?,
3197        agent: row.get(1)?,
3198        model: row.get(2)?,
3199        workspace: row.get(3)?,
3200        started_at_ms: row.get::<_, i64>(4)? as u64,
3201        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3202        status: status_from_str(&status_str),
3203        trace_path: row.get(7)?,
3204        start_commit: row.get(8)?,
3205        end_commit: row.get(9)?,
3206        branch: row.get(10)?,
3207        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3208        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3209        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
3210        prompt_fingerprint: row.get(14)?,
3211        parent_session_id: row.get(15)?,
3212        agent_version: row.get(16)?,
3213        os: row.get(17)?,
3214        arch: row.get(18)?,
3215        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3216        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3217    })
3218}
3219
3220fn ranked_file_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedFile> {
3221    Ok(RankedFile {
3222        path: row.get(0)?,
3223        value: row.get::<_, i64>(1)? as u64,
3224        complexity_total: row.get::<_, i64>(2)? as u32,
3225        churn_30d: row.get::<_, i64>(3)? as u32,
3226    })
3227}
3228
3229fn ranked_tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedTool> {
3230    Ok(RankedTool {
3231        tool: row.get(0)?,
3232        calls: row.get::<_, i64>(1)? as u64,
3233        p50_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
3234        p95_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
3235        total_tokens: row.get::<_, i64>(4)? as u64,
3236        total_reasoning_tokens: row.get::<_, i64>(5)? as u64,
3237    })
3238}
3239
3240fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
3241    let payload_str: String = row.get(12)?;
3242    Ok(Event {
3243        session_id: row.get(0)?,
3244        seq: row.get::<_, i64>(1)? as u64,
3245        ts_ms: row.get::<_, i64>(2)? as u64,
3246        ts_exact: row.get::<_, i64>(3)? != 0,
3247        kind: kind_from_str(&row.get::<_, String>(4)?),
3248        source: source_from_str(&row.get::<_, String>(5)?),
3249        tool: row.get(6)?,
3250        tool_call_id: row.get(7)?,
3251        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
3252        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3253        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
3254        cost_usd_e6: row.get(11)?,
3255        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
3256        stop_reason: row.get(13)?,
3257        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3258        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
3259        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
3260        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3261        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
3262        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3263        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
3264        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
3265    })
3266}
3267
3268fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> {
3269    Ok((row.get(22)?, event_row(row)?))
3270}
3271
3272fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
3273    let mut clauses = vec!["workspace = ?".to_string()];
3274    let mut args = vec![Value::Text(workspace.to_string())];
3275    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
3276        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
3277        args.push(Value::Text(format!("{}%", escape_like(prefix))));
3278    }
3279    if let Some(status) = &filter.status {
3280        clauses.push("status = ?".to_string());
3281        args.push(Value::Text(format!("{status:?}")));
3282    }
3283    if let Some(since_ms) = filter.since_ms {
3284        clauses.push("started_at_ms >= ?".to_string());
3285        args.push(Value::Integer(since_ms as i64));
3286    }
3287    (format!("WHERE {}", clauses.join(" AND ")), args)
3288}
3289
/// Lower-case `raw` and backslash-escape the characters that are special in a
/// `LIKE … ESCAPE '\'` pattern: `\`, `%` and `_`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
3296
3297fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
3298    let cost: i64 = conn.query_row(
3299        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
3300        params![workspace], |r| r.get(0),
3301    )?;
3302    let with_cost: i64 = conn.query_row(
3303        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
3304        params![workspace], |r| r.get(0),
3305    )?;
3306    Ok((cost, with_cost as u64))
3307}
3308
3309fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
3310    let build_raw: Option<i64> = r.get(4)?;
3311    let ci_raw: Option<i64> = r.get(8)?;
3312    Ok(SessionOutcomeRow {
3313        session_id: r.get(0)?,
3314        test_passed: r.get(1)?,
3315        test_failed: r.get(2)?,
3316        test_skipped: r.get(3)?,
3317        build_ok: build_raw.map(|v| v != 0),
3318        lint_errors: r.get(5)?,
3319        revert_lines_14d: r.get(6)?,
3320        pr_open: r.get(7)?,
3321        ci_ok: ci_raw.map(|v| v != 0),
3322        measured_at_ms: r.get::<_, i64>(9)? as u64,
3323        measure_error: r.get(10)?,
3324    })
3325}
3326
3327fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
3328    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
3329    let score = r
3330        .get::<_, Option<i64>>(2)?
3331        .and_then(|v| FeedbackScore::new(v as u8));
3332    let label = r
3333        .get::<_, Option<String>>(3)?
3334        .and_then(|s| FeedbackLabel::from_str_opt(&s));
3335    Ok(FeedbackRecord {
3336        id: r.get(0)?,
3337        session_id: r.get(1)?,
3338        score,
3339        label,
3340        note: r.get(4)?,
3341        created_at_ms: r.get::<_, i64>(5)? as u64,
3342    })
3343}
3344
/// Three-letter weekday name for a day index counted from the Unix epoch.
///
/// Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into a
/// Sunday-first table. The index is reduced modulo 7 before the offset is
/// added, so the addition cannot overflow for any `u64` input.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    NAMES[((day_idx % 7 + 4) % 7) as usize]
}
3348
3349fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
3350    let week_ago = now.saturating_sub(7 * 86_400_000);
3351    let mut stmt = conn
3352        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
3353    let days: Vec<u64> = stmt
3354        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
3355        .filter_map(|r| r.ok())
3356        .map(|v| v as u64 / 86_400_000)
3357        .collect();
3358    let today = now / 86_400_000;
3359    Ok((0u64..7)
3360        .map(|i| {
3361            let d = today.saturating_sub(6 - i);
3362            (
3363                day_label(d).to_string(),
3364                days.iter().filter(|&&x| x == d).count() as u64,
3365            )
3366        })
3367        .collect())
3368}
3369
3370fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3371    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3372               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3373               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3374               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3375               COUNT(e.id) FROM sessions s \
3376               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3377               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3378    let mut stmt = conn.prepare(sql)?;
3379    let out: Vec<(SessionRecord, u64)> = stmt
3380        .query_map(params![workspace], |r| {
3381            let st: String = r.get(6)?;
3382            Ok((
3383                SessionRecord {
3384                    id: r.get(0)?,
3385                    agent: r.get(1)?,
3386                    model: r.get(2)?,
3387                    workspace: r.get(3)?,
3388                    started_at_ms: r.get::<_, i64>(4)? as u64,
3389                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3390                    status: status_from_str(&st),
3391                    trace_path: r.get(7)?,
3392                    start_commit: r.get(8)?,
3393                    end_commit: r.get(9)?,
3394                    branch: r.get(10)?,
3395                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3396                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3397                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3398                    prompt_fingerprint: r.get(14)?,
3399                    parent_session_id: r.get(15)?,
3400                    agent_version: r.get(16)?,
3401                    os: r.get(17)?,
3402                    arch: r.get(18)?,
3403                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3404                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3405                },
3406                r.get::<_, i64>(21)? as u64,
3407            ))
3408        })?
3409        .filter_map(|r| r.ok())
3410        .collect();
3411    Ok(out)
3412}
3413
3414fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3415    let mut stmt = conn.prepare(
3416        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3417         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3418    )?;
3419    let out: Vec<(String, u64)> = stmt
3420        .query_map(params![workspace], |r| {
3421            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3422        })?
3423        .filter_map(|r| r.ok())
3424        .collect();
3425    Ok(out)
3426}
3427
3428fn trace_span_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TraceSpanRecord> {
3429    let kind: String = row.get(4)?;
3430    let payload: String = row.get(18)?;
3431    Ok(TraceSpanRecord {
3432        span_id: row.get(0)?,
3433        trace_id: row.get(1)?,
3434        parent_span_id: row.get(2)?,
3435        session_id: row.get(3)?,
3436        kind: TraceSpanKind::parse(&kind),
3437        name: row.get(5)?,
3438        status: row.get(6)?,
3439        started_at_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
3440        ended_at_ms: row.get::<_, Option<i64>>(8)?.map(|v| v as u64),
3441        duration_ms: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3442        model: row.get(10)?,
3443        tool: row.get(11)?,
3444        tokens_in: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
3445        tokens_out: row.get::<_, Option<i64>>(13)?.map(|v| v as u32),
3446        reasoning_tokens: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3447        cost_usd_e6: row.get(15)?,
3448        context_used_tokens: row.get::<_, Option<i64>>(16)?.map(|v| v as u32),
3449        context_max_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3450        payload: serde_json::from_str(&payload).unwrap_or_default(),
3451    })
3452}
3453
3454fn status_from_str(s: &str) -> SessionStatus {
3455    match s {
3456        "Running" => SessionStatus::Running,
3457        "Waiting" => SessionStatus::Waiting,
3458        "Idle" => SessionStatus::Idle,
3459        _ => SessionStatus::Done,
3460    }
3461}
3462
/// True only when the `KAIZEN_PROJECTOR` environment variable is set to
/// exactly `legacy` (case-sensitive).
fn projector_legacy_mode() -> bool {
    matches!(
        std::env::var("KAIZEN_PROJECTOR").as_deref(),
        Ok("legacy")
    )
}
3466
3467fn is_stop_event(e: &Event) -> bool {
3468    if !matches!(e.kind, EventKind::Hook) {
3469        return false;
3470    }
3471    e.payload
3472        .get("event")
3473        .and_then(|v| v.as_str())
3474        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3475        == Some("Stop")
3476}
3477
3478fn kind_from_str(s: &str) -> EventKind {
3479    match s {
3480        "ToolCall" => EventKind::ToolCall,
3481        "ToolResult" => EventKind::ToolResult,
3482        "Message" => EventKind::Message,
3483        "Error" => EventKind::Error,
3484        "Cost" => EventKind::Cost,
3485        "Hook" => EventKind::Hook,
3486        "Lifecycle" => EventKind::Lifecycle,
3487        _ => EventKind::Hook,
3488    }
3489}
3490
3491fn source_from_str(s: &str) -> EventSource {
3492    match s {
3493        "Tail" => EventSource::Tail,
3494        "Hook" => EventSource::Hook,
3495        _ => EventSource::Proxy,
3496    }
3497}
3498
3499fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3500    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3501    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3502    ensure_column(conn, "sessions", "branch", "TEXT")?;
3503    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3504    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3505    ensure_column(
3506        conn,
3507        "sessions",
3508        "repo_binding_source",
3509        "TEXT NOT NULL DEFAULT ''",
3510    )?;
3511    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3512    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3513    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3514    ensure_column(conn, "events", "stop_reason", "TEXT")?;
3515    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3516    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3517    ensure_column(conn, "events", "retry_count", "INTEGER")?;
3518    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3519    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3520    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3521    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3522    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3523    ensure_column(
3524        conn,
3525        "sync_outbox",
3526        "kind",
3527        "TEXT NOT NULL DEFAULT 'events'",
3528    )?;
3529    ensure_column(
3530        conn,
3531        "experiments",
3532        "state",
3533        "TEXT NOT NULL DEFAULT 'Draft'",
3534    )?;
3535    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3536    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3537    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3538    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3539    ensure_column(conn, "sessions", "os", "TEXT")?;
3540    ensure_column(conn, "sessions", "arch", "TEXT")?;
3541    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3542    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3543    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3544    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3545    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3546    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3547    conn.execute_batch(
3548        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3549         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3550    )?;
3551    Ok(())
3552}
3553
3554fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3555    if has_column(conn, table, column)? {
3556        return Ok(());
3557    }
3558    let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}");
3559    match conn.execute(&sql, []) {
3560        Ok(_) => Ok(()),
3561        Err(err) if column_was_added_by_race(conn, table, column, &err)? => Ok(()),
3562        Err(err) => Err(err.into()),
3563    }
3564}
3565
3566fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3567    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3568    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3569    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3570}
3571
3572fn column_was_added_by_race(
3573    conn: &Connection,
3574    table: &str,
3575    column: &str,
3576    err: &rusqlite::Error,
3577) -> Result<bool> {
3578    if !is_duplicate_column_error(err) {
3579        return Ok(false);
3580    }
3581    has_column(conn, table, column)
3582}
3583
3584fn is_duplicate_column_error(err: &rusqlite::Error) -> bool {
3585    matches!(
3586        err,
3587        rusqlite::Error::SqliteFailure(_, Some(message))
3588            if message.contains("duplicate column name")
3589    )
3590}
3591
/// Encode a flag for storage: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3595
/// Decode a stored flag: zero is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3599
/// Turn an empty string into `None`; any non-empty string (including
/// whitespace-only) is kept as `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3603
3604#[cfg(test)]
3605mod tests {
3606    use super::*;
3607    use serde_json::json;
3608    use std::collections::HashSet;
3609    use tempfile::TempDir;
3610
3611    fn make_session(id: &str) -> SessionRecord {
3612        SessionRecord {
3613            id: id.to_string(),
3614            agent: "cursor".to_string(),
3615            model: None,
3616            workspace: "/ws".to_string(),
3617            started_at_ms: 1000,
3618            ended_at_ms: None,
3619            status: SessionStatus::Done,
3620            trace_path: "/trace".to_string(),
3621            start_commit: None,
3622            end_commit: None,
3623            branch: None,
3624            dirty_start: None,
3625            dirty_end: None,
3626            repo_binding_source: None,
3627            prompt_fingerprint: None,
3628            parent_session_id: None,
3629            agent_version: None,
3630            os: None,
3631            arch: None,
3632            repo_file_count: None,
3633            repo_total_loc: None,
3634        }
3635    }
3636
3637    fn make_event(session_id: &str, seq: u64) -> Event {
3638        Event {
3639            session_id: session_id.to_string(),
3640            seq,
3641            ts_ms: 1000 + seq * 100,
3642            ts_exact: false,
3643            kind: EventKind::ToolCall,
3644            source: EventSource::Tail,
3645            tool: Some("read_file".to_string()),
3646            tool_call_id: Some(format!("call_{seq}")),
3647            tokens_in: None,
3648            tokens_out: None,
3649            reasoning_tokens: None,
3650            cost_usd_e6: None,
3651            stop_reason: None,
3652            latency_ms: None,
3653            ttft_ms: None,
3654            retry_count: None,
3655            context_used_tokens: None,
3656            context_max_tokens: None,
3657            cache_creation_tokens: None,
3658            cache_read_tokens: None,
3659            system_prompt_tokens: None,
3660            payload: json!({}),
3661        }
3662    }
3663
3664    #[test]
3665    fn open_and_wal_mode() {
3666        let dir = TempDir::new().unwrap();
3667        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3668        let mode: String = store
3669            .conn
3670            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
3671            .unwrap();
3672        assert_eq!(mode, "wal");
3673    }
3674
3675    #[test]
3676    fn open_applies_phase0_pragmas() {
3677        let dir = TempDir::new().unwrap();
3678        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3679        let synchronous: i64 = store
3680            .conn
3681            .query_row("PRAGMA synchronous", [], |r| r.get(0))
3682            .unwrap();
3683        let cache_size: i64 = store
3684            .conn
3685            .query_row("PRAGMA cache_size", [], |r| r.get(0))
3686            .unwrap();
3687        let temp_store: i64 = store
3688            .conn
3689            .query_row("PRAGMA temp_store", [], |r| r.get(0))
3690            .unwrap();
3691        let wal_autocheckpoint: i64 = store
3692            .conn
3693            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
3694            .unwrap();
3695        assert_eq!(synchronous, 1);
3696        assert_eq!(cache_size, -65_536);
3697        assert_eq!(temp_store, 2);
3698        assert_eq!(wal_autocheckpoint, 1_000);
3699        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
3700    }
3701
3702    #[test]
3703    fn ensure_column_tolerates_duplicate_from_race() {
3704        let conn = Connection::open_in_memory().unwrap();
3705        conn.execute_batch("CREATE TABLE sessions (id TEXT, start_commit TEXT)")
3706            .unwrap();
3707        let err = conn
3708            .execute("ALTER TABLE sessions ADD COLUMN start_commit TEXT", [])
3709            .unwrap_err();
3710        assert!(column_was_added_by_race(&conn, "sessions", "start_commit", &err).unwrap());
3711    }
3712
3713    #[test]
3714    fn read_only_open_sets_query_only() {
3715        let dir = TempDir::new().unwrap();
3716        let db = dir.path().join("kaizen.db");
3717        Store::open(&db).unwrap();
3718        let store = Store::open_read_only(&db).unwrap();
3719        let query_only: i64 = store
3720            .conn
3721            .query_row("PRAGMA query_only", [], |r| r.get(0))
3722            .unwrap();
3723        assert_eq!(query_only, 1);
3724    }
3725
3726    #[test]
3727    fn phase0_indexes_exist() {
3728        let dir = TempDir::new().unwrap();
3729        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3730        for name in [
3731            "tool_spans_session_idx",
3732            "tool_spans_started_idx",
3733            "session_samples_ts_idx",
3734            "events_ts_idx",
3735            "feedback_session_idx",
3736        ] {
3737            let found: i64 = store
3738                .conn
3739                .query_row(
3740                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
3741                    params![name],
3742                    |r| r.get(0),
3743                )
3744                .unwrap();
3745            assert_eq!(found, 1, "{name}");
3746        }
3747    }
3748
3749    #[test]
3750    fn upsert_and_get_session() {
3751        let dir = TempDir::new().unwrap();
3752        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3753        let s = make_session("s1");
3754        store.upsert_session(&s).unwrap();
3755
3756        let got = store.get_session("s1").unwrap().unwrap();
3757        assert_eq!(got.id, "s1");
3758        assert_eq!(got.status, SessionStatus::Done);
3759    }
3760
3761    #[test]
3762    fn append_and_list_events_round_trip() {
3763        let dir = TempDir::new().unwrap();
3764        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3765        let s = make_session("s2");
3766        store.upsert_session(&s).unwrap();
3767        store.append_event(&make_event("s2", 0)).unwrap();
3768        store.append_event(&make_event("s2", 1)).unwrap();
3769
3770        let sessions = store.list_sessions("/ws").unwrap();
3771        assert_eq!(sessions.len(), 1);
3772        assert_eq!(sessions[0].id, "s2");
3773    }
3774
3775    #[test]
3776    fn list_sessions_page_orders_and_counts() {
3777        let dir = TempDir::new().unwrap();
3778        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3779        let mut a = make_session("a");
3780        a.started_at_ms = 2_000;
3781        let mut b = make_session("b");
3782        b.started_at_ms = 2_000;
3783        let mut c = make_session("c");
3784        c.started_at_ms = 1_000;
3785        store.upsert_session(&c).unwrap();
3786        store.upsert_session(&b).unwrap();
3787        store.upsert_session(&a).unwrap();
3788
3789        let page = store
3790            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
3791            .unwrap();
3792        assert_eq!(page.total, 3);
3793        assert_eq!(page.next_offset, Some(2));
3794        assert_eq!(
3795            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3796            vec!["a", "b"]
3797        );
3798
3799        let all = store.list_sessions("/ws").unwrap();
3800        assert_eq!(
3801            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3802            vec!["a", "b", "c"]
3803        );
3804    }
3805
3806    #[test]
3807    fn list_sessions_page_filters_in_sql_shape() {
3808        let dir = TempDir::new().unwrap();
3809        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3810        let mut cursor = make_session("cursor");
3811        cursor.agent = "Cursor".into();
3812        cursor.started_at_ms = 2_000;
3813        cursor.status = SessionStatus::Running;
3814        let mut claude = make_session("claude");
3815        claude.agent = "claude".into();
3816        claude.started_at_ms = 3_000;
3817        store.upsert_session(&cursor).unwrap();
3818        store.upsert_session(&claude).unwrap();
3819
3820        let page = store
3821            .list_sessions_page(
3822                "/ws",
3823                0,
3824                10,
3825                SessionFilter {
3826                    agent_prefix: Some("cur".into()),
3827                    status: Some(SessionStatus::Running),
3828                    since_ms: Some(1_500),
3829                },
3830            )
3831            .unwrap();
3832        assert_eq!(page.total, 1);
3833        assert_eq!(page.rows[0].id, "cursor");
3834    }
3835
3836    #[test]
3837    fn incremental_session_helpers_find_new_rows_and_statuses() {
3838        let dir = TempDir::new().unwrap();
3839        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3840        let mut old = make_session("old");
3841        old.started_at_ms = 1_000;
3842        let mut new = make_session("new");
3843        new.started_at_ms = 2_000;
3844        new.status = SessionStatus::Running;
3845        store.upsert_session(&old).unwrap();
3846        store.upsert_session(&new).unwrap();
3847
3848        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
3849        assert_eq!(rows.len(), 1);
3850        assert_eq!(rows[0].id, "new");
3851
3852        store
3853            .update_session_status("new", SessionStatus::Done)
3854            .unwrap();
3855        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
3856        assert_eq!(statuses.len(), 1);
3857        assert_eq!(statuses[0].status, SessionStatus::Done);
3858    }
3859
3860    #[test]
3861    fn summary_stats_empty() {
3862        let dir = TempDir::new().unwrap();
3863        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3864        let stats = store.summary_stats("/ws").unwrap();
3865        assert_eq!(stats.session_count, 0);
3866        assert_eq!(stats.total_cost_usd_e6, 0);
3867    }
3868
3869    #[test]
3870    fn summary_stats_counts_sessions() {
3871        let dir = TempDir::new().unwrap();
3872        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3873        store.upsert_session(&make_session("a")).unwrap();
3874        store.upsert_session(&make_session("b")).unwrap();
3875        let stats = store.summary_stats("/ws").unwrap();
3876        assert_eq!(stats.session_count, 2);
3877        assert_eq!(stats.by_agent.len(), 1);
3878        assert_eq!(stats.by_agent[0].0, "cursor");
3879        assert_eq!(stats.by_agent[0].1, 2);
3880    }
3881
3882    #[test]
3883    fn list_events_for_session_round_trip() {
3884        let dir = TempDir::new().unwrap();
3885        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3886        store.upsert_session(&make_session("s4")).unwrap();
3887        store.append_event(&make_event("s4", 0)).unwrap();
3888        store.append_event(&make_event("s4", 1)).unwrap();
3889        let events = store.list_events_for_session("s4").unwrap();
3890        assert_eq!(events.len(), 2);
3891        assert_eq!(events[0].seq, 0);
3892        assert_eq!(events[1].seq, 1);
3893    }
3894
3895    #[test]
3896    fn list_events_page_uses_inclusive_seq_cursor() {
3897        let dir = TempDir::new().unwrap();
3898        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3899        store.upsert_session(&make_session("paged")).unwrap();
3900        for seq in 0..5 {
3901            store.append_event(&make_event("paged", seq)).unwrap();
3902        }
3903        let first = store.list_events_page("paged", 0, 2).unwrap();
3904        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3905        let second = store
3906            .list_events_page("paged", first[1].seq + 1, 2)
3907            .unwrap();
3908        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3909    }
3910
3911    #[test]
3912    fn append_event_dedup() {
3913        let dir = TempDir::new().unwrap();
3914        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3915        store.upsert_session(&make_session("s5")).unwrap();
3916        store.append_event(&make_event("s5", 0)).unwrap();
3917        // Duplicate — should be silently ignored
3918        store.append_event(&make_event("s5", 0)).unwrap();
3919        let events = store.list_events_for_session("s5").unwrap();
3920        assert_eq!(events.len(), 1);
3921    }
3922
3923    #[test]
3924    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3925        let dir = TempDir::new().unwrap();
3926        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3927        assert!(store.session_span_tree("missing").unwrap().is_empty());
3928        assert!(store.span_tree_cache.borrow().is_some());
3929
3930        store.upsert_session(&make_session("tree")).unwrap();
3931        let call = make_event("tree", 0);
3932        store.append_event(&call).unwrap();
3933        assert!(store.span_tree_cache.borrow().is_none());
3934        assert!(store.session_span_tree("tree").unwrap().is_empty());
3935        assert!(store.span_tree_cache.borrow().is_some());
3936        let mut result = make_event("tree", 1);
3937        result.kind = EventKind::ToolResult;
3938        result.tool_call_id = call.tool_call_id.clone();
3939        store.append_event(&result).unwrap();
3940        assert!(store.span_tree_cache.borrow().is_none());
3941        let first = store.session_span_tree("tree").unwrap();
3942        assert_eq!(first.len(), 1);
3943        assert!(store.span_tree_cache.borrow().is_some());
3944        store.append_event(&make_event("tree", 2)).unwrap();
3945        assert!(store.span_tree_cache.borrow().is_none());
3946    }
3947
3948    #[test]
3949    fn tool_spans_in_window_uses_started_then_ended_fallback() {
3950        let dir = TempDir::new().unwrap();
3951        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3952        store.upsert_session(&make_session("spans")).unwrap();
3953        for (id, started, ended) in [
3954            ("started", Some(200_i64), None),
3955            ("fallback", None, Some(250_i64)),
3956            ("outside", Some(400_i64), None),
3957            ("too_old", None, Some(50_i64)),
3958            ("started_wins", Some(500_i64), Some(200_i64)),
3959        ] {
3960            store
3961                .conn
3962                .execute(
3963                    "INSERT INTO tool_spans
3964                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3965                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3966                    params![id, started, ended],
3967                )
3968                .unwrap();
3969        }
3970        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3971        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3972        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3973    }
3974
3975    #[test]
3976    fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
3977        let dir = TempDir::new().unwrap();
3978        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3979        store.upsert_session(&make_session("s1")).unwrap();
3980        for (id, started, ended) in [
3981            ("inside_started", Some(150_i64), None),
3982            ("inside_ended_only", None, Some(220_i64)),
3983            ("after_window", Some(400_i64), None),
3984            ("before_window", None, Some(50_i64)),
3985        ] {
3986            store
3987                .conn
3988                .execute(
3989                    "INSERT INTO tool_spans
3990                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3991                     VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
3992                    params![id, started, ended],
3993                )
3994                .unwrap();
3995        }
3996        let rows = store
3997            .tool_spans_sync_rows_in_window("/ws", 100, 300)
3998            .unwrap();
3999        let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
4000        assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
4001        assert!(rows.iter().all(|r| r.session_id == "s1"));
4002    }
4003
4004    #[test]
4005    fn upsert_idempotent() {
4006        let dir = TempDir::new().unwrap();
4007        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4008        let mut s = make_session("s3");
4009        store.upsert_session(&s).unwrap();
4010        s.status = SessionStatus::Running;
4011        store.upsert_session(&s).unwrap();
4012
4013        let got = store.get_session("s3").unwrap().unwrap();
4014        assert_eq!(got.status, SessionStatus::Running);
4015    }
4016
4017    #[test]
4018    fn append_event_indexes_path_from_payload() {
4019        let dir = TempDir::new().unwrap();
4020        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4021        store.upsert_session(&make_session("sx")).unwrap();
4022        let mut ev = make_event("sx", 0);
4023        ev.payload = json!({"input": {"path": "src/lib.rs"}});
4024        store.append_event(&ev).unwrap();
4025        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
4026        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
4027    }
4028
4029    #[test]
4030    fn update_session_status_changes_status() {
4031        let dir = TempDir::new().unwrap();
4032        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4033        store.upsert_session(&make_session("s6")).unwrap();
4034        store
4035            .update_session_status("s6", SessionStatus::Running)
4036            .unwrap();
4037        let got = store.get_session("s6").unwrap().unwrap();
4038        assert_eq!(got.status, SessionStatus::Running);
4039    }
4040
4041    #[test]
4042    fn prune_sessions_removes_old_rows_and_keeps_recent() {
4043        let dir = TempDir::new().unwrap();
4044        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4045        let mut old = make_session("old");
4046        old.started_at_ms = 1_000;
4047        let mut new = make_session("new");
4048        new.started_at_ms = 9_000_000_000_000;
4049        store.upsert_session(&old).unwrap();
4050        store.upsert_session(&new).unwrap();
4051        store.append_event(&make_event("old", 0)).unwrap();
4052
4053        let stats = store.prune_sessions_started_before(5_000).unwrap();
4054        assert_eq!(
4055            stats,
4056            PruneStats {
4057                sessions_removed: 1,
4058                events_removed: 1,
4059            }
4060        );
4061        assert!(store.get_session("old").unwrap().is_none());
4062        assert!(store.get_session("new").unwrap().is_some());
4063        let sessions = store.list_sessions("/ws").unwrap();
4064        assert_eq!(sessions.len(), 1);
4065        assert_eq!(sessions[0].id, "new");
4066    }
4067
4068    #[test]
4069    fn append_event_indexes_rules_from_payload() {
4070        let dir = TempDir::new().unwrap();
4071        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4072        store.upsert_session(&make_session("sr")).unwrap();
4073        let mut ev = make_event("sr", 0);
4074        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
4075        store.append_event(&ev).unwrap();
4076        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
4077        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
4078    }
4079
4080    #[test]
4081    fn guidance_report_counts_skill_and_rule_sessions() {
4082        let dir = TempDir::new().unwrap();
4083        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4084        store.upsert_session(&make_session("sx")).unwrap();
4085        let mut ev = make_event("sx", 0);
4086        ev.payload =
4087            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
4088        ev.cost_usd_e6 = Some(500_000);
4089        store.append_event(&ev).unwrap();
4090
4091        let mut skill_slugs = HashSet::new();
4092        skill_slugs.insert("tdd".into());
4093        let mut rule_slugs = HashSet::new();
4094        rule_slugs.insert("style".into());
4095
4096        let rep = store
4097            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
4098            .unwrap();
4099        assert_eq!(rep.sessions_in_window, 1);
4100        let tdd = rep
4101            .rows
4102            .iter()
4103            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
4104            .unwrap();
4105        assert_eq!(tdd.sessions, 1);
4106        assert!(tdd.on_disk);
4107        let style = rep
4108            .rows
4109            .iter()
4110            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
4111            .unwrap();
4112        assert_eq!(style.sessions, 1);
4113        assert!(style.on_disk);
4114    }
4115
4116    #[test]
4117    fn prune_sessions_removes_rules_used_rows() {
4118        let dir = TempDir::new().unwrap();
4119        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4120        let mut old = make_session("old_r");
4121        old.started_at_ms = 1_000;
4122        store.upsert_session(&old).unwrap();
4123        let mut ev = make_event("old_r", 0);
4124        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
4125        store.append_event(&ev).unwrap();
4126
4127        store.prune_sessions_started_before(5_000).unwrap();
4128        let n: i64 = store
4129            .conn
4130            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
4131            .unwrap();
4132        assert_eq!(n, 0);
4133    }
4134}