Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8    FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
///
/// For scale: 1_000_000_000_000 ms after the Unix epoch is in September 2001, so a
/// wall-clock timestamp produced today is always above this ceiling.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// NOTE(review): presumably the default SQLite `mmap_size` in MiB used when pragmas are
/// applied — the consumer is outside this excerpt, confirm there.
const DEFAULT_MMAP_MB: u64 = 256;
/// `SELECT … FROM sessions` prefix listing every session column.
///
/// The column order matches the column list written by `Store::upsert_session`. The
/// statement ends right after `FROM sessions`, so it carries no filter or ordering of its
/// own (presumably callers append their `WHERE` / `ORDER BY` clauses — confirm at use sites).
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspot" files for one repo snapshot.
///
/// Bind parameters:
/// * `?1` — `file_facts.snapshot_id`
/// * `?2` — workspace the touching sessions must belong to
/// * `?3` / `?4` — inclusive lower / upper bound (ms) of the time window
///
/// Result columns: `path`, `value`, `complexity_total`, `churn_30d`, where
/// `value = COUNT(matching sessions' spans) * complexity_total`.
///
/// A tool span is placed in the window by `started_at_ms`; when that is NULL its
/// `ended_at_ms` is used instead. All joins are `LEFT JOIN`s, so files that no in-window
/// span touched are still returned with `value = 0`. Rows are ordered by `value`
/// descending with `path` ascending as the tie-break.
const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
       AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
         OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool call counts, token totals and latency percentiles for a workspace window.
///
/// Bind parameters:
/// * `?1` — workspace
/// * `?2` / `?3` — inclusive lower / upper bound (ms) of the time window
///
/// Stages (CTEs):
/// * `scoped` — tool spans of the workspace inside the window (`started_at_ms`, falling
///   back to `ended_at_ms` when the start is NULL); a NULL tool name becomes `'unknown'`
///   and `total_tokens` is `tokens_in + tokens_out + reasoning_tokens` with NULLs as 0.
/// * `agg` — per-tool call count and token sums.
/// * `lat` — spans with a non-NULL `lead_time_ms`, ranked per tool by lead time.
/// * `pct` — p50 / p95 picked as the row at 1-based rank `((n - 1) * p) / 100 + 1`.
///
/// Result columns: `tool`, `calls`, `p50_ms`, `p95_ms`, `total_tokens`,
/// `total_reasoning_tokens`. Because `pct` is `LEFT JOIN`ed, a tool without any recorded
/// lead time yields NULL percentiles. The statement has no `ORDER BY`.
const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
      SELECT COALESCE(ts.tool, 'unknown') AS tool,
             ts.lead_time_ms,
             COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
             COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
      FROM tool_spans ts
      JOIN sessions s ON s.id = ts.session_id
      WHERE s.workspace = ?1
        AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
          OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
      SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
             SUM(reasoning_tokens) AS total_reasoning_tokens
      FROM scoped GROUP BY tool
    ),
    lat AS (
      SELECT tool, lead_time_ms,
             ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
             COUNT(*) OVER (PARTITION BY tool) AS n
      FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
      SELECT tool,
             MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
             MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
      FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
85
/// Ordered schema statements.
///
/// `Store::open_with_mode` runs every entry, in order, through `execute_batch` on each
/// read-write open. Entries are therefore written to be idempotent (`IF NOT EXISTS`,
/// `INSERT OR IGNORE`). An entry may hold several `;`-separated statements.
///
/// Several statements elsewhere in this file reference columns that are not part of the
/// `CREATE TABLE` bodies below (e.g. `sessions.start_commit`, `events.ts_exact`);
/// presumably `ensure_schema_columns`, called right after this list, adds them — confirm.
const MIGRATIONS: &[&str] = &[
    // --- Core capture tables -------------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    // NOTE(review): `events_session_seq_idx` further down is a unique index on
    // (session_id, seq), of which this single-column index is a prefix.
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    // --- Experiments ---------------------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Uniqueness target for the `ON CONFLICT(session_id, seq)` upsert in event appends.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Key/value state (see the `SYNC_STATE_*` key constants).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- Derived span tables -------------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS trace_spans (
        span_id TEXT PRIMARY KEY,
        trace_id TEXT NOT NULL,
        parent_span_id TEXT,
        session_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        name TEXT NOT NULL,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        duration_ms INTEGER,
        model TEXT,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        context_used_tokens INTEGER,
        context_max_tokens INTEGER,
        payload TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE INDEX IF NOT EXISTS trace_spans_session_idx ON trace_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_trace_idx ON trace_spans(trace_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_started_idx ON trace_spans(started_at_ms)",
    // --- Repository binding and analysis snapshots ---------------------------------------
    // Written alongside `sessions` by `Store::upsert_session`.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    // NOTE(review): this column is also the leading column of the three
    // `sessions_workspace_*` indexes created below.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    // Expression index on `lower(agent)`.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    // `CHECK (id = 1)` plus the seeding `INSERT OR IGNORE` keep the state table at one row.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- Evaluation, feedback and outcomes -----------------------------------------------
    // Multi-statement entry: table plus its two indexes.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    // Multi-statement entry: table plus its two indexes.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // --- Secondary indexes ---------------------------------------------------------------
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS events_tool_ts_session_seq_idx ON events(tool, ts_ms DESC, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    // Reverse lookup (path -> span) used when joining `tool_span_paths` by path.
    "CREATE INDEX IF NOT EXISTS tool_span_paths_path_idx ON tool_span_paths(path, span_id)",
    // NOTE(review): indexes the same column as `session_feedback_session`, which is created
    // together with the `session_feedback` table above.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
358
/// Per-workspace activity dashboard stats.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions counted for the workspace.
    pub total_sessions: u64,
    /// Sessions counted as running (presumably `status = 'Running'` — confirm in the query).
    pub running_sessions: u64,
    /// Number of events counted for the workspace.
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost; the `_e6` suffix suggests USD scaled by 10^6 — confirm against writers.
    pub total_cost_usd_e6: i64,
    /// Number of sessions that contributed a cost value (presumably non-NULL cost — confirm).
    pub sessions_with_cost: u64,
}
374
/// Sync daemon / outbox status for `kaizen sync status`.
pub struct SyncStatusSnapshot {
    /// Outbox rows still waiting to be sent.
    pub pending_outbox: u64,
    /// Time of the last successful sync in ms, `None` if there has been none
    /// (presumably Unix epoch milliseconds — confirm).
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync error, if one is recorded.
    pub last_error: Option<String>,
    /// Count of failures in a row (presumably reset on success — confirm in the writer).
    pub consecutive_failures: u32,
}
382
/// Aggregate stats across sessions + events for a workspace.
///
/// Serializable via `serde`; fields are emitted in declaration order.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions included in the summary.
    pub session_count: u64,
    /// Summed cost; the `_e6` suffix suggests USD scaled by 10^6 — confirm against writers.
    pub total_cost_usd_e6: i64,
    /// `(agent, count)` pairs.
    pub by_agent: Vec<(String, u64)>,
    /// `(model, count)` pairs.
    pub by_model: Vec<(String, u64)>,
    /// `(tool, count)` pairs.
    pub top_tools: Vec<(String, u64)>,
}
392
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serialized in lowercase (`"skill"` / `"rule"`). The derived ordering follows
/// declaration order, so `Skill < Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill reference.
    Skill,
    /// A rule reference.
    Rule,
}
400
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    /// Number of sessions that referenced this item.
    pub sessions: u64,
    /// Share of sessions referencing this item
    /// (NOTE(review): 0–100 vs 0–1 scale is not visible here — confirm).
    pub sessions_pct: f64,
    /// Summed cost; the `_e6` suffix suggests USD scaled by 10^6 — confirm against writers.
    pub total_cost_usd_e6: i64,
    /// Average cost per session in USD, when it can be computed.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison against the workspace-wide average cost per session, when available
    /// (NOTE(review): difference vs ratio is not visible here — confirm).
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the item was found on disk (presumably in the workspace's skill/rule
    /// directories — confirm).
    pub on_disk: bool,
}
413
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Start of the reporting window in ms.
    pub window_start_ms: u64,
    /// End of the reporting window in ms.
    pub window_end_ms: u64,
    /// Number of sessions that fall inside the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD, when it can be computed.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per observed skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
424
/// Result of [`Store::prune_sessions_started_before`].
///
/// `Default` yields zero for both counters.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of session rows removed.
    pub sessions_removed: u64,
    /// Number of event rows removed.
    pub events_removed: u64,
}
431
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// Field names mirror the table's columns; every measurement is optional because the
/// corresponding columns are nullable.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session the outcome belongs to (the table's primary key).
    pub session_id: String,
    /// Count of passed tests, if measured.
    pub test_passed: Option<i64>,
    /// Count of failed tests, if measured.
    pub test_failed: Option<i64>,
    /// Count of skipped tests, if measured.
    pub test_skipped: Option<i64>,
    /// Build result, if measured (stored as an INTEGER column).
    pub build_ok: Option<bool>,
    /// Count of lint errors, if measured.
    pub lint_errors: Option<i64>,
    /// NOTE(review): presumably lines reverted within 14 days — confirm with the producer.
    pub revert_lines_14d: Option<i64>,
    /// NOTE(review): integer PR indicator; exact meaning is not visible here — confirm.
    pub pr_open: Option<i64>,
    /// CI result, if measured (stored as an INTEGER column).
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, in ms.
    pub measured_at_ms: u64,
    /// Error text recorded for the measurement, if any.
    pub measure_error: Option<String>,
}
447
/// Aggregated process samples for retro (Tier D).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage among the samples.
    pub max_cpu_percent: f64,
    /// Highest resident-set size (bytes) among the samples.
    pub max_rss_bytes: u64,
}
456
/// `sync_state` keys for agent rescan throttling and auto-prune.
///
/// Key under which the time of the last agent scan is stored.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key under which the time of the last automatic prune is stored.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key for the search "dirty" marker
/// (NOTE(review): presumably a timestamp signalling the search index needs a refresh — confirm).
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
461
/// Typed view of a tool span for sync purposes.
///
/// Fields line up with the columns of the `tool_spans` table; `paths` is a decoded list
/// where the table has a `paths_json` text column (presumably parsed from it — confirm).
pub struct ToolSpanSyncRow {
    /// Span identifier (the table's primary key).
    pub span_id: String,
    /// Session that owns the span.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Tool call identifier, if known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Start time in ms, if known.
    pub started_at_ms: Option<u64>,
    /// End time in ms, if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in ms, if known.
    pub lead_time_ms: Option<u64>,
    /// Input tokens, if recorded.
    pub tokens_in: Option<u32>,
    /// Output tokens, if recorded.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens, if recorded.
    pub reasoning_tokens: Option<u32>,
    /// Cost, if recorded; the `_e6` suffix suggests USD scaled by 10^6 — confirm.
    pub cost_usd_e6: Option<i64>,
    /// File paths associated with the span.
    pub paths: Vec<String>,
}
477
/// Crate-internal row describing which optional measurements a captured item carries.
/// NOTE(review): presumably one row per event — confirm with the producing query.
pub(crate) struct CaptureQualityRow {
    /// Source label of the captured item.
    pub source: String,
    /// Whether token counts are present.
    pub has_tokens: bool,
    /// Whether latency data is present.
    pub has_latency: bool,
    /// Whether context-window data is present.
    pub has_context: bool,
}
484
/// Crate-internal quality row for a trace span.
pub(crate) struct TraceSpanQualityRow {
    /// Span kind as text.
    pub kind: String,
    /// Whether the span is flagged as an orphan
    /// (NOTE(review): the orphan criterion lives in the producing query — confirm there).
    pub is_orphan: bool,
}
489
/// How [`Store::open_with_mode`] opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Default read-write connection; migrations run and the projector is warmed.
    ReadWrite,
    /// Connection opened with `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX`; migrations
    /// and projector warm-up are skipped.
    ReadOnlyQuery,
}
495
/// Slim projection of a session: identity, status and end time only.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session identifier.
    pub id: String,
    /// Current session status.
    pub status: SessionStatus,
    /// End time in ms, `None` when no end time is recorded.
    pub ended_at_ms: Option<u64>,
}
502
/// Optional filters for session listings. `Default` leaves every filter unset.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Restrict to agents starting with this prefix
    /// (NOTE(review): presumably compared case-insensitively via `lower(agent)` — confirm).
    pub agent_prefix: Option<String>,
    /// Restrict to sessions with this status.
    pub status: Option<SessionStatus>,
    /// Restrict to sessions at or after this time in ms
    /// (NOTE(review): inclusive vs exclusive bound is decided by the query — confirm).
    pub since_ms: Option<u64>,
}
509
/// One page of a paginated session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total number of matching sessions (presumably across all pages — confirm).
    pub total: usize,
    /// Offset to request for the following page; presumably `None` on the last page.
    pub next_offset: Option<usize>,
}
516
/// Single cached span tree held in `Store::span_tree_cache`.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached tree was built for.
    session_id: String,
    /// Last event sequence number seen when the tree was built
    /// (presumably used to detect staleness — confirm at the lookup site).
    last_event_seq: Option<u64>,
    /// The cached span nodes.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
523
/// SQLite-backed store handle.
///
/// The `RefCell` fields provide interior mutability behind `&self` methods; as a
/// consequence the type is not `Sync`.
pub struct Store {
    /// Underlying SQLite connection.
    conn: Connection,
    /// Directory containing the database file (`"."` when the path has no parent).
    root: PathBuf,
    /// Hot event log; starts as `None` when the store is opened.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search-index writer; starts as `None` when the store is opened.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// At most one cached span tree; reset by `invalidate_span_tree_cache`.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector; replayed from running sessions on a read-write open.
    projector: RefCell<Projector>,
}
532
533impl Store {
534    pub(crate) fn conn(&self) -> &Connection {
535        &self.conn
536    }
537
538    pub fn open(path: &Path) -> Result<Self> {
539        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
540    }
541
542    pub fn open_read_only(path: &Path) -> Result<Self> {
543        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
544    }
545
546    pub fn open_query(path: &Path) -> Result<Self> {
547        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
548    }
549
550    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
551        if let Some(parent) = path.parent() {
552            std::fs::create_dir_all(parent)?;
553        }
554        let conn = match mode {
555            StoreOpenMode::ReadWrite => Connection::open(path),
556            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
557                path,
558                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
559            ),
560        }
561        .with_context(|| format!("open db: {}", path.display()))?;
562        apply_pragmas(&conn, mode)?;
563        if mode == StoreOpenMode::ReadWrite {
564            for sql in MIGRATIONS {
565                conn.execute_batch(sql)?;
566            }
567            ensure_schema_columns(&conn)?;
568        }
569        let store = Self {
570            conn,
571            root: path
572                .parent()
573                .unwrap_or_else(|| Path::new("."))
574                .to_path_buf(),
575            hot_log: RefCell::new(None),
576            search_writer: RefCell::new(None),
577            span_tree_cache: RefCell::new(None),
578            projector: RefCell::new(Projector::default()),
579        };
580        if mode == StoreOpenMode::ReadWrite {
581            store.warm_projector()?;
582        }
583        Ok(store)
584    }
585
586    fn invalidate_span_tree_cache(&self) {
587        *self.span_tree_cache.borrow_mut() = None;
588    }
589
590    fn warm_projector(&self) -> Result<()> {
591        let ids = self.running_session_ids()?;
592        let mut projector = self.projector.borrow_mut();
593        for id in ids {
594            for event in self.list_events_for_session(&id)? {
595                let _ = projector.apply(&event);
596            }
597        }
598        Ok(())
599    }
600
601    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
602        self.conn.execute(
603            "INSERT INTO sessions (
604                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
605                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
606                prompt_fingerprint, parent_session_id, agent_version, os, arch,
607                repo_file_count, repo_total_loc
608             )
609             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
610                ?16, ?17, ?18, ?19, ?20, ?21)
611             ON CONFLICT(id) DO UPDATE SET
612               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
613               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
614               status=excluded.status, trace_path=excluded.trace_path,
615               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
616               branch=excluded.branch, dirty_start=excluded.dirty_start,
617               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
618               prompt_fingerprint=excluded.prompt_fingerprint,
619               parent_session_id=excluded.parent_session_id,
620               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
621               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
622            params![
623                s.id,
624                s.agent,
625                s.model,
626                s.workspace,
627                s.started_at_ms as i64,
628                s.ended_at_ms.map(|v| v as i64),
629                format!("{:?}", s.status),
630                s.trace_path,
631                s.start_commit,
632                s.end_commit,
633                s.branch,
634                s.dirty_start.map(bool_to_i64),
635                s.dirty_end.map(bool_to_i64),
636                s.repo_binding_source.clone().unwrap_or_default(),
637                s.prompt_fingerprint.as_deref(),
638                s.parent_session_id.as_deref(),
639                s.agent_version.as_deref(),
640                s.os.as_deref(),
641                s.arch.as_deref(),
642                s.repo_file_count.map(|v| v as i64),
643                s.repo_total_loc.map(|v| v as i64),
644            ],
645        )?;
646        self.conn.execute(
647            "INSERT INTO session_repo_binding (
648                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
649             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
650             ON CONFLICT(session_id) DO UPDATE SET
651                start_commit=excluded.start_commit,
652                end_commit=excluded.end_commit,
653                branch=excluded.branch,
654                dirty_start=excluded.dirty_start,
655                dirty_end=excluded.dirty_end,
656                repo_binding_source=excluded.repo_binding_source",
657            params![
658                s.id,
659                s.start_commit,
660                s.end_commit,
661                s.branch,
662                s.dirty_start.map(bool_to_i64),
663                s.dirty_end.map(bool_to_i64),
664                s.repo_binding_source.clone().unwrap_or_default(),
665            ],
666        )?;
667        Ok(())
668    }
669
670    /// Insert a minimal session row if none exists. Used by hook ingestion when
671    /// the first observed event is not `SessionStart` (hooks installed mid-session).
672    pub fn ensure_session_stub(
673        &self,
674        id: &str,
675        agent: &str,
676        workspace: &str,
677        started_at_ms: u64,
678    ) -> Result<()> {
679        self.conn.execute(
680            "INSERT OR IGNORE INTO sessions (
681                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
682                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
683                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
684             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
685                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
686            params![id, agent, workspace, started_at_ms as i64],
687        )?;
688        Ok(())
689    }
690
691    /// Next `seq` for a new event in this session (0 when there are no events yet).
692    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
693        let n: i64 = self.conn.query_row(
694            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
695            [session_id],
696            |r| r.get(0),
697        )?;
698        Ok(n as u64)
699    }
700
701    pub fn append_event(&self, e: &Event) -> Result<()> {
702        self.append_event_with_sync(e, None)
703    }
704
705    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
706    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
707        let last_before = if projector_legacy_mode() {
708            None
709        } else {
710            self.last_event_seq_for_session(&e.session_id)?
711        };
712        let payload = serde_json::to_string(&e.payload)?;
713        self.conn.execute(
714            "INSERT INTO events (
715                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
716                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
717                stop_reason, latency_ms, ttft_ms, retry_count,
718                context_used_tokens, context_max_tokens,
719                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
720             ) VALUES (
721                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
722                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
723             )
724             ON CONFLICT(session_id, seq) DO UPDATE SET
725                ts_ms = excluded.ts_ms,
726                ts_exact = excluded.ts_exact,
727                kind = excluded.kind,
728                source = excluded.source,
729                tool = excluded.tool,
730                tool_call_id = excluded.tool_call_id,
731                tokens_in = excluded.tokens_in,
732                tokens_out = excluded.tokens_out,
733                reasoning_tokens = excluded.reasoning_tokens,
734                cost_usd_e6 = excluded.cost_usd_e6,
735                payload = excluded.payload,
736                stop_reason = excluded.stop_reason,
737                latency_ms = excluded.latency_ms,
738                ttft_ms = excluded.ttft_ms,
739                retry_count = excluded.retry_count,
740                context_used_tokens = excluded.context_used_tokens,
741                context_max_tokens = excluded.context_max_tokens,
742                cache_creation_tokens = excluded.cache_creation_tokens,
743                cache_read_tokens = excluded.cache_read_tokens,
744                system_prompt_tokens = excluded.system_prompt_tokens",
745            params![
746                e.session_id,
747                e.seq as i64,
748                e.ts_ms as i64,
749                bool_to_i64(e.ts_exact),
750                format!("{:?}", e.kind),
751                format!("{:?}", e.source),
752                e.tool,
753                e.tool_call_id,
754                e.tokens_in.map(|v| v as i64),
755                e.tokens_out.map(|v| v as i64),
756                e.reasoning_tokens.map(|v| v as i64),
757                e.cost_usd_e6,
758                payload,
759                e.stop_reason,
760                e.latency_ms.map(|v| v as i64),
761                e.ttft_ms.map(|v| v as i64),
762                e.retry_count.map(|v| v as i64),
763                e.context_used_tokens.map(|v| v as i64),
764                e.context_max_tokens.map(|v| v as i64),
765                e.cache_creation_tokens.map(|v| v as i64),
766                e.cache_read_tokens.map(|v| v as i64),
767                e.system_prompt_tokens.map(|v| v as i64),
768            ],
769        )?;
770        if self.conn.changes() == 0 {
771            return Ok(());
772        }
773        self.append_hot_event(e)?;
774        if projector_legacy_mode() {
775            index_event_derived(&self.conn, e)?;
776            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
777            self.invalidate_span_tree_cache();
778        } else if last_before.is_some_and(|last| e.seq <= last) {
779            self.replay_projector_session(&e.session_id)?;
780        } else {
781            let deltas = self.projector.borrow_mut().apply(e);
782            self.apply_projector_events(&deltas)?;
783            let expired = self
784                .projector
785                .borrow_mut()
786                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
787            self.apply_projector_events(&expired)?;
788            if is_stop_event(e) {
789                let flushed = self
790                    .projector
791                    .borrow_mut()
792                    .flush_session(&e.session_id, e.ts_ms);
793                self.apply_projector_events(&flushed)?;
794            }
795            self.invalidate_span_tree_cache();
796        }
797        self.append_search_event(e);
798        let Some(ctx) = ctx else {
799            return Ok(());
800        };
801        let sync = &ctx.sync;
802        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
803            return Ok(());
804        }
805        let Some(salt) = try_team_salt(sync) else {
806            tracing::warn!(
807                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
808            );
809            return Ok(());
810        };
811        if sync.sample_rate < 1.0 {
812            let u: f64 = rand::random();
813            if u > sync.sample_rate {
814                return Ok(());
815            }
816        }
817        let Some(session) = self.get_session(&e.session_id)? else {
818            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
819            return Ok(());
820        };
821        let mut outbound = outbound_event_from_row(e, &session, &salt);
822        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
823        let row = serde_json::to_string(&outbound)?;
824        self.outbox()?.append(&e.session_id, "events", &row)?;
825        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
826        Ok(())
827    }
828
829    fn append_hot_event(&self, e: &Event) -> Result<()> {
830        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
831            return Ok(());
832        }
833        let mut slot = self.hot_log.borrow_mut();
834        if slot.is_none() {
835            *slot = Some(HotLog::open(&self.root)?);
836        }
837        if let Some(log) = slot.as_mut() {
838            log.append(e)?;
839        }
840        Ok(())
841    }
842
843    fn append_search_event(&self, e: &Event) {
844        if let Err(err) = self.try_append_search_event(e) {
845            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
846            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
847        }
848    }
849
850    fn try_append_search_event(&self, e: &Event) -> Result<()> {
851        let Some(session) = self.get_session(&e.session_id)? else {
852            return Ok(());
853        };
854        let workspace = PathBuf::from(&session.workspace);
855        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
856        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
857        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
858            return Ok(());
859        };
860        let mut slot = self.search_writer.borrow_mut();
861        if slot.is_none() {
862            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
863        }
864        slot.as_mut().expect("writer").add(&doc)
865    }
866
867    pub fn flush_search(&self) -> Result<()> {
868        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
869            writer.commit()?;
870        }
871        Ok(())
872    }
873
874    fn outbox(&self) -> Result<Outbox> {
875        Outbox::open(&self.root)
876    }
877
878    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
879        if projector_legacy_mode() {
880            rebuild_tool_spans_for_session(&self.conn, session_id)?;
881            self.invalidate_span_tree_cache();
882            return Ok(());
883        }
884        let deltas = self
885            .projector
886            .borrow_mut()
887            .flush_session(session_id, now_ms);
888        if self.apply_projector_events(&deltas)? {
889            self.invalidate_span_tree_cache();
890        }
891        Ok(())
892    }
893
894    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
895        clear_session_spans(&self.conn, session_id)?;
896        self.projector.borrow_mut().reset_session(session_id);
897        let events = self.list_events_for_session(session_id)?;
898        let mut changed = false;
899        for event in &events {
900            let deltas = self.projector.borrow_mut().apply(event);
901            changed |= self.apply_projector_events(&deltas)?;
902        }
903        if self
904            .get_session(session_id)?
905            .is_some_and(|session| session.status == SessionStatus::Done)
906        {
907            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
908            let deltas = self
909                .projector
910                .borrow_mut()
911                .flush_session(session_id, now_ms);
912            changed |= self.apply_projector_events(&deltas)?;
913        }
914        if changed {
915            self.invalidate_span_tree_cache();
916        }
917        Ok(())
918    }
919
920    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
921        let mut changed = false;
922        for delta in deltas {
923            match delta {
924                ProjectorEvent::SpanClosed(span, sample) => {
925                    upsert_tool_span_record(&self.conn, span)?;
926                    tracing::debug!(
927                        session_id = %sample.session_id,
928                        span_id = %sample.span_id,
929                        tool = ?sample.tool,
930                        lead_time_ms = ?sample.lead_time_ms,
931                        tokens_in = ?sample.tokens_in,
932                        tokens_out = ?sample.tokens_out,
933                        reasoning_tokens = ?sample.reasoning_tokens,
934                        cost_usd_e6 = ?sample.cost_usd_e6,
935                        paths = ?sample.paths,
936                        "tool span closed"
937                    );
938                    changed = true;
939                }
940                ProjectorEvent::SpanPatched(span) => {
941                    upsert_tool_span_record(&self.conn, span)?;
942                    changed = true;
943                }
944                ProjectorEvent::FileTouched { session, path } => {
945                    self.conn.execute(
946                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
947                        params![session, path],
948                    )?;
949                    changed = true;
950                }
951                ProjectorEvent::SkillUsed { session, skill } => {
952                    self.conn.execute(
953                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
954                        params![session, skill],
955                    )?;
956                    changed = true;
957                }
958                ProjectorEvent::RuleUsed { session, rule } => {
959                    self.conn.execute(
960                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
961                        params![session, rule],
962                    )?;
963                    changed = true;
964                }
965            }
966        }
967        Ok(changed)
968    }
969
970    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
971        let rows = self.outbox()?.list_pending(limit)?;
972        if !rows.is_empty() {
973            return Ok(rows);
974        }
975        let mut stmt = self.conn.prepare(
976            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
977        )?;
978        let rows = stmt.query_map(params![limit as i64], |row| {
979            Ok((
980                row.get::<_, i64>(0)?,
981                row.get::<_, String>(1)?,
982                row.get::<_, String>(2)?,
983            ))
984        })?;
985        let mut out = Vec::new();
986        for r in rows {
987            out.push(r?);
988        }
989        Ok(out)
990    }
991
992    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
993        self.outbox()?.delete_ids(ids)?;
994        for id in ids {
995            self.conn
996                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
997        }
998        Ok(())
999    }
1000
1001    pub fn replace_outbox_rows(
1002        &self,
1003        owner_id: &str,
1004        kind: &str,
1005        payloads: &[String],
1006    ) -> Result<()> {
1007        self.outbox()?.replace(owner_id, kind, payloads)?;
1008        self.conn.execute(
1009            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
1010            params![owner_id, kind],
1011        )?;
1012        for payload in payloads {
1013            self.conn.execute(
1014                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
1015                params![owner_id, kind, payload],
1016            )?;
1017        }
1018        Ok(())
1019    }
1020
1021    pub fn outbox_pending_count(&self) -> Result<u64> {
1022        let redb = self.outbox()?.pending_count()?;
1023        if redb > 0 {
1024            return Ok(redb);
1025        }
1026        let c: i64 =
1027            self.conn
1028                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
1029                    r.get(0)
1030                })?;
1031        Ok(c as u64)
1032    }
1033
1034    pub fn set_sync_state_ok(&self) -> Result<()> {
1035        let now = now_ms().to_string();
1036        self.conn.execute(
1037            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
1038             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1039            params![now],
1040        )?;
1041        self.conn.execute(
1042            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
1043             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1044            [],
1045        )?;
1046        self.conn
1047            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
1048        Ok(())
1049    }
1050
1051    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
1052        let prev: i64 = self
1053            .conn
1054            .query_row(
1055                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1056                [],
1057                |r| {
1058                    let s: String = r.get(0)?;
1059                    Ok(s.parse::<i64>().unwrap_or(0))
1060                },
1061            )
1062            .optional()?
1063            .unwrap_or(0);
1064        let next = prev.saturating_add(1);
1065        self.conn.execute(
1066            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
1067             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1068            params![msg],
1069        )?;
1070        self.conn.execute(
1071            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
1072             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1073            params![next.to_string()],
1074        )?;
1075        Ok(())
1076    }
1077
1078    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
1079        let pending_outbox = self.outbox_pending_count()?;
1080        let last_success_ms = self
1081            .conn
1082            .query_row(
1083                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
1084                [],
1085                |r| r.get::<_, String>(0),
1086            )
1087            .optional()?
1088            .and_then(|s| s.parse().ok());
1089        let last_error = self
1090            .conn
1091            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1092                r.get::<_, String>(0)
1093            })
1094            .optional()?;
1095        let consecutive_failures = self
1096            .conn
1097            .query_row(
1098                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1099                [],
1100                |r| r.get::<_, String>(0),
1101            )
1102            .optional()?
1103            .and_then(|s| s.parse().ok())
1104            .unwrap_or(0);
1105        Ok(SyncStatusSnapshot {
1106            pending_outbox,
1107            last_success_ms,
1108            last_error,
1109            consecutive_failures,
1110        })
1111    }
1112
1113    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1114        let row: Option<String> = self
1115            .conn
1116            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1117                r.get::<_, String>(0)
1118            })
1119            .optional()?;
1120        Ok(row.and_then(|s| s.parse().ok()))
1121    }
1122
1123    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1124        self.conn.execute(
1125            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1126             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1127            params![key, v.to_string()],
1128        )?;
1129        Ok(())
1130    }
1131
1132    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
1133    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1134        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1135        let old_ids = old_session_ids(&tx, cutoff_ms)?;
1136        let sessions_to_remove: i64 = tx.query_row(
1137            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1138            params![cutoff_ms],
1139            |r| r.get(0),
1140        )?;
1141        let events_to_remove: i64 = tx.query_row(
1142            "SELECT COUNT(*) FROM events WHERE session_id IN \
1143             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1144            params![cutoff_ms],
1145            |r| r.get(0),
1146        )?;
1147
1148        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1149        tx.execute(
1150            &format!(
1151                "DELETE FROM tool_span_paths WHERE span_id IN \
1152                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1153            ),
1154            params![cutoff_ms],
1155        )?;
1156        tx.execute(
1157            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1158            params![cutoff_ms],
1159        )?;
1160        tx.execute(
1161            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1162            params![cutoff_ms],
1163        )?;
1164        tx.execute(
1165            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1166            params![cutoff_ms],
1167        )?;
1168        tx.execute(
1169            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1170            params![cutoff_ms],
1171        )?;
1172        tx.execute(
1173            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1174            params![cutoff_ms],
1175        )?;
1176        tx.execute(
1177            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1178            params![cutoff_ms],
1179        )?;
1180        tx.execute(
1181            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1182            params![cutoff_ms],
1183        )?;
1184        tx.execute(
1185            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1186            params![cutoff_ms],
1187        )?;
1188        tx.execute(
1189            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1190            params![cutoff_ms],
1191        )?;
1192        tx.execute(
1193            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1194            params![cutoff_ms],
1195        )?;
1196        tx.execute(
1197            "DELETE FROM sessions WHERE started_at_ms < ?1",
1198            params![cutoff_ms],
1199        )?;
1200        tx.commit()?;
1201        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1202            let _ = writer.commit();
1203        }
1204        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1205            tracing::warn!("search prune skipped: {err:#}");
1206            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1207        }
1208        self.invalidate_span_tree_cache();
1209        Ok(PruneStats {
1210            sessions_removed: sessions_to_remove as u64,
1211            events_removed: events_to_remove as u64,
1212        })
1213    }
1214
1215    /// Reclaim file space after large deletes (exclusive lock; can be slow).
1216    pub fn vacuum(&self) -> Result<()> {
1217        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1218        Ok(())
1219    }
1220
1221    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1222        Ok(self
1223            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1224            .rows)
1225    }
1226
1227    pub fn list_sessions_page(
1228        &self,
1229        workspace: &str,
1230        offset: usize,
1231        limit: usize,
1232        filter: SessionFilter,
1233    ) -> Result<SessionPage> {
1234        let (where_sql, args) = session_filter_sql(workspace, &filter);
1235        let total = self.query_session_page_count(&where_sql, &args)?;
1236        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1237        let next = offset.saturating_add(rows.len());
1238        Ok(SessionPage {
1239            rows,
1240            total,
1241            next_offset: (next < total).then_some(next),
1242        })
1243    }
1244
1245    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1246        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1247        let total: i64 = self
1248            .conn
1249            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1250        Ok(total as usize)
1251    }
1252
1253    fn query_session_page_rows(
1254        &self,
1255        where_sql: &str,
1256        args: &[Value],
1257        offset: usize,
1258        limit: usize,
1259    ) -> Result<Vec<SessionRecord>> {
1260        let sql = format!(
1261            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1262        );
1263        let mut values = args.to_vec();
1264        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1265        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1266        let mut stmt = self.conn.prepare(&sql)?;
1267        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1268        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1269    }
1270
1271    pub fn list_sessions_started_after(
1272        &self,
1273        workspace: &str,
1274        after_started_at_ms: u64,
1275    ) -> Result<Vec<SessionRecord>> {
1276        let mut stmt = self.conn.prepare(
1277            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1278                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1279                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1280                    repo_file_count, repo_total_loc
1281             FROM sessions
1282             WHERE workspace = ?1 AND started_at_ms > ?2
1283             ORDER BY started_at_ms DESC, id ASC",
1284        )?;
1285        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1286        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1287    }
1288
1289    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1290        if ids.is_empty() {
1291            return Ok(Vec::new());
1292        }
1293        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1294        let sql =
1295            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1296        let mut stmt = self.conn.prepare(&sql)?;
1297        let params: Vec<&dyn rusqlite::ToSql> =
1298            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1299        let rows = stmt.query_map(params.as_slice(), |r| {
1300            let status: String = r.get(1)?;
1301            Ok(SessionStatusRow {
1302                id: r.get(0)?,
1303                status: status_from_str(&status),
1304                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1305            })
1306        })?;
1307        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1308    }
1309
1310    fn running_session_ids(&self) -> Result<Vec<String>> {
1311        let mut stmt = self
1312            .conn
1313            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1314        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1315        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1316    }
1317
1318    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1319        let session_count: i64 = self.conn.query_row(
1320            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1321            params![workspace],
1322            |r| r.get(0),
1323        )?;
1324
1325        let total_cost: i64 = self.conn.query_row(
1326            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1327             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1328            params![workspace],
1329            |r| r.get(0),
1330        )?;
1331
1332        let mut stmt = self.conn.prepare(
1333            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1334        )?;
1335        let by_agent: Vec<(String, u64)> = stmt
1336            .query_map(params![workspace], |r| {
1337                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1338            })?
1339            .filter_map(|r| r.ok())
1340            .collect();
1341
1342        let mut stmt = self.conn.prepare(
1343            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1344        )?;
1345        let by_model: Vec<(String, u64)> = stmt
1346            .query_map(params![workspace], |r| {
1347                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1348            })?
1349            .filter_map(|r| r.ok())
1350            .collect();
1351
1352        let mut stmt = self.conn.prepare(
1353            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1354             WHERE s.workspace = ?1 AND tool IS NOT NULL
1355             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1356        )?;
1357        let top_tools: Vec<(String, u64)> = stmt
1358            .query_map(params![workspace], |r| {
1359                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1360            })?
1361            .filter_map(|r| r.ok())
1362            .collect();
1363
1364        Ok(SummaryStats {
1365            session_count: session_count as u64,
1366            total_cost_usd_e6: total_cost,
1367            by_agent,
1368            by_model,
1369            top_tools,
1370        })
1371    }
1372
1373    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1374        self.list_events_page(session_id, 0, i64::MAX as usize)
1375    }
1376
1377    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1378        let mut stmt = self.conn.prepare(
1379            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1380                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1381                    stop_reason, latency_ms, ttft_ms, retry_count,
1382                    context_used_tokens, context_max_tokens,
1383                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1384             FROM events WHERE session_id = ?1 AND seq = ?2",
1385        )?;
1386        stmt.query_row(params![session_id, seq as i64], event_row)
1387            .optional()
1388            .map_err(Into::into)
1389    }
1390
1391    pub fn search_tool_events(
1392        &self,
1393        workspace: &str,
1394        tool: &str,
1395        since_ms: Option<u64>,
1396        agent: Option<&str>,
1397        limit: usize,
1398    ) -> Result<Vec<(String, Event)>> {
1399        let mut stmt = self.conn.prepare(
1400            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1401                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1402                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1403                    e.context_used_tokens, e.context_max_tokens,
1404                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
1405                    s.agent
1406             FROM events e JOIN sessions s ON s.id = e.session_id
1407             WHERE e.tool = ?2
1408               AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
1409               AND (?3 IS NULL OR e.ts_ms >= ?3)
1410               AND (?4 IS NULL OR s.agent = ?4)
1411             ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
1412             LIMIT ?5",
1413        )?;
1414        let since = since_ms.map(|v| v as i64);
1415        let rows = stmt.query_map(
1416            params![workspace, tool, since, agent, limit as i64],
1417            search_tool_event_row,
1418        )?;
1419        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1420    }
1421
1422    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1423        let mut out = Vec::new();
1424        for session in self.list_sessions(workspace)? {
1425            for event in self.list_events_for_session(&session.id)? {
1426                out.push((session.clone(), event));
1427            }
1428        }
1429        out.sort_by(|a, b| {
1430            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1431        });
1432        Ok(out)
1433    }
1434
1435    pub fn list_events_page(
1436        &self,
1437        session_id: &str,
1438        after_seq: u64,
1439        limit: usize,
1440    ) -> Result<Vec<Event>> {
1441        let mut stmt = self.conn.prepare(
1442            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1443                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1444                    stop_reason, latency_ms, ttft_ms, retry_count,
1445                    context_used_tokens, context_max_tokens,
1446                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1447             FROM events
1448             WHERE session_id = ?1 AND seq >= ?2
1449             ORDER BY seq ASC LIMIT ?3",
1450        )?;
1451        let rows = stmt.query_map(
1452            params![
1453                session_id,
1454                after_seq as i64,
1455                limit.min(i64::MAX as usize) as i64
1456            ],
1457            event_row,
1458        )?;
1459        let mut events = Vec::new();
1460        for row in rows {
1461            events.push(row?);
1462        }
1463        Ok(events)
1464    }
1465
1466    /// Update only status for existing session.
1467    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1468        self.conn.execute(
1469            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1470            params![format!("{:?}", status), id],
1471        )?;
1472        Ok(())
1473    }
1474
1475    /// Workspace activity dashboard — feeds `cmd_insights`.
1476    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1477        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1478        Ok(InsightsStats {
1479            total_sessions: count_q(
1480                &self.conn,
1481                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1482                workspace,
1483            )?,
1484            running_sessions: count_q(
1485                &self.conn,
1486                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1487                workspace,
1488            )?,
1489            total_events: count_q(
1490                &self.conn,
1491                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1492                workspace,
1493            )?,
1494            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1495            recent: recent_sessions_3(&self.conn, workspace)?,
1496            top_tools: top_tools_5(&self.conn, workspace)?,
1497            total_cost_usd_e6,
1498            sessions_with_cost,
1499        })
1500    }
1501
1502    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1503    pub fn retro_events_in_window(
1504        &self,
1505        workspace: &str,
1506        start_ms: u64,
1507        end_ms: u64,
1508    ) -> Result<Vec<(SessionRecord, Event)>> {
1509        let mut stmt = self.conn.prepare(
1510            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1511                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1512                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1513                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1514                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1515                    s.repo_file_count, s.repo_total_loc,
1516                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1517                    e.context_used_tokens, e.context_max_tokens,
1518                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1519             FROM events e
1520             JOIN sessions s ON s.id = e.session_id
1521             WHERE s.workspace = ?1
1522               AND (
1523                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1524                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1525               )
1526             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1527        )?;
1528        let rows = stmt.query_map(
1529            params![
1530                workspace,
1531                start_ms as i64,
1532                end_ms as i64,
1533                SYNTHETIC_TS_CEILING_MS,
1534            ],
1535            |row| {
1536                let payload_str: String = row.get(12)?;
1537                let status_str: String = row.get(19)?;
1538                Ok((
1539                    SessionRecord {
1540                        id: row.get(13)?,
1541                        agent: row.get(14)?,
1542                        model: row.get(15)?,
1543                        workspace: row.get(16)?,
1544                        started_at_ms: row.get::<_, i64>(17)? as u64,
1545                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1546                        status: status_from_str(&status_str),
1547                        trace_path: row.get(20)?,
1548                        start_commit: row.get(21)?,
1549                        end_commit: row.get(22)?,
1550                        branch: row.get(23)?,
1551                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1552                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1553                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1554                        prompt_fingerprint: row.get(27)?,
1555                        parent_session_id: row.get(28)?,
1556                        agent_version: row.get(29)?,
1557                        os: row.get(30)?,
1558                        arch: row.get(31)?,
1559                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1560                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1561                    },
1562                    Event {
1563                        session_id: row.get(0)?,
1564                        seq: row.get::<_, i64>(1)? as u64,
1565                        ts_ms: row.get::<_, i64>(2)? as u64,
1566                        ts_exact: row.get::<_, i64>(3)? != 0,
1567                        kind: kind_from_str(&row.get::<_, String>(4)?),
1568                        source: source_from_str(&row.get::<_, String>(5)?),
1569                        tool: row.get(6)?,
1570                        tool_call_id: row.get(7)?,
1571                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1572                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1573                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1574                        cost_usd_e6: row.get(11)?,
1575                        payload: serde_json::from_str(&payload_str)
1576                            .unwrap_or(serde_json::Value::Null),
1577                        stop_reason: row.get(34)?,
1578                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1579                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1580                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1581                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1582                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1583                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1584                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1585                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1586                    },
1587                ))
1588            },
1589        )?;
1590
1591        let mut out = Vec::new();
1592        for r in rows {
1593            out.push(r?);
1594        }
1595        Ok(out)
1596    }
1597
1598    pub fn experiment_metric_values_in_window(
1599        &self,
1600        workspace: &str,
1601        start_ms: u64,
1602        end_ms: u64,
1603        metric: crate::experiment::types::Metric,
1604    ) -> Result<Vec<(SessionRecord, f64)>> {
1605        use crate::experiment::types::Metric;
1606        let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1607            s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
1608            s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
1609            s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
1610        let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1611            OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
1612        let sql = match metric {
1613            Metric::TokensPerSession => format!(
1614                "SELECT {session_cols},
1615                    SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
1616                 FROM sessions s JOIN events e ON e.session_id = s.id
1617                 WHERE {window}
1618                 GROUP BY s.id"
1619            ),
1620            Metric::CostPerSession => format!(
1621                "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
1622                 FROM sessions s JOIN events e ON e.session_id = s.id
1623                 WHERE {window}
1624                 GROUP BY s.id"
1625            ),
1626            Metric::SuccessRate => format!(
1627                "SELECT {session_cols},
1628                    CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
1629                 FROM sessions s JOIN events e ON e.session_id = s.id
1630                 WHERE {window}
1631                 GROUP BY s.id"
1632            ),
1633            Metric::DurationMinutes => format!(
1634                "SELECT {session_cols},
1635                    (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
1636                 FROM sessions s
1637                 WHERE s.workspace = ?1
1638                   AND s.ended_at_ms IS NOT NULL
1639                   AND EXISTS (
1640                     SELECT 1 FROM events e
1641                     WHERE e.session_id = s.id
1642                       AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1643                         OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
1644                   )"
1645            ),
1646            Metric::FilesPerSession => format!(
1647                "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
1648                 FROM sessions s
1649                 JOIN events e ON e.session_id = s.id
1650                 LEFT JOIN files_touched ft ON ft.session_id = s.id
1651                 WHERE {window}
1652                 GROUP BY s.id"
1653            ),
1654            Metric::SuccessRateByPrompt => format!(
1655                "SELECT {session_cols},
1656                    1.0 - (MIN(
1657                      SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
1658                      SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
1659                    ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
1660                 FROM sessions s JOIN events e ON e.session_id = s.id
1661                 WHERE {window}
1662                 GROUP BY s.id
1663                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1664            ),
1665            Metric::CostByPrompt => format!(
1666                "SELECT {session_cols},
1667                    SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
1668                    SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
1669                 FROM sessions s JOIN events e ON e.session_id = s.id
1670                 WHERE {window}
1671                 GROUP BY s.id
1672                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1673            ),
1674            Metric::ToolLoops => format!(
1675                "WITH calls AS (
1676                   SELECT e.session_id, e.tool,
1677                     LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
1678                   FROM events e JOIN sessions s ON s.id = e.session_id
1679                   WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
1680                 )
1681                 SELECT {session_cols},
1682                    SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
1683                 FROM sessions s JOIN calls ON calls.session_id = s.id
1684                 GROUP BY s.id"
1685            ),
1686        };
1687        let mut stmt = self.conn.prepare(&sql)?;
1688        let rows = stmt.query_map(
1689            params![
1690                workspace,
1691                start_ms as i64,
1692                end_ms as i64,
1693                SYNTHETIC_TS_CEILING_MS,
1694            ],
1695            |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
1696        )?;
1697        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1698    }
1699
1700    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1701    pub fn files_touched_in_window(
1702        &self,
1703        workspace: &str,
1704        start_ms: u64,
1705        end_ms: u64,
1706    ) -> Result<Vec<(String, String)>> {
1707        let mut stmt = self.conn.prepare(
1708            "SELECT DISTINCT ft.session_id, ft.path
1709             FROM files_touched ft
1710             JOIN sessions s ON s.id = ft.session_id
1711             WHERE s.workspace = ?1
1712               AND EXISTS (
1713                 SELECT 1 FROM events e
1714                 JOIN sessions ss ON ss.id = e.session_id
1715                 WHERE e.session_id = ft.session_id
1716                   AND (
1717                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1718                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1719                   )
1720               )
1721             ORDER BY ft.session_id, ft.path",
1722        )?;
1723        let out: Vec<(String, String)> = stmt
1724            .query_map(
1725                params![
1726                    workspace,
1727                    start_ms as i64,
1728                    end_ms as i64,
1729                    SYNTHETIC_TS_CEILING_MS,
1730                ],
1731                |r| Ok((r.get(0)?, r.get(1)?)),
1732            )?
1733            .filter_map(|r| r.ok())
1734            .collect();
1735        Ok(out)
1736    }
1737
1738    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1739    /// (any session with an indexed skill row; join events optional — use row existence).
1740    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1741        let mut stmt = self.conn.prepare(
1742            "SELECT DISTINCT su.skill
1743             FROM skills_used su
1744             JOIN sessions s ON s.id = su.session_id
1745             WHERE s.workspace = ?1
1746               AND EXISTS (
1747                 SELECT 1 FROM events e
1748                 JOIN sessions ss ON ss.id = e.session_id
1749                 WHERE e.session_id = su.session_id
1750                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1751               )
1752             ORDER BY su.skill",
1753        )?;
1754        let out: Vec<String> = stmt
1755            .query_map(
1756                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1757                |r| r.get::<_, String>(0),
1758            )?
1759            .filter_map(|r| r.ok())
1760            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1761            .collect();
1762        Ok(out)
1763    }
1764
1765    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1766    pub fn skills_used_in_window(
1767        &self,
1768        workspace: &str,
1769        start_ms: u64,
1770        end_ms: u64,
1771    ) -> Result<Vec<(String, String)>> {
1772        let mut stmt = self.conn.prepare(
1773            "SELECT DISTINCT su.session_id, su.skill
1774             FROM skills_used su
1775             JOIN sessions s ON s.id = su.session_id
1776             WHERE s.workspace = ?1
1777               AND EXISTS (
1778                 SELECT 1 FROM events e
1779                 JOIN sessions ss ON ss.id = e.session_id
1780                 WHERE e.session_id = su.session_id
1781                   AND (
1782                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1783                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1784                   )
1785               )
1786             ORDER BY su.session_id, su.skill",
1787        )?;
1788        let out: Vec<(String, String)> = stmt
1789            .query_map(
1790                params![
1791                    workspace,
1792                    start_ms as i64,
1793                    end_ms as i64,
1794                    SYNTHETIC_TS_CEILING_MS,
1795                ],
1796                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1797            )?
1798            .filter_map(|r| r.ok())
1799            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1800            .collect();
1801        Ok(out)
1802    }
1803
1804    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1805    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1806        let mut stmt = self.conn.prepare(
1807            "SELECT DISTINCT ru.rule
1808             FROM rules_used ru
1809             JOIN sessions s ON s.id = ru.session_id
1810             WHERE s.workspace = ?1
1811               AND EXISTS (
1812                 SELECT 1 FROM events e
1813                 JOIN sessions ss ON ss.id = e.session_id
1814                 WHERE e.session_id = ru.session_id
1815                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1816               )
1817             ORDER BY ru.rule",
1818        )?;
1819        let out: Vec<String> = stmt
1820            .query_map(
1821                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1822                |r| r.get::<_, String>(0),
1823            )?
1824            .filter_map(|r| r.ok())
1825            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1826            .collect();
1827        Ok(out)
1828    }
1829
1830    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1831    pub fn rules_used_in_window(
1832        &self,
1833        workspace: &str,
1834        start_ms: u64,
1835        end_ms: u64,
1836    ) -> Result<Vec<(String, String)>> {
1837        let mut stmt = self.conn.prepare(
1838            "SELECT DISTINCT ru.session_id, ru.rule
1839             FROM rules_used ru
1840             JOIN sessions s ON s.id = ru.session_id
1841             WHERE s.workspace = ?1
1842               AND EXISTS (
1843                 SELECT 1 FROM events e
1844                 JOIN sessions ss ON ss.id = e.session_id
1845                 WHERE e.session_id = ru.session_id
1846                   AND (
1847                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1848                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1849                   )
1850               )
1851             ORDER BY ru.session_id, ru.rule",
1852        )?;
1853        let out: Vec<(String, String)> = stmt
1854            .query_map(
1855                params![
1856                    workspace,
1857                    start_ms as i64,
1858                    end_ms as i64,
1859                    SYNTHETIC_TS_CEILING_MS,
1860                ],
1861                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1862            )?
1863            .filter_map(|r| r.ok())
1864            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1865            .collect();
1866        Ok(out)
1867    }
1868
1869    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1870    pub fn sessions_active_in_window(
1871        &self,
1872        workspace: &str,
1873        start_ms: u64,
1874        end_ms: u64,
1875    ) -> Result<HashSet<String>> {
1876        let mut stmt = self.conn.prepare(
1877            "SELECT DISTINCT s.id
1878             FROM sessions s
1879             WHERE s.workspace = ?1
1880               AND EXISTS (
1881                 SELECT 1 FROM events e
1882                 WHERE e.session_id = s.id
1883                   AND (
1884                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1885                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1886                   )
1887               )",
1888        )?;
1889        let out: HashSet<String> = stmt
1890            .query_map(
1891                params![
1892                    workspace,
1893                    start_ms as i64,
1894                    end_ms as i64,
1895                    SYNTHETIC_TS_CEILING_MS,
1896                ],
1897                |r| r.get(0),
1898            )?
1899            .filter_map(|r| r.ok())
1900            .collect();
1901        Ok(out)
1902    }
1903
1904    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1905    pub fn session_costs_usd_e6_in_window(
1906        &self,
1907        workspace: &str,
1908        start_ms: u64,
1909        end_ms: u64,
1910    ) -> Result<HashMap<String, i64>> {
1911        let mut stmt = self.conn.prepare(
1912            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1913             FROM events e
1914             JOIN sessions s ON s.id = e.session_id
1915             WHERE s.workspace = ?1
1916               AND (
1917                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1918                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1919               )
1920             GROUP BY e.session_id",
1921        )?;
1922        let rows: Vec<(String, i64)> = stmt
1923            .query_map(
1924                params![
1925                    workspace,
1926                    start_ms as i64,
1927                    end_ms as i64,
1928                    SYNTHETIC_TS_CEILING_MS,
1929                ],
1930                |r| Ok((r.get(0)?, r.get(1)?)),
1931            )?
1932            .filter_map(|r| r.ok())
1933            .collect();
1934        Ok(rows.into_iter().collect())
1935    }
1936
1937    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1938    pub fn guidance_report(
1939        &self,
1940        workspace: &str,
1941        window_start_ms: u64,
1942        window_end_ms: u64,
1943        skill_slugs_on_disk: &HashSet<String>,
1944        rule_slugs_on_disk: &HashSet<String>,
1945    ) -> Result<GuidanceReport> {
1946        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1947        let denom = active.len() as u64;
1948        let costs =
1949            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1950
1951        let workspace_avg_cost_per_session_usd = if denom > 0 {
1952            let total_e6: i64 = active
1953                .iter()
1954                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1955                .sum();
1956            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1957        } else {
1958            None
1959        };
1960
1961        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1962        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1963            skill_sessions.entry(skill).or_default().insert(sid);
1964        }
1965        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1966        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1967            rule_sessions.entry(rule).or_default().insert(sid);
1968        }
1969
1970        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1971
1972        let mut push_row =
1973            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1974                let sessions = sids.len() as u64;
1975                let sessions_pct = if denom > 0 {
1976                    sessions as f64 * 100.0 / denom as f64
1977                } else {
1978                    0.0
1979                };
1980                let total_cost_usd_e6: i64 = sids
1981                    .iter()
1982                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1983                    .sum();
1984                let avg_cost_per_session_usd = if sessions > 0 {
1985                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1986                } else {
1987                    None
1988                };
1989                let vs_workspace_avg_cost_per_session_usd =
1990                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1991                        (Some(avg), Some(w)) => Some(avg - w),
1992                        _ => None,
1993                    };
1994                rows.push(GuidancePerfRow {
1995                    kind,
1996                    id,
1997                    sessions,
1998                    sessions_pct,
1999                    total_cost_usd_e6,
2000                    avg_cost_per_session_usd,
2001                    vs_workspace_avg_cost_per_session_usd,
2002                    on_disk,
2003                });
2004            };
2005
2006        let mut seen_skills: HashSet<String> = HashSet::new();
2007        for (id, sids) in &skill_sessions {
2008            seen_skills.insert(id.clone());
2009            push_row(
2010                GuidanceKind::Skill,
2011                id.clone(),
2012                sids,
2013                skill_slugs_on_disk.contains(id),
2014            );
2015        }
2016        for slug in skill_slugs_on_disk {
2017            if seen_skills.contains(slug) {
2018                continue;
2019            }
2020            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
2021        }
2022
2023        let mut seen_rules: HashSet<String> = HashSet::new();
2024        for (id, sids) in &rule_sessions {
2025            seen_rules.insert(id.clone());
2026            push_row(
2027                GuidanceKind::Rule,
2028                id.clone(),
2029                sids,
2030                rule_slugs_on_disk.contains(id),
2031            );
2032        }
2033        for slug in rule_slugs_on_disk {
2034            if seen_rules.contains(slug) {
2035                continue;
2036            }
2037            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
2038        }
2039
2040        rows.sort_by(|a, b| {
2041            b.sessions
2042                .cmp(&a.sessions)
2043                .then_with(|| a.kind.cmp(&b.kind))
2044                .then_with(|| a.id.cmp(&b.id))
2045        });
2046
2047        Ok(GuidanceReport {
2048            workspace: workspace.to_string(),
2049            window_start_ms,
2050            window_end_ms,
2051            sessions_in_window: denom,
2052            workspace_avg_cost_per_session_usd,
2053            rows,
2054        })
2055    }
2056
2057    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
2058        let mut stmt = self.conn.prepare(
2059            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
2060                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
2061                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
2062                    repo_file_count, repo_total_loc
2063             FROM sessions WHERE id = ?1",
2064        )?;
2065        let mut rows = stmt.query_map(params![id], |row| {
2066            Ok((
2067                row.get::<_, String>(0)?,
2068                row.get::<_, String>(1)?,
2069                row.get::<_, Option<String>>(2)?,
2070                row.get::<_, String>(3)?,
2071                row.get::<_, i64>(4)?,
2072                row.get::<_, Option<i64>>(5)?,
2073                row.get::<_, String>(6)?,
2074                row.get::<_, String>(7)?,
2075                row.get::<_, Option<String>>(8)?,
2076                row.get::<_, Option<String>>(9)?,
2077                row.get::<_, Option<String>>(10)?,
2078                row.get::<_, Option<i64>>(11)?,
2079                row.get::<_, Option<i64>>(12)?,
2080                row.get::<_, String>(13)?,
2081                row.get::<_, Option<String>>(14)?,
2082                row.get::<_, Option<String>>(15)?,
2083                row.get::<_, Option<String>>(16)?,
2084                row.get::<_, Option<String>>(17)?,
2085                row.get::<_, Option<String>>(18)?,
2086                row.get::<_, Option<i64>>(19)?,
2087                row.get::<_, Option<i64>>(20)?,
2088            ))
2089        })?;
2090
2091        if let Some(row) = rows.next() {
2092            let (
2093                id,
2094                agent,
2095                model,
2096                workspace,
2097                started,
2098                ended,
2099                status_str,
2100                trace,
2101                start_commit,
2102                end_commit,
2103                branch,
2104                dirty_start,
2105                dirty_end,
2106                source,
2107                prompt_fingerprint,
2108                parent_session_id,
2109                agent_version,
2110                os,
2111                arch,
2112                repo_file_count,
2113                repo_total_loc,
2114            ) = row?;
2115            Ok(Some(SessionRecord {
2116                id,
2117                agent,
2118                model,
2119                workspace,
2120                started_at_ms: started as u64,
2121                ended_at_ms: ended.map(|v| v as u64),
2122                status: status_from_str(&status_str),
2123                trace_path: trace,
2124                start_commit,
2125                end_commit,
2126                branch,
2127                dirty_start: dirty_start.map(i64_to_bool),
2128                dirty_end: dirty_end.map(i64_to_bool),
2129                repo_binding_source: empty_to_none(source),
2130                prompt_fingerprint,
2131                parent_session_id,
2132                agent_version,
2133                os,
2134                arch,
2135                repo_file_count: repo_file_count.map(|v| v as u32),
2136                repo_total_loc: repo_total_loc.map(|v| v as u64),
2137            }))
2138        } else {
2139            Ok(None)
2140        }
2141    }
2142
2143    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2144        let mut stmt = self.conn.prepare(
2145            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2146                    indexed_at_ms, dirty, graph_path
2147             FROM repo_snapshots WHERE workspace = ?1
2148             ORDER BY indexed_at_ms DESC LIMIT 1",
2149        )?;
2150        let mut rows = stmt.query_map(params![workspace], |row| {
2151            Ok(RepoSnapshotRecord {
2152                id: row.get(0)?,
2153                workspace: row.get(1)?,
2154                head_commit: row.get(2)?,
2155                dirty_fingerprint: row.get(3)?,
2156                analyzer_version: row.get(4)?,
2157                indexed_at_ms: row.get::<_, i64>(5)? as u64,
2158                dirty: row.get::<_, i64>(6)? != 0,
2159                graph_path: row.get(7)?,
2160            })
2161        })?;
2162        Ok(rows.next().transpose()?)
2163    }
2164
2165    pub fn save_repo_snapshot(
2166        &self,
2167        snapshot: &RepoSnapshotRecord,
2168        facts: &[FileFact],
2169        edges: &[RepoEdge],
2170    ) -> Result<()> {
2171        self.conn.execute(
2172            "INSERT INTO repo_snapshots (
2173                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2174                indexed_at_ms, dirty, graph_path
2175             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2176             ON CONFLICT(id) DO UPDATE SET
2177                workspace=excluded.workspace,
2178                head_commit=excluded.head_commit,
2179                dirty_fingerprint=excluded.dirty_fingerprint,
2180                analyzer_version=excluded.analyzer_version,
2181                indexed_at_ms=excluded.indexed_at_ms,
2182                dirty=excluded.dirty,
2183                graph_path=excluded.graph_path",
2184            params![
2185                snapshot.id,
2186                snapshot.workspace,
2187                snapshot.head_commit,
2188                snapshot.dirty_fingerprint,
2189                snapshot.analyzer_version,
2190                snapshot.indexed_at_ms as i64,
2191                bool_to_i64(snapshot.dirty),
2192                snapshot.graph_path,
2193            ],
2194        )?;
2195        self.conn.execute(
2196            "DELETE FROM file_facts WHERE snapshot_id = ?1",
2197            params![snapshot.id],
2198        )?;
2199        self.conn.execute(
2200            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2201            params![snapshot.id],
2202        )?;
2203        for fact in facts {
2204            self.conn.execute(
2205                "INSERT INTO file_facts (
2206                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2207                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2208                    churn_30d, churn_90d, authors_90d, last_changed_ms
2209                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2210                params![
2211                    fact.snapshot_id,
2212                    fact.path,
2213                    fact.language,
2214                    fact.bytes as i64,
2215                    fact.loc as i64,
2216                    fact.sloc as i64,
2217                    fact.complexity_total as i64,
2218                    fact.max_fn_complexity as i64,
2219                    fact.symbol_count as i64,
2220                    fact.import_count as i64,
2221                    fact.fan_in as i64,
2222                    fact.fan_out as i64,
2223                    fact.churn_30d as i64,
2224                    fact.churn_90d as i64,
2225                    fact.authors_90d as i64,
2226                    fact.last_changed_ms.map(|v| v as i64),
2227                ],
2228            )?;
2229        }
2230        for edge in edges {
2231            self.conn.execute(
2232                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2233                 VALUES (?1, ?2, ?3, ?4, ?5)
2234                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2235                 DO UPDATE SET weight = weight + excluded.weight",
2236                params![
2237                    snapshot.id,
2238                    edge.from_path,
2239                    edge.to_path,
2240                    edge.kind,
2241                    edge.weight as i64,
2242                ],
2243            )?;
2244        }
2245        Ok(())
2246    }
2247
2248    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2249        let mut stmt = self.conn.prepare(
2250            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2251                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2252                    churn_30d, churn_90d, authors_90d, last_changed_ms
2253             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2254        )?;
2255        let rows = stmt.query_map(params![snapshot_id], |row| {
2256            Ok(FileFact {
2257                snapshot_id: row.get(0)?,
2258                path: row.get(1)?,
2259                language: row.get(2)?,
2260                bytes: row.get::<_, i64>(3)? as u64,
2261                loc: row.get::<_, i64>(4)? as u32,
2262                sloc: row.get::<_, i64>(5)? as u32,
2263                complexity_total: row.get::<_, i64>(6)? as u32,
2264                max_fn_complexity: row.get::<_, i64>(7)? as u32,
2265                symbol_count: row.get::<_, i64>(8)? as u32,
2266                import_count: row.get::<_, i64>(9)? as u32,
2267                fan_in: row.get::<_, i64>(10)? as u32,
2268                fan_out: row.get::<_, i64>(11)? as u32,
2269                churn_30d: row.get::<_, i64>(12)? as u32,
2270                churn_90d: row.get::<_, i64>(13)? as u32,
2271                authors_90d: row.get::<_, i64>(14)? as u32,
2272                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2273            })
2274        })?;
2275        Ok(rows.filter_map(|row| row.ok()).collect())
2276    }
2277
2278    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2279        let mut stmt = self.conn.prepare(
2280            "SELECT from_id, to_id, kind, weight
2281             FROM repo_edges WHERE snapshot_id = ?1
2282             ORDER BY kind, from_id, to_id",
2283        )?;
2284        let rows = stmt.query_map(params![snapshot_id], |row| {
2285            Ok(RepoEdge {
2286                from_path: row.get(0)?,
2287                to_path: row.get(1)?,
2288                kind: row.get(2)?,
2289                weight: row.get::<_, i64>(3)? as u32,
2290            })
2291        })?;
2292        Ok(rows.filter_map(|row| row.ok()).collect())
2293    }
2294
2295    pub fn hottest_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2296        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * complexity_total")
2297    }
2298
2299    pub fn most_changed_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2300        self.ranked_files_for_snapshot(snapshot_id, "churn_30d")
2301    }
2302
2303    pub fn most_complex_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2304        self.ranked_files_for_snapshot(snapshot_id, "complexity_total")
2305    }
2306
2307    pub fn highest_risk_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2308        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * authors_90d * complexity_total")
2309    }
2310
2311    fn ranked_files_for_snapshot(
2312        &self,
2313        snapshot_id: &str,
2314        value_sql: &str,
2315    ) -> Result<Vec<RankedFile>> {
2316        let sql = format!(
2317            "SELECT path, {value_sql}, complexity_total, churn_30d
2318             FROM file_facts WHERE snapshot_id = ?1
2319             ORDER BY {value_sql} DESC, path ASC LIMIT 10"
2320        );
2321        let mut stmt = self.conn.prepare(&sql)?;
2322        let rows = stmt.query_map(params![snapshot_id], ranked_file_row)?;
2323        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2324    }
2325
2326    pub fn pain_hotspots_for_snapshot(
2327        &self,
2328        snapshot_id: &str,
2329        workspace: &str,
2330        start_ms: u64,
2331        end_ms: u64,
2332    ) -> Result<Vec<RankedFile>> {
2333        let mut stmt = self.conn.prepare(PAIN_HOTSPOTS_SQL)?;
2334        let rows = stmt.query_map(
2335            params![snapshot_id, workspace, start_ms as i64, end_ms as i64],
2336            ranked_file_row,
2337        )?;
2338        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2339    }
2340
2341    pub fn tool_rank_rows_in_window(
2342        &self,
2343        workspace: &str,
2344        start_ms: u64,
2345        end_ms: u64,
2346    ) -> Result<Vec<RankedTool>> {
2347        let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
2348        let rows = stmt.query_map(
2349            params![workspace, start_ms as i64, end_ms as i64],
2350            ranked_tool_row,
2351        )?;
2352        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2353    }
2354
2355    pub fn tool_spans_in_window(
2356        &self,
2357        workspace: &str,
2358        start_ms: u64,
2359        end_ms: u64,
2360    ) -> Result<Vec<ToolSpanView>> {
2361        let mut stmt = self.conn.prepare(
2362            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2363                    reasoning_tokens, cost_usd_e6, paths_json,
2364                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2365             FROM (
2366                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2367                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2368                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2369                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2370                        ts.started_at_ms AS sort_ms
2371                 FROM tool_spans ts
2372                 JOIN sessions s ON s.id = ts.session_id
2373                 WHERE s.workspace = ?1
2374                   AND ts.started_at_ms >= ?2
2375                   AND ts.started_at_ms <= ?3
2376                 UNION ALL
2377                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2378                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2379                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2380                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2381                        ts.ended_at_ms AS sort_ms
2382                 FROM tool_spans ts
2383                 JOIN sessions s ON s.id = ts.session_id
2384                 WHERE s.workspace = ?1
2385                   AND ts.started_at_ms IS NULL
2386                   AND ts.ended_at_ms >= ?2
2387                   AND ts.ended_at_ms <= ?3
2388             )
2389             ORDER BY sort_ms DESC",
2390        )?;
2391        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2392            let paths_json: String = row.get(8)?;
2393            Ok(ToolSpanView {
2394                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2395                tool: row
2396                    .get::<_, Option<String>>(1)?
2397                    .unwrap_or_else(|| "unknown".into()),
2398                status: row.get(2)?,
2399                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2400                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2401                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2402                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2403                cost_usd_e6: row.get(7)?,
2404                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2405                parent_span_id: row.get(9)?,
2406                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2407                subtree_cost_usd_e6: row.get(11)?,
2408                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2409            })
2410        })?;
2411        Ok(rows.filter_map(|row| row.ok()).collect())
2412    }
2413
2414    pub fn session_span_tree(
2415        &self,
2416        session_id: &str,
2417    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2418        let last_event_seq = self.last_event_seq_for_session(session_id)?;
2419        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2420            && entry.session_id == session_id
2421            && entry.last_event_seq == last_event_seq
2422        {
2423            return Ok(entry.nodes.clone());
2424        }
2425        let mut stmt = self.conn.prepare(
2426            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2427                    reasoning_tokens, cost_usd_e6, paths_json,
2428                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2429             FROM tool_spans
2430             WHERE session_id = ?1
2431             ORDER BY depth ASC, started_at_ms ASC",
2432        )?;
2433        let rows = stmt.query_map(params![session_id], |row| {
2434            let paths_json: String = row.get(8)?;
2435            Ok(crate::metrics::types::ToolSpanView {
2436                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2437                tool: row
2438                    .get::<_, Option<String>>(1)?
2439                    .unwrap_or_else(|| "unknown".into()),
2440                status: row.get(2)?,
2441                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2442                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2443                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2444                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2445                cost_usd_e6: row.get(7)?,
2446                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2447                parent_span_id: row.get(9)?,
2448                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2449                subtree_cost_usd_e6: row.get(11)?,
2450                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2451            })
2452        })?;
2453        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2454        let nodes = crate::store::span_tree::build_tree(spans);
2455        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2456            session_id: session_id.to_string(),
2457            last_event_seq,
2458            nodes: nodes.clone(),
2459        });
2460        Ok(nodes)
2461    }
2462
2463    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2464        let seq = self
2465            .conn
2466            .query_row(
2467                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2468                params![session_id],
2469                |r| r.get::<_, Option<i64>>(0),
2470            )?
2471            .map(|v| v as u64);
2472        Ok(seq)
2473    }
2474
2475    /// Sync-shaped tool spans whose session falls in `[start_ms, end_ms]`. Mirrors
2476    /// `retro_events_in_window` for the spans table so `kaizen telemetry push` can ship
2477    /// `IngestExportBatch::ToolSpans` next to the events batch. Window matches on
2478    /// `started_at_ms` first, falling back to `ended_at_ms` for spans that never started a
2479    /// timer (status-only rows). Workspace filter joins through `sessions.workspace`.
2480    pub fn tool_spans_sync_rows_in_window(
2481        &self,
2482        workspace: &str,
2483        start_ms: u64,
2484        end_ms: u64,
2485    ) -> Result<Vec<ToolSpanSyncRow>> {
2486        let mut stmt = self.conn.prepare(
2487            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
2488                    lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2489             FROM (
2490                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2491                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2492                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2493                        ts.started_at_ms AS sort_ms
2494                 FROM tool_spans ts
2495                 JOIN sessions s ON s.id = ts.session_id
2496                 WHERE s.workspace = ?1
2497                   AND ts.started_at_ms IS NOT NULL
2498                   AND ts.started_at_ms >= ?2
2499                   AND ts.started_at_ms <= ?3
2500                 UNION ALL
2501                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2502                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2503                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2504                        ts.ended_at_ms AS sort_ms
2505                 FROM tool_spans ts
2506                 JOIN sessions s ON s.id = ts.session_id
2507                 WHERE s.workspace = ?1
2508                   AND ts.started_at_ms IS NULL
2509                   AND ts.ended_at_ms IS NOT NULL
2510                   AND ts.ended_at_ms >= ?2
2511                   AND ts.ended_at_ms <= ?3
2512             )
2513             ORDER BY sort_ms ASC, span_id ASC",
2514        )?;
2515        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2516            let paths_json: String = row.get(12)?;
2517            Ok(ToolSpanSyncRow {
2518                span_id: row.get(0)?,
2519                session_id: row.get(1)?,
2520                tool: row.get(2)?,
2521                tool_call_id: row.get(3)?,
2522                status: row.get(4)?,
2523                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2524                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2525                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2526                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2527                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2528                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2529                cost_usd_e6: row.get(11)?,
2530                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2531            })
2532        })?;
2533        Ok(rows.filter_map(|row| row.ok()).collect())
2534    }
2535
2536    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2537        let mut stmt = self.conn.prepare(
2538            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2539                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2540             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2541        )?;
2542        let rows = stmt.query_map(params![session_id], |row| {
2543            let paths_json: String = row.get(12)?;
2544            Ok(ToolSpanSyncRow {
2545                span_id: row.get(0)?,
2546                session_id: row.get(1)?,
2547                tool: row.get(2)?,
2548                tool_call_id: row.get(3)?,
2549                status: row.get(4)?,
2550                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2551                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2552                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2553                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2554                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2555                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2556                cost_usd_e6: row.get(11)?,
2557                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2558            })
2559        })?;
2560        Ok(rows.filter_map(|row| row.ok()).collect())
2561    }
2562
2563    pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
2564        self.conn.execute(
2565            "INSERT INTO trace_spans (
2566                span_id, trace_id, parent_span_id, session_id, kind, name, status,
2567                started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2568                reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2569             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
2570             ON CONFLICT(span_id) DO UPDATE SET
2571                trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
2572                session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
2573                status=excluded.status, started_at_ms=excluded.started_at_ms,
2574                ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
2575                model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
2576                tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
2577                cost_usd_e6=excluded.cost_usd_e6,
2578                context_used_tokens=excluded.context_used_tokens,
2579                context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
2580            params![
2581                span.span_id.as_str(),
2582                span.trace_id.as_str(),
2583                span.parent_span_id.as_deref(),
2584                span.session_id.as_str(),
2585                span.kind.as_str(),
2586                span.name.as_str(),
2587                span.status.as_str(),
2588                span.started_at_ms.map(|v| v as i64),
2589                span.ended_at_ms.map(|v| v as i64),
2590                span.duration_ms.map(|v| v as i64),
2591                span.model.as_deref(),
2592                span.tool.as_deref(),
2593                span.tokens_in.map(|v| v as i64),
2594                span.tokens_out.map(|v| v as i64),
2595                span.reasoning_tokens.map(|v| v as i64),
2596                span.cost_usd_e6,
2597                span.context_used_tokens.map(|v| v as i64),
2598                span.context_max_tokens.map(|v| v as i64),
2599                serde_json::to_string(&span.payload)?,
2600            ],
2601        )?;
2602        Ok(())
2603    }
2604
2605    pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
2606        let mut stmt = self.conn.prepare(
2607            "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
2608                    started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2609                    reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2610             FROM trace_spans WHERE session_id = ?1
2611             ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
2612        )?;
2613        let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
2614        Ok(rows.filter_map(|row| row.ok()).collect())
2615    }
2616
2617    pub(crate) fn capture_quality_rows(
2618        &self,
2619        workspace: &str,
2620        start_ms: u64,
2621        end_ms: u64,
2622    ) -> Result<Vec<CaptureQualityRow>> {
2623        let mut stmt = self.conn.prepare(
2624            "SELECT e.source,
2625                    e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
2626                    e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
2627                    e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL
2628             FROM events e JOIN sessions s ON s.id = e.session_id
2629             WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
2630        )?;
2631        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2632            Ok(CaptureQualityRow {
2633                source: row.get(0)?,
2634                has_tokens: row.get::<_, i64>(1)? != 0,
2635                has_latency: row.get::<_, i64>(2)? != 0,
2636                has_context: row.get::<_, i64>(3)? != 0,
2637            })
2638        })?;
2639        Ok(rows.filter_map(|row| row.ok()).collect())
2640    }
2641
2642    pub(crate) fn trace_span_quality_rows(
2643        &self,
2644        workspace: &str,
2645        start_ms: u64,
2646        end_ms: u64,
2647    ) -> Result<Vec<TraceSpanQualityRow>> {
2648        let mut stmt = self.conn.prepare(
2649            "SELECT ts.kind,
2650                    ts.parent_span_id IS NOT NULL
2651                    AND parent.span_id IS NULL
2652                    AND ts.kind NOT IN ('session', 'agent')
2653             FROM trace_spans ts
2654             JOIN sessions s ON s.id = ts.session_id
2655             LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
2656             WHERE s.workspace = ?1
2657               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
2658               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
2659        )?;
2660        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2661            Ok(TraceSpanQualityRow {
2662                kind: row.get(0)?,
2663                is_orphan: row.get::<_, i64>(1)? != 0,
2664            })
2665        })?;
2666        Ok(rows.filter_map(|row| row.ok()).collect())
2667    }
2668
2669    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2670        self.conn.execute(
2671            "INSERT OR REPLACE INTO session_evals
2672             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2673             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2674            rusqlite::params![
2675                eval.id,
2676                eval.session_id,
2677                eval.judge_model,
2678                eval.rubric_id,
2679                eval.score,
2680                eval.rationale,
2681                eval.flagged as i64,
2682                eval.created_at_ms as i64,
2683            ],
2684        )?;
2685        Ok(())
2686    }
2687
2688    pub fn list_evals_in_window(
2689        &self,
2690        start_ms: u64,
2691        end_ms: u64,
2692    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2693        let mut stmt = self.conn.prepare(
2694            "SELECT id, session_id, judge_model, rubric_id, score,
2695                    rationale, flagged, created_at_ms
2696             FROM session_evals
2697             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2698             ORDER BY created_at_ms ASC",
2699        )?;
2700        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2701            Ok(crate::eval::types::EvalRow {
2702                id: r.get(0)?,
2703                session_id: r.get(1)?,
2704                judge_model: r.get(2)?,
2705                rubric_id: r.get(3)?,
2706                score: r.get(4)?,
2707                rationale: r.get(5)?,
2708                flagged: r.get::<_, i64>(6)? != 0,
2709                created_at_ms: r.get::<_, i64>(7)? as u64,
2710            })
2711        })?;
2712        rows.collect()
2713    }
2714
2715    pub fn list_evals_for_session(
2716        &self,
2717        session_id: &str,
2718    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2719        let mut stmt = self.conn.prepare(
2720            "SELECT id, session_id, judge_model, rubric_id, score,
2721                    rationale, flagged, created_at_ms
2722             FROM session_evals
2723             WHERE session_id = ?1
2724             ORDER BY created_at_ms DESC",
2725        )?;
2726        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2727            Ok(crate::eval::types::EvalRow {
2728                id: r.get(0)?,
2729                session_id: r.get(1)?,
2730                judge_model: r.get(2)?,
2731                rubric_id: r.get(3)?,
2732                score: r.get(4)?,
2733                rationale: r.get(5)?,
2734                flagged: r.get::<_, i64>(6)? != 0,
2735                created_at_ms: r.get::<_, i64>(7)? as u64,
2736            })
2737        })?;
2738        rows.collect()
2739    }
2740
2741    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2742        use crate::feedback::types::FeedbackLabel;
2743        self.conn.execute(
2744            "INSERT OR REPLACE INTO session_feedback
2745             (id, session_id, score, label, note, created_at_ms)
2746             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2747            rusqlite::params![
2748                r.id,
2749                r.session_id,
2750                r.score.as_ref().map(|s| s.0 as i64),
2751                r.label.as_ref().map(FeedbackLabel::to_db_str),
2752                r.note,
2753                r.created_at_ms as i64,
2754            ],
2755        )?;
2756        let payload = serde_json::to_string(r).unwrap_or_default();
2757        self.conn.execute(
2758            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2759             VALUES (?1, 'session_feedback', ?2, 0)",
2760            rusqlite::params![r.session_id, payload],
2761        )?;
2762        Ok(())
2763    }
2764
2765    pub fn list_feedback_in_window(
2766        &self,
2767        start_ms: u64,
2768        end_ms: u64,
2769    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2770        let mut stmt = self.conn.prepare(
2771            "SELECT id, session_id, score, label, note, created_at_ms
2772             FROM session_feedback
2773             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2774             ORDER BY created_at_ms ASC",
2775        )?;
2776        let rows = stmt.query_map(
2777            rusqlite::params![start_ms as i64, end_ms as i64],
2778            feedback_row,
2779        )?;
2780        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2781    }
2782
2783    pub fn feedback_for_sessions(
2784        &self,
2785        ids: &[String],
2786    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2787        if ids.is_empty() {
2788            return Ok(std::collections::HashMap::new());
2789        }
2790        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2791        let sql = format!(
2792            "SELECT id, session_id, score, label, note, created_at_ms
2793             FROM session_feedback WHERE session_id IN ({placeholders})
2794             ORDER BY created_at_ms DESC"
2795        );
2796        let mut stmt = self.conn.prepare(&sql)?;
2797        let params: Vec<&dyn rusqlite::ToSql> =
2798            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2799        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2800        let mut map = std::collections::HashMap::new();
2801        for row in rows {
2802            let r = row?;
2803            map.entry(r.session_id.clone()).or_insert(r);
2804        }
2805        Ok(map)
2806    }
2807
2808    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2809        self.conn.execute(
2810            "INSERT INTO session_outcomes (
2811                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2812                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2813            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2814            ON CONFLICT(session_id) DO UPDATE SET
2815                test_passed=excluded.test_passed,
2816                test_failed=excluded.test_failed,
2817                test_skipped=excluded.test_skipped,
2818                build_ok=excluded.build_ok,
2819                lint_errors=excluded.lint_errors,
2820                revert_lines_14d=excluded.revert_lines_14d,
2821                pr_open=excluded.pr_open,
2822                ci_ok=excluded.ci_ok,
2823                measured_at_ms=excluded.measured_at_ms,
2824                measure_error=excluded.measure_error",
2825            params![
2826                row.session_id,
2827                row.test_passed,
2828                row.test_failed,
2829                row.test_skipped,
2830                row.build_ok.map(bool_to_i64),
2831                row.lint_errors,
2832                row.revert_lines_14d,
2833                row.pr_open,
2834                row.ci_ok.map(bool_to_i64),
2835                row.measured_at_ms as i64,
2836                row.measure_error.as_deref(),
2837            ],
2838        )?;
2839        Ok(())
2840    }
2841
2842    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2843        let mut stmt = self.conn.prepare(
2844            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2845                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2846             FROM session_outcomes WHERE session_id = ?1",
2847        )?;
2848        let row = stmt
2849            .query_row(params![session_id], outcome_row)
2850            .optional()?;
2851        Ok(row)
2852    }
2853
2854    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2855    pub fn list_session_outcomes_in_window(
2856        &self,
2857        workspace: &str,
2858        start_ms: u64,
2859        end_ms: u64,
2860    ) -> Result<Vec<SessionOutcomeRow>> {
2861        let mut stmt = self.conn.prepare(
2862            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2863                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2864             FROM session_outcomes o
2865             JOIN sessions s ON s.id = o.session_id
2866             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2867             ORDER BY o.measured_at_ms ASC",
2868        )?;
2869        let rows = stmt.query_map(
2870            params![workspace, start_ms as i64, end_ms as i64],
2871            outcome_row,
2872        )?;
2873        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2874    }
2875
2876    pub fn append_session_sample(
2877        &self,
2878        session_id: &str,
2879        ts_ms: u64,
2880        pid: u32,
2881        cpu_percent: Option<f64>,
2882        rss_bytes: Option<u64>,
2883    ) -> Result<()> {
2884        self.conn.execute(
2885            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2886             VALUES (?1, ?2, ?3, ?4, ?5)",
2887            params![
2888                session_id,
2889                ts_ms as i64,
2890                pid as i64,
2891                cpu_percent,
2892                rss_bytes.map(|b| b as i64)
2893            ],
2894        )?;
2895        Ok(())
2896    }
2897
2898    /// Per-session maxima for retro heuristics.
2899    pub fn list_session_sample_aggs_in_window(
2900        &self,
2901        workspace: &str,
2902        start_ms: u64,
2903        end_ms: u64,
2904    ) -> Result<Vec<SessionSampleAgg>> {
2905        let mut stmt = self.conn.prepare(
2906            "SELECT ss.session_id, COUNT(*) AS n,
2907                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2908             FROM session_samples ss
2909             JOIN sessions s ON s.id = ss.session_id
2910             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2911             GROUP BY ss.session_id",
2912        )?;
2913        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2914            let sid: String = r.get(0)?;
2915            let n: i64 = r.get(1)?;
2916            let max_cpu: Option<f64> = r.get(2)?;
2917            let max_rss: Option<i64> = r.get(3)?;
2918            Ok(SessionSampleAgg {
2919                session_id: sid,
2920                sample_count: n as u64,
2921                max_cpu_percent: max_cpu.unwrap_or(0.0),
2922                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2923            })
2924        })?;
2925        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2926    }
2927
2928    pub fn list_sessions_for_eval(
2929        &self,
2930        since_ms: u64,
2931        min_cost_usd: f64,
2932    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2933        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2934        let mut stmt = self.conn.prepare(
2935            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2936                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2937                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2938                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2939             FROM sessions s
2940             WHERE s.started_at_ms >= ?1
2941               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2942               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2943             ORDER BY s.started_at_ms DESC",
2944        )?;
2945        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2946            Ok((
2947                r.get::<_, String>(0)?,
2948                r.get::<_, String>(1)?,
2949                r.get::<_, Option<String>>(2)?,
2950                r.get::<_, String>(3)?,
2951                r.get::<_, i64>(4)?,
2952                r.get::<_, Option<i64>>(5)?,
2953                r.get::<_, String>(6)?,
2954                r.get::<_, String>(7)?,
2955                r.get::<_, Option<String>>(8)?,
2956                r.get::<_, Option<String>>(9)?,
2957                r.get::<_, Option<String>>(10)?,
2958                r.get::<_, Option<i64>>(11)?,
2959                r.get::<_, Option<i64>>(12)?,
2960                r.get::<_, Option<String>>(13)?,
2961                r.get::<_, Option<String>>(14)?,
2962                r.get::<_, Option<String>>(15)?,
2963                r.get::<_, Option<String>>(16)?,
2964                r.get::<_, Option<String>>(17)?,
2965                r.get::<_, Option<String>>(18)?,
2966                r.get::<_, Option<i64>>(19)?,
2967                r.get::<_, Option<i64>>(20)?,
2968            ))
2969        })?;
2970        let mut out = Vec::new();
2971        for row in rows {
2972            let (
2973                id,
2974                agent,
2975                model,
2976                workspace,
2977                started,
2978                ended,
2979                status_str,
2980                trace,
2981                start_commit,
2982                end_commit,
2983                branch,
2984                dirty_start,
2985                dirty_end,
2986                source,
2987                prompt_fingerprint,
2988                parent_session_id,
2989                agent_version,
2990                os,
2991                arch,
2992                repo_file_count,
2993                repo_total_loc,
2994            ) = row?;
2995            out.push(crate::core::event::SessionRecord {
2996                id,
2997                agent,
2998                model,
2999                workspace,
3000                started_at_ms: started as u64,
3001                ended_at_ms: ended.map(|v| v as u64),
3002                status: status_from_str(&status_str),
3003                trace_path: trace,
3004                start_commit,
3005                end_commit,
3006                branch,
3007                dirty_start: dirty_start.map(i64_to_bool),
3008                dirty_end: dirty_end.map(i64_to_bool),
3009                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
3010                prompt_fingerprint,
3011                parent_session_id,
3012                agent_version,
3013                os,
3014                arch,
3015                repo_file_count: repo_file_count.map(|v| v as u32),
3016                repo_total_loc: repo_total_loc.map(|v| v as u64),
3017            });
3018        }
3019        Ok(out)
3020    }
3021
3022    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
3023        self.conn.execute(
3024            "INSERT OR IGNORE INTO prompt_snapshots
3025             (fingerprint, captured_at_ms, files_json, total_bytes)
3026             VALUES (?1, ?2, ?3, ?4)",
3027            params![
3028                snap.fingerprint,
3029                snap.captured_at_ms as i64,
3030                snap.files_json,
3031                snap.total_bytes as i64
3032            ],
3033        )?;
3034        Ok(())
3035    }
3036
3037    pub fn get_prompt_snapshot(
3038        &self,
3039        fingerprint: &str,
3040    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
3041        self.conn
3042            .query_row(
3043                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3044                 FROM prompt_snapshots WHERE fingerprint = ?1",
3045                params![fingerprint],
3046                |r| {
3047                    Ok(crate::prompt::PromptSnapshot {
3048                        fingerprint: r.get(0)?,
3049                        captured_at_ms: r.get::<_, i64>(1)? as u64,
3050                        files_json: r.get(2)?,
3051                        total_bytes: r.get::<_, i64>(3)? as u64,
3052                    })
3053                },
3054            )
3055            .optional()
3056            .map_err(Into::into)
3057    }
3058
3059    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
3060        let mut stmt = self.conn.prepare(
3061            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3062             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
3063        )?;
3064        let rows = stmt.query_map([], |r| {
3065            Ok(crate::prompt::PromptSnapshot {
3066                fingerprint: r.get(0)?,
3067                captured_at_ms: r.get::<_, i64>(1)? as u64,
3068                files_json: r.get(2)?,
3069                total_bytes: r.get::<_, i64>(3)? as u64,
3070            })
3071        })?;
3072        Ok(rows.filter_map(|r| r.ok()).collect())
3073    }
3074
3075    /// Sessions with a non-null prompt_fingerprint in the given window.
3076    pub fn sessions_with_prompt_fingerprint(
3077        &self,
3078        workspace: &str,
3079        start_ms: u64,
3080        end_ms: u64,
3081    ) -> Result<Vec<(String, String)>> {
3082        let mut stmt = self.conn.prepare(
3083            "SELECT id, prompt_fingerprint FROM sessions
3084             WHERE workspace = ?1
3085               AND started_at_ms >= ?2 AND started_at_ms < ?3
3086               AND prompt_fingerprint IS NOT NULL",
3087        )?;
3088        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
3089            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
3090        })?;
3091        Ok(rows.filter_map(|r| r.ok()).collect())
3092    }
3093}
3094
3095impl Drop for Store {
3096    fn drop(&mut self) {
3097        if let Some(writer) = self.search_writer.get_mut().as_mut() {
3098            let _ = writer.commit();
3099        }
3100    }
3101}
3102
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
3109
3110fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
3111    let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
3112    let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
3113    Ok(rows.filter_map(|r| r.ok()).collect())
3114}
3115
3116fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
3117    raw.and_then(|s| s.trim().parse::<u64>().ok())
3118        .unwrap_or(DEFAULT_MMAP_MB)
3119        .saturating_mul(1024)
3120        .saturating_mul(1024)
3121        .min(i64::MAX as u64) as i64
3122}
3123
3124fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
3125    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
3126    conn.execute_batch(&format!(
3127        "
3128        PRAGMA journal_mode=WAL;
3129        PRAGMA busy_timeout=5000;
3130        PRAGMA synchronous=NORMAL;
3131        PRAGMA cache_size=-65536;
3132        PRAGMA mmap_size={mmap_size};
3133        PRAGMA temp_store=MEMORY;
3134        PRAGMA wal_autocheckpoint=1000;
3135        "
3136    ))?;
3137    if mode == StoreOpenMode::ReadOnlyQuery {
3138        conn.execute_batch("PRAGMA query_only=ON;")?;
3139    }
3140    Ok(())
3141}
3142
3143fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
3144    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
3145}
3146
3147fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
3148    let status_str: String = row.get(6)?;
3149    Ok(SessionRecord {
3150        id: row.get(0)?,
3151        agent: row.get(1)?,
3152        model: row.get(2)?,
3153        workspace: row.get(3)?,
3154        started_at_ms: row.get::<_, i64>(4)? as u64,
3155        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3156        status: status_from_str(&status_str),
3157        trace_path: row.get(7)?,
3158        start_commit: row.get(8)?,
3159        end_commit: row.get(9)?,
3160        branch: row.get(10)?,
3161        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3162        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3163        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
3164        prompt_fingerprint: row.get(14)?,
3165        parent_session_id: row.get(15)?,
3166        agent_version: row.get(16)?,
3167        os: row.get(17)?,
3168        arch: row.get(18)?,
3169        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3170        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3171    })
3172}
3173
3174fn ranked_file_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedFile> {
3175    Ok(RankedFile {
3176        path: row.get(0)?,
3177        value: row.get::<_, i64>(1)? as u64,
3178        complexity_total: row.get::<_, i64>(2)? as u32,
3179        churn_30d: row.get::<_, i64>(3)? as u32,
3180    })
3181}
3182
3183fn ranked_tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedTool> {
3184    Ok(RankedTool {
3185        tool: row.get(0)?,
3186        calls: row.get::<_, i64>(1)? as u64,
3187        p50_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
3188        p95_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
3189        total_tokens: row.get::<_, i64>(4)? as u64,
3190        total_reasoning_tokens: row.get::<_, i64>(5)? as u64,
3191    })
3192}
3193
3194fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
3195    let payload_str: String = row.get(12)?;
3196    Ok(Event {
3197        session_id: row.get(0)?,
3198        seq: row.get::<_, i64>(1)? as u64,
3199        ts_ms: row.get::<_, i64>(2)? as u64,
3200        ts_exact: row.get::<_, i64>(3)? != 0,
3201        kind: kind_from_str(&row.get::<_, String>(4)?),
3202        source: source_from_str(&row.get::<_, String>(5)?),
3203        tool: row.get(6)?,
3204        tool_call_id: row.get(7)?,
3205        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
3206        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3207        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
3208        cost_usd_e6: row.get(11)?,
3209        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
3210        stop_reason: row.get(13)?,
3211        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3212        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
3213        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
3214        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3215        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
3216        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3217        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
3218        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
3219    })
3220}
3221
3222fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> {
3223    Ok((row.get(22)?, event_row(row)?))
3224}
3225
3226fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
3227    let mut clauses = vec!["workspace = ?".to_string()];
3228    let mut args = vec![Value::Text(workspace.to_string())];
3229    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
3230        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
3231        args.push(Value::Text(format!("{}%", escape_like(prefix))));
3232    }
3233    if let Some(status) = &filter.status {
3234        clauses.push("status = ?".to_string());
3235        args.push(Value::Text(format!("{status:?}")));
3236    }
3237    if let Some(since_ms) = filter.since_ms {
3238        clauses.push("started_at_ms >= ?".to_string());
3239        args.push(Value::Integer(since_ms as i64));
3240    }
3241    (format!("WHERE {}", clauses.join(" AND ")), args)
3242}
3243
/// Lowercases `raw` and escapes the characters that are special in a SQL
/// `LIKE` pattern using `\` as the escape character: `\`, `%` and `_` are each
/// prefixed with a backslash.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
3250
3251fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
3252    let cost: i64 = conn.query_row(
3253        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
3254        params![workspace], |r| r.get(0),
3255    )?;
3256    let with_cost: i64 = conn.query_row(
3257        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
3258        params![workspace], |r| r.get(0),
3259    )?;
3260    Ok((cost, with_cost as u64))
3261}
3262
3263fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
3264    let build_raw: Option<i64> = r.get(4)?;
3265    let ci_raw: Option<i64> = r.get(8)?;
3266    Ok(SessionOutcomeRow {
3267        session_id: r.get(0)?,
3268        test_passed: r.get(1)?,
3269        test_failed: r.get(2)?,
3270        test_skipped: r.get(3)?,
3271        build_ok: build_raw.map(|v| v != 0),
3272        lint_errors: r.get(5)?,
3273        revert_lines_14d: r.get(6)?,
3274        pr_open: r.get(7)?,
3275        ci_ok: ci_raw.map(|v| v != 0),
3276        measured_at_ms: r.get::<_, i64>(9)? as u64,
3277        measure_error: r.get(10)?,
3278    })
3279}
3280
3281fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
3282    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
3283    let score = r
3284        .get::<_, Option<i64>>(2)?
3285        .and_then(|v| FeedbackScore::new(v as u8));
3286    let label = r
3287        .get::<_, Option<String>>(3)?
3288        .and_then(|s| FeedbackLabel::from_str_opt(&s));
3289    Ok(FeedbackRecord {
3290        id: r.get(0)?,
3291        session_id: r.get(1)?,
3292        score,
3293        label,
3294        note: r.get(4)?,
3295        created_at_ms: r.get::<_, i64>(5)? as u64,
3296    })
3297}
3298
/// Three-letter weekday name for a day index counted in whole days since the
/// Unix epoch. Index 0 maps to "Thu".
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Shift by 4 so that day 0 lands on Thursday.
    let weekday = (day_idx + 4) % 7;
    LABELS[weekday as usize]
}
3302
3303fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
3304    let week_ago = now.saturating_sub(7 * 86_400_000);
3305    let mut stmt = conn
3306        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
3307    let days: Vec<u64> = stmt
3308        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
3309        .filter_map(|r| r.ok())
3310        .map(|v| v as u64 / 86_400_000)
3311        .collect();
3312    let today = now / 86_400_000;
3313    Ok((0u64..7)
3314        .map(|i| {
3315            let d = today.saturating_sub(6 - i);
3316            (
3317                day_label(d).to_string(),
3318                days.iter().filter(|&&x| x == d).count() as u64,
3319            )
3320        })
3321        .collect())
3322}
3323
3324fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3325    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3326               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3327               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3328               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3329               COUNT(e.id) FROM sessions s \
3330               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3331               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3332    let mut stmt = conn.prepare(sql)?;
3333    let out: Vec<(SessionRecord, u64)> = stmt
3334        .query_map(params![workspace], |r| {
3335            let st: String = r.get(6)?;
3336            Ok((
3337                SessionRecord {
3338                    id: r.get(0)?,
3339                    agent: r.get(1)?,
3340                    model: r.get(2)?,
3341                    workspace: r.get(3)?,
3342                    started_at_ms: r.get::<_, i64>(4)? as u64,
3343                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3344                    status: status_from_str(&st),
3345                    trace_path: r.get(7)?,
3346                    start_commit: r.get(8)?,
3347                    end_commit: r.get(9)?,
3348                    branch: r.get(10)?,
3349                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3350                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3351                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3352                    prompt_fingerprint: r.get(14)?,
3353                    parent_session_id: r.get(15)?,
3354                    agent_version: r.get(16)?,
3355                    os: r.get(17)?,
3356                    arch: r.get(18)?,
3357                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3358                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3359                },
3360                r.get::<_, i64>(21)? as u64,
3361            ))
3362        })?
3363        .filter_map(|r| r.ok())
3364        .collect();
3365    Ok(out)
3366}
3367
3368fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3369    let mut stmt = conn.prepare(
3370        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3371         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3372    )?;
3373    let out: Vec<(String, u64)> = stmt
3374        .query_map(params![workspace], |r| {
3375            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3376        })?
3377        .filter_map(|r| r.ok())
3378        .collect();
3379    Ok(out)
3380}
3381
3382fn trace_span_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TraceSpanRecord> {
3383    let kind: String = row.get(4)?;
3384    let payload: String = row.get(18)?;
3385    Ok(TraceSpanRecord {
3386        span_id: row.get(0)?,
3387        trace_id: row.get(1)?,
3388        parent_span_id: row.get(2)?,
3389        session_id: row.get(3)?,
3390        kind: TraceSpanKind::parse(&kind),
3391        name: row.get(5)?,
3392        status: row.get(6)?,
3393        started_at_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
3394        ended_at_ms: row.get::<_, Option<i64>>(8)?.map(|v| v as u64),
3395        duration_ms: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3396        model: row.get(10)?,
3397        tool: row.get(11)?,
3398        tokens_in: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
3399        tokens_out: row.get::<_, Option<i64>>(13)?.map(|v| v as u32),
3400        reasoning_tokens: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3401        cost_usd_e6: row.get(15)?,
3402        context_used_tokens: row.get::<_, Option<i64>>(16)?.map(|v| v as u32),
3403        context_max_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3404        payload: serde_json::from_str(&payload).unwrap_or_default(),
3405    })
3406}
3407
3408fn status_from_str(s: &str) -> SessionStatus {
3409    match s {
3410        "Running" => SessionStatus::Running,
3411        "Waiting" => SessionStatus::Waiting,
3412        "Idle" => SessionStatus::Idle,
3413        _ => SessionStatus::Done,
3414    }
3415}
3416
/// Whether the `KAIZEN_PROJECTOR` environment variable is set to exactly `legacy`.
///
/// An unset or unreadable variable counts as "not legacy".
fn projector_legacy_mode() -> bool {
    match std::env::var("KAIZEN_PROJECTOR") {
        Ok(mode) => mode == "legacy",
        Err(_) => false,
    }
}
3420
3421fn is_stop_event(e: &Event) -> bool {
3422    if !matches!(e.kind, EventKind::Hook) {
3423        return false;
3424    }
3425    e.payload
3426        .get("event")
3427        .and_then(|v| v.as_str())
3428        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3429        == Some("Stop")
3430}
3431
3432fn kind_from_str(s: &str) -> EventKind {
3433    match s {
3434        "ToolCall" => EventKind::ToolCall,
3435        "ToolResult" => EventKind::ToolResult,
3436        "Message" => EventKind::Message,
3437        "Error" => EventKind::Error,
3438        "Cost" => EventKind::Cost,
3439        "Hook" => EventKind::Hook,
3440        "Lifecycle" => EventKind::Lifecycle,
3441        _ => EventKind::Hook,
3442    }
3443}
3444
3445fn source_from_str(s: &str) -> EventSource {
3446    match s {
3447        "Tail" => EventSource::Tail,
3448        "Hook" => EventSource::Hook,
3449        _ => EventSource::Proxy,
3450    }
3451}
3452
3453fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3454    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3455    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3456    ensure_column(conn, "sessions", "branch", "TEXT")?;
3457    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3458    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3459    ensure_column(
3460        conn,
3461        "sessions",
3462        "repo_binding_source",
3463        "TEXT NOT NULL DEFAULT ''",
3464    )?;
3465    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3466    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3467    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3468    ensure_column(conn, "events", "stop_reason", "TEXT")?;
3469    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3470    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3471    ensure_column(conn, "events", "retry_count", "INTEGER")?;
3472    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3473    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3474    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3475    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3476    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3477    ensure_column(
3478        conn,
3479        "sync_outbox",
3480        "kind",
3481        "TEXT NOT NULL DEFAULT 'events'",
3482    )?;
3483    ensure_column(
3484        conn,
3485        "experiments",
3486        "state",
3487        "TEXT NOT NULL DEFAULT 'Draft'",
3488    )?;
3489    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3490    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3491    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3492    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3493    ensure_column(conn, "sessions", "os", "TEXT")?;
3494    ensure_column(conn, "sessions", "arch", "TEXT")?;
3495    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3496    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3497    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3498    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3499    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3500    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3501    conn.execute_batch(
3502        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3503         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3504    )?;
3505    Ok(())
3506}
3507
3508fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3509    if has_column(conn, table, column)? {
3510        return Ok(());
3511    }
3512    let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}");
3513    match conn.execute(&sql, []) {
3514        Ok(_) => Ok(()),
3515        Err(err) if column_was_added_by_race(conn, table, column, &err)? => Ok(()),
3516        Err(err) => Err(err.into()),
3517    }
3518}
3519
3520fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3521    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3522    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3523    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3524}
3525
3526fn column_was_added_by_race(
3527    conn: &Connection,
3528    table: &str,
3529    column: &str,
3530    err: &rusqlite::Error,
3531) -> Result<bool> {
3532    if !is_duplicate_column_error(err) {
3533        return Ok(false);
3534    }
3535    has_column(conn, table, column)
3536}
3537
3538fn is_duplicate_column_error(err: &rusqlite::Error) -> bool {
3539    matches!(
3540        err,
3541        rusqlite::Error::SqliteFailure(_, Some(message))
3542            if message.contains("duplicate column name")
3543    )
3544}
3545
/// Encodes a flag as an integer: `true` becomes 1, `false` becomes 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3549
/// Decodes an integer flag: 0 is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3553
/// Maps the empty string to `None`; any other string is returned unchanged
/// inside `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3557
3558#[cfg(test)]
3559mod tests {
3560    use super::*;
3561    use serde_json::json;
3562    use std::collections::HashSet;
3563    use tempfile::TempDir;
3564
3565    fn make_session(id: &str) -> SessionRecord {
3566        SessionRecord {
3567            id: id.to_string(),
3568            agent: "cursor".to_string(),
3569            model: None,
3570            workspace: "/ws".to_string(),
3571            started_at_ms: 1000,
3572            ended_at_ms: None,
3573            status: SessionStatus::Done,
3574            trace_path: "/trace".to_string(),
3575            start_commit: None,
3576            end_commit: None,
3577            branch: None,
3578            dirty_start: None,
3579            dirty_end: None,
3580            repo_binding_source: None,
3581            prompt_fingerprint: None,
3582            parent_session_id: None,
3583            agent_version: None,
3584            os: None,
3585            arch: None,
3586            repo_file_count: None,
3587            repo_total_loc: None,
3588        }
3589    }
3590
3591    fn make_event(session_id: &str, seq: u64) -> Event {
3592        Event {
3593            session_id: session_id.to_string(),
3594            seq,
3595            ts_ms: 1000 + seq * 100,
3596            ts_exact: false,
3597            kind: EventKind::ToolCall,
3598            source: EventSource::Tail,
3599            tool: Some("read_file".to_string()),
3600            tool_call_id: Some(format!("call_{seq}")),
3601            tokens_in: None,
3602            tokens_out: None,
3603            reasoning_tokens: None,
3604            cost_usd_e6: None,
3605            stop_reason: None,
3606            latency_ms: None,
3607            ttft_ms: None,
3608            retry_count: None,
3609            context_used_tokens: None,
3610            context_max_tokens: None,
3611            cache_creation_tokens: None,
3612            cache_read_tokens: None,
3613            system_prompt_tokens: None,
3614            payload: json!({}),
3615        }
3616    }
3617
3618    #[test]
3619    fn open_and_wal_mode() {
3620        let dir = TempDir::new().unwrap();
3621        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3622        let mode: String = store
3623            .conn
3624            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
3625            .unwrap();
3626        assert_eq!(mode, "wal");
3627    }
3628
3629    #[test]
3630    fn open_applies_phase0_pragmas() {
3631        let dir = TempDir::new().unwrap();
3632        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3633        let synchronous: i64 = store
3634            .conn
3635            .query_row("PRAGMA synchronous", [], |r| r.get(0))
3636            .unwrap();
3637        let cache_size: i64 = store
3638            .conn
3639            .query_row("PRAGMA cache_size", [], |r| r.get(0))
3640            .unwrap();
3641        let temp_store: i64 = store
3642            .conn
3643            .query_row("PRAGMA temp_store", [], |r| r.get(0))
3644            .unwrap();
3645        let wal_autocheckpoint: i64 = store
3646            .conn
3647            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
3648            .unwrap();
3649        assert_eq!(synchronous, 1);
3650        assert_eq!(cache_size, -65_536);
3651        assert_eq!(temp_store, 2);
3652        assert_eq!(wal_autocheckpoint, 1_000);
3653        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
3654    }
3655
3656    #[test]
3657    fn ensure_column_tolerates_duplicate_from_race() {
3658        let conn = Connection::open_in_memory().unwrap();
3659        conn.execute_batch("CREATE TABLE sessions (id TEXT, start_commit TEXT)")
3660            .unwrap();
3661        let err = conn
3662            .execute("ALTER TABLE sessions ADD COLUMN start_commit TEXT", [])
3663            .unwrap_err();
3664        assert!(column_was_added_by_race(&conn, "sessions", "start_commit", &err).unwrap());
3665    }
3666
3667    #[test]
3668    fn read_only_open_sets_query_only() {
3669        let dir = TempDir::new().unwrap();
3670        let db = dir.path().join("kaizen.db");
3671        Store::open(&db).unwrap();
3672        let store = Store::open_read_only(&db).unwrap();
3673        let query_only: i64 = store
3674            .conn
3675            .query_row("PRAGMA query_only", [], |r| r.get(0))
3676            .unwrap();
3677        assert_eq!(query_only, 1);
3678    }
3679
3680    #[test]
3681    fn phase0_indexes_exist() {
3682        let dir = TempDir::new().unwrap();
3683        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3684        for name in [
3685            "tool_spans_session_idx",
3686            "tool_spans_started_idx",
3687            "session_samples_ts_idx",
3688            "events_ts_idx",
3689            "feedback_session_idx",
3690        ] {
3691            let found: i64 = store
3692                .conn
3693                .query_row(
3694                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
3695                    params![name],
3696                    |r| r.get(0),
3697                )
3698                .unwrap();
3699            assert_eq!(found, 1, "{name}");
3700        }
3701    }
3702
3703    #[test]
3704    fn upsert_and_get_session() {
3705        let dir = TempDir::new().unwrap();
3706        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3707        let s = make_session("s1");
3708        store.upsert_session(&s).unwrap();
3709
3710        let got = store.get_session("s1").unwrap().unwrap();
3711        assert_eq!(got.id, "s1");
3712        assert_eq!(got.status, SessionStatus::Done);
3713    }
3714
3715    #[test]
3716    fn append_and_list_events_round_trip() {
3717        let dir = TempDir::new().unwrap();
3718        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3719        let s = make_session("s2");
3720        store.upsert_session(&s).unwrap();
3721        store.append_event(&make_event("s2", 0)).unwrap();
3722        store.append_event(&make_event("s2", 1)).unwrap();
3723
3724        let sessions = store.list_sessions("/ws").unwrap();
3725        assert_eq!(sessions.len(), 1);
3726        assert_eq!(sessions[0].id, "s2");
3727    }
3728
3729    #[test]
3730    fn list_sessions_page_orders_and_counts() {
3731        let dir = TempDir::new().unwrap();
3732        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3733        let mut a = make_session("a");
3734        a.started_at_ms = 2_000;
3735        let mut b = make_session("b");
3736        b.started_at_ms = 2_000;
3737        let mut c = make_session("c");
3738        c.started_at_ms = 1_000;
3739        store.upsert_session(&c).unwrap();
3740        store.upsert_session(&b).unwrap();
3741        store.upsert_session(&a).unwrap();
3742
3743        let page = store
3744            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
3745            .unwrap();
3746        assert_eq!(page.total, 3);
3747        assert_eq!(page.next_offset, Some(2));
3748        assert_eq!(
3749            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3750            vec!["a", "b"]
3751        );
3752
3753        let all = store.list_sessions("/ws").unwrap();
3754        assert_eq!(
3755            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3756            vec!["a", "b", "c"]
3757        );
3758    }
3759
3760    #[test]
3761    fn list_sessions_page_filters_in_sql_shape() {
3762        let dir = TempDir::new().unwrap();
3763        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3764        let mut cursor = make_session("cursor");
3765        cursor.agent = "Cursor".into();
3766        cursor.started_at_ms = 2_000;
3767        cursor.status = SessionStatus::Running;
3768        let mut claude = make_session("claude");
3769        claude.agent = "claude".into();
3770        claude.started_at_ms = 3_000;
3771        store.upsert_session(&cursor).unwrap();
3772        store.upsert_session(&claude).unwrap();
3773
3774        let page = store
3775            .list_sessions_page(
3776                "/ws",
3777                0,
3778                10,
3779                SessionFilter {
3780                    agent_prefix: Some("cur".into()),
3781                    status: Some(SessionStatus::Running),
3782                    since_ms: Some(1_500),
3783                },
3784            )
3785            .unwrap();
3786        assert_eq!(page.total, 1);
3787        assert_eq!(page.rows[0].id, "cursor");
3788    }
3789
3790    #[test]
3791    fn incremental_session_helpers_find_new_rows_and_statuses() {
3792        let dir = TempDir::new().unwrap();
3793        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3794        let mut old = make_session("old");
3795        old.started_at_ms = 1_000;
3796        let mut new = make_session("new");
3797        new.started_at_ms = 2_000;
3798        new.status = SessionStatus::Running;
3799        store.upsert_session(&old).unwrap();
3800        store.upsert_session(&new).unwrap();
3801
3802        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
3803        assert_eq!(rows.len(), 1);
3804        assert_eq!(rows[0].id, "new");
3805
3806        store
3807            .update_session_status("new", SessionStatus::Done)
3808            .unwrap();
3809        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
3810        assert_eq!(statuses.len(), 1);
3811        assert_eq!(statuses[0].status, SessionStatus::Done);
3812    }
3813
3814    #[test]
3815    fn summary_stats_empty() {
3816        let dir = TempDir::new().unwrap();
3817        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3818        let stats = store.summary_stats("/ws").unwrap();
3819        assert_eq!(stats.session_count, 0);
3820        assert_eq!(stats.total_cost_usd_e6, 0);
3821    }
3822
3823    #[test]
3824    fn summary_stats_counts_sessions() {
3825        let dir = TempDir::new().unwrap();
3826        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3827        store.upsert_session(&make_session("a")).unwrap();
3828        store.upsert_session(&make_session("b")).unwrap();
3829        let stats = store.summary_stats("/ws").unwrap();
3830        assert_eq!(stats.session_count, 2);
3831        assert_eq!(stats.by_agent.len(), 1);
3832        assert_eq!(stats.by_agent[0].0, "cursor");
3833        assert_eq!(stats.by_agent[0].1, 2);
3834    }
3835
3836    #[test]
3837    fn list_events_for_session_round_trip() {
3838        let dir = TempDir::new().unwrap();
3839        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3840        store.upsert_session(&make_session("s4")).unwrap();
3841        store.append_event(&make_event("s4", 0)).unwrap();
3842        store.append_event(&make_event("s4", 1)).unwrap();
3843        let events = store.list_events_for_session("s4").unwrap();
3844        assert_eq!(events.len(), 2);
3845        assert_eq!(events[0].seq, 0);
3846        assert_eq!(events[1].seq, 1);
3847    }
3848
3849    #[test]
3850    fn list_events_page_uses_inclusive_seq_cursor() {
3851        let dir = TempDir::new().unwrap();
3852        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3853        store.upsert_session(&make_session("paged")).unwrap();
3854        for seq in 0..5 {
3855            store.append_event(&make_event("paged", seq)).unwrap();
3856        }
3857        let first = store.list_events_page("paged", 0, 2).unwrap();
3858        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3859        let second = store
3860            .list_events_page("paged", first[1].seq + 1, 2)
3861            .unwrap();
3862        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3863    }
3864
3865    #[test]
3866    fn append_event_dedup() {
3867        let dir = TempDir::new().unwrap();
3868        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3869        store.upsert_session(&make_session("s5")).unwrap();
3870        store.append_event(&make_event("s5", 0)).unwrap();
3871        // Duplicate — should be silently ignored
3872        store.append_event(&make_event("s5", 0)).unwrap();
3873        let events = store.list_events_for_session("s5").unwrap();
3874        assert_eq!(events.len(), 1);
3875    }
3876
3877    #[test]
3878    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3879        let dir = TempDir::new().unwrap();
3880        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3881        assert!(store.session_span_tree("missing").unwrap().is_empty());
3882        assert!(store.span_tree_cache.borrow().is_some());
3883
3884        store.upsert_session(&make_session("tree")).unwrap();
3885        let call = make_event("tree", 0);
3886        store.append_event(&call).unwrap();
3887        assert!(store.span_tree_cache.borrow().is_none());
3888        assert!(store.session_span_tree("tree").unwrap().is_empty());
3889        assert!(store.span_tree_cache.borrow().is_some());
3890        let mut result = make_event("tree", 1);
3891        result.kind = EventKind::ToolResult;
3892        result.tool_call_id = call.tool_call_id.clone();
3893        store.append_event(&result).unwrap();
3894        assert!(store.span_tree_cache.borrow().is_none());
3895        let first = store.session_span_tree("tree").unwrap();
3896        assert_eq!(first.len(), 1);
3897        assert!(store.span_tree_cache.borrow().is_some());
3898        store.append_event(&make_event("tree", 2)).unwrap();
3899        assert!(store.span_tree_cache.borrow().is_none());
3900    }
3901
3902    #[test]
3903    fn tool_spans_in_window_uses_started_then_ended_fallback() {
3904        let dir = TempDir::new().unwrap();
3905        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3906        store.upsert_session(&make_session("spans")).unwrap();
3907        for (id, started, ended) in [
3908            ("started", Some(200_i64), None),
3909            ("fallback", None, Some(250_i64)),
3910            ("outside", Some(400_i64), None),
3911            ("too_old", None, Some(50_i64)),
3912            ("started_wins", Some(500_i64), Some(200_i64)),
3913        ] {
3914            store
3915                .conn
3916                .execute(
3917                    "INSERT INTO tool_spans
3918                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3919                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3920                    params![id, started, ended],
3921                )
3922                .unwrap();
3923        }
3924        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3925        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3926        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3927    }
3928
3929    #[test]
3930    fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
3931        let dir = TempDir::new().unwrap();
3932        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3933        store.upsert_session(&make_session("s1")).unwrap();
3934        for (id, started, ended) in [
3935            ("inside_started", Some(150_i64), None),
3936            ("inside_ended_only", None, Some(220_i64)),
3937            ("after_window", Some(400_i64), None),
3938            ("before_window", None, Some(50_i64)),
3939        ] {
3940            store
3941                .conn
3942                .execute(
3943                    "INSERT INTO tool_spans
3944                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3945                     VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
3946                    params![id, started, ended],
3947                )
3948                .unwrap();
3949        }
3950        let rows = store
3951            .tool_spans_sync_rows_in_window("/ws", 100, 300)
3952            .unwrap();
3953        let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
3954        assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
3955        assert!(rows.iter().all(|r| r.session_id == "s1"));
3956    }
3957
3958    #[test]
3959    fn upsert_idempotent() {
3960        let dir = TempDir::new().unwrap();
3961        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3962        let mut s = make_session("s3");
3963        store.upsert_session(&s).unwrap();
3964        s.status = SessionStatus::Running;
3965        store.upsert_session(&s).unwrap();
3966
3967        let got = store.get_session("s3").unwrap().unwrap();
3968        assert_eq!(got.status, SessionStatus::Running);
3969    }
3970
3971    #[test]
3972    fn append_event_indexes_path_from_payload() {
3973        let dir = TempDir::new().unwrap();
3974        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3975        store.upsert_session(&make_session("sx")).unwrap();
3976        let mut ev = make_event("sx", 0);
3977        ev.payload = json!({"input": {"path": "src/lib.rs"}});
3978        store.append_event(&ev).unwrap();
3979        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
3980        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
3981    }
3982
3983    #[test]
3984    fn update_session_status_changes_status() {
3985        let dir = TempDir::new().unwrap();
3986        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3987        store.upsert_session(&make_session("s6")).unwrap();
3988        store
3989            .update_session_status("s6", SessionStatus::Running)
3990            .unwrap();
3991        let got = store.get_session("s6").unwrap().unwrap();
3992        assert_eq!(got.status, SessionStatus::Running);
3993    }
3994
3995    #[test]
3996    fn prune_sessions_removes_old_rows_and_keeps_recent() {
3997        let dir = TempDir::new().unwrap();
3998        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3999        let mut old = make_session("old");
4000        old.started_at_ms = 1_000;
4001        let mut new = make_session("new");
4002        new.started_at_ms = 9_000_000_000_000;
4003        store.upsert_session(&old).unwrap();
4004        store.upsert_session(&new).unwrap();
4005        store.append_event(&make_event("old", 0)).unwrap();
4006
4007        let stats = store.prune_sessions_started_before(5_000).unwrap();
4008        assert_eq!(
4009            stats,
4010            PruneStats {
4011                sessions_removed: 1,
4012                events_removed: 1,
4013            }
4014        );
4015        assert!(store.get_session("old").unwrap().is_none());
4016        assert!(store.get_session("new").unwrap().is_some());
4017        let sessions = store.list_sessions("/ws").unwrap();
4018        assert_eq!(sessions.len(), 1);
4019        assert_eq!(sessions[0].id, "new");
4020    }
4021
4022    #[test]
4023    fn append_event_indexes_rules_from_payload() {
4024        let dir = TempDir::new().unwrap();
4025        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4026        store.upsert_session(&make_session("sr")).unwrap();
4027        let mut ev = make_event("sr", 0);
4028        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
4029        store.append_event(&ev).unwrap();
4030        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
4031        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
4032    }
4033
4034    #[test]
4035    fn guidance_report_counts_skill_and_rule_sessions() {
4036        let dir = TempDir::new().unwrap();
4037        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4038        store.upsert_session(&make_session("sx")).unwrap();
4039        let mut ev = make_event("sx", 0);
4040        ev.payload =
4041            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
4042        ev.cost_usd_e6 = Some(500_000);
4043        store.append_event(&ev).unwrap();
4044
4045        let mut skill_slugs = HashSet::new();
4046        skill_slugs.insert("tdd".into());
4047        let mut rule_slugs = HashSet::new();
4048        rule_slugs.insert("style".into());
4049
4050        let rep = store
4051            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
4052            .unwrap();
4053        assert_eq!(rep.sessions_in_window, 1);
4054        let tdd = rep
4055            .rows
4056            .iter()
4057            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
4058            .unwrap();
4059        assert_eq!(tdd.sessions, 1);
4060        assert!(tdd.on_disk);
4061        let style = rep
4062            .rows
4063            .iter()
4064            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
4065            .unwrap();
4066        assert_eq!(style.sessions, 1);
4067        assert!(style.on_disk);
4068    }
4069
4070    #[test]
4071    fn prune_sessions_removes_rules_used_rows() {
4072        let dir = TempDir::new().unwrap();
4073        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4074        let mut old = make_session("old_r");
4075        old.started_at_ms = 1_000;
4076        store.upsert_session(&old).unwrap();
4077        let mut ev = make_event("old_r", 0);
4078        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
4079        store.append_event(&ev).unwrap();
4080
4081        store.prune_sessions_started_before(5_000).unwrap();
4082        let n: i64 = store
4083            .conn
4084            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
4085            .unwrap();
4086        assert_eq!(n, 0);
4087    }
4088}