Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
13use crate::sync::context::SyncIngestContext;
14use crate::sync::outbound::outbound_event_from_row;
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::enqueue_tool_spans_for_session;
17use anyhow::{Context, Result};
18use rusqlite::types::Value;
19use rusqlite::{
20    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
21};
22use std::cell::RefCell;
23use std::collections::{HashMap, HashSet};
24use std::path::{Path, PathBuf};
25
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default mmap size in MiB.
// NOTE(review): presumably consumed by `apply_pragmas` (not visible in this chunk) — confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// Shared `SELECT … FROM sessions` prefix listing all 21 session columns, in the same
/// order `Store::upsert_session` writes them. It carries no `WHERE`/`ORDER BY`, so it can
/// be extended by appending clauses.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
34
/// Ordered schema statements, each executed with `execute_batch` on every read-write
/// open (see `Store::open_with_mode`).
///
/// Every entry is idempotent (`CREATE … IF NOT EXISTS` / `INSERT OR IGNORE`), so running
/// the whole list against an existing database is safe. Some entries contain several
/// `;`-separated statements, which is why `execute_batch` is required.
///
/// NOTE(review): columns used elsewhere in this file but not created here (for example
/// `events.ts_exact`, `events.tool_call_id`, `sessions.start_commit`) are presumably
/// added by `ensure_schema_columns` — confirm before changing either side.
const MIGRATIONS: &[&str] = &[
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    // NOTE(review): this index is a strict prefix of the UNIQUE
    // `events_session_seq_idx (session_id, seq)` created further down.
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the `ON CONFLICT(session_id, seq)` upsert in `append_event_with_sync`.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // Mirror of the repo-binding columns; `upsert_session` writes both tables.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single permitted row (`CHECK (id = 1)`); a no-op once it exists.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // NOTE(review): `ON DELETE CASCADE` only fires when `PRAGMA foreign_keys` is on for
    // the connection; whether `apply_pragmas` enables it is not visible here — confirm.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    // NOTE(review): covers the same column as `session_feedback_session`, which is created
    // together with the `session_feedback` table above.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
281
282/// Per-workspace activity dashboard stats.
283#[derive(Clone)]
284pub struct InsightsStats {
285    pub total_sessions: u64,
286    pub running_sessions: u64,
287    pub total_events: u64,
288    /// (day label e.g. "Mon", count) last 7 days oldest first
289    pub sessions_by_day: Vec<(String, u64)>,
290    /// Recent sessions DESC by started_at, max 3; paired with event count
291    pub recent: Vec<(SessionRecord, u64)>,
292    /// Top tools by event count, max 5
293    pub top_tools: Vec<(String, u64)>,
294    pub total_cost_usd_e6: i64,
295    pub sessions_with_cost: u64,
296}
297
/// Sync daemon / outbox status for `kaizen sync status`.
///
/// Derives `Debug` and `Clone` so a snapshot can be logged, compared in tests, and
/// handed to more than one consumer.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Rows still waiting in the outbox.
    pub pending_outbox: u64,
    /// Timestamp (ms) of the last successful sync, if there has been one.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync error, if any.
    pub last_error: Option<String>,
    /// Number of failures in a row since the last success.
    pub consecutive_failures: u32,
}
305
306/// Aggregate stats across sessions + events for a workspace.
307#[derive(serde::Serialize)]
308pub struct SummaryStats {
309    pub session_count: u64,
310    pub total_cost_usd_e6: i64,
311    pub by_agent: Vec<(String, u64)>,
312    pub by_model: Vec<(String, u64)>,
313    pub top_tools: Vec<(String, u64)>,
314}
315
316/// Skill vs Cursor rule for [`GuidancePerfRow`].
317#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
318#[serde(rename_all = "lowercase")]
319pub enum GuidanceKind {
320    Skill,
321    Rule,
322}
323
324/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
325#[derive(Clone, Debug, serde::Serialize)]
326pub struct GuidancePerfRow {
327    pub kind: GuidanceKind,
328    pub id: String,
329    pub sessions: u64,
330    pub sessions_pct: f64,
331    pub total_cost_usd_e6: i64,
332    pub avg_cost_per_session_usd: Option<f64>,
333    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
334    pub on_disk: bool,
335}
336
337/// Aggregated skill/rule adoption and cost proxy for a time window.
338#[derive(Clone, Debug, serde::Serialize)]
339pub struct GuidanceReport {
340    pub workspace: String,
341    pub window_start_ms: u64,
342    pub window_end_ms: u64,
343    pub sessions_in_window: u64,
344    pub workspace_avg_cost_per_session_usd: Option<f64>,
345    pub rows: Vec<GuidancePerfRow>,
346}
347
/// Counters returned by [`Store::prune_sessions_started_before`]; both default to zero.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PruneStats {
    /// Number of session rows deleted.
    pub sessions_removed: u64,
    /// Number of event rows deleted.
    pub events_removed: u64,
}
354
/// One `session_outcomes` row (Tier C — post-stop test/lint snapshot). Every measurement
/// is optional; only the session id and the measurement timestamp are always present.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionOutcomeRow {
    /// Session the outcome belongs to (primary key of the table).
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    /// When the measurement was taken (ms).
    pub measured_at_ms: u64,
    /// Error text recorded for the measurement, if any.
    pub measure_error: Option<String>,
}
370
/// Per-session roll-up of process samples, used by retro (Tier D).
#[derive(Clone, Debug)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage among the samples.
    pub max_cpu_percent: f64,
    /// Highest resident-set size (bytes) among the samples.
    pub max_rss_bytes: u64,
}
379
/// `sync_state` keys for agent rescan throttling and auto-prune.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key for the auto-prune timestamp.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key stamped with the current time whenever adding an event to the
/// search index fails (see `Store::append_search_event`).
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
384
/// A tool span in the shape used for sync; its fields mirror the `tool_spans` columns,
/// with the span's paths as a list instead of `paths_json`.
///
/// Derives `Debug` and `Clone` so rows can be logged and reused, consistent with the
/// other public row types in this module.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    /// Cost of the span (`_e6` suggests micro-USD — confirm against the writer).
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
400
/// How [`Store::open_with_mode`] opens the database file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreOpenMode {
    /// Default writable connection; migrations run and the projector is warmed.
    ReadWrite,
    /// Connection opened with SQLite's read-only flag; migrations and warm-up are skipped.
    ReadOnlyQuery,
}
406
407#[derive(Debug, Clone)]
408pub struct SessionStatusRow {
409    pub id: String,
410    pub status: SessionStatus,
411    pub ended_at_ms: Option<u64>,
412}
413
414#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
415pub struct SessionFilter {
416    pub agent_prefix: Option<String>,
417    pub status: Option<SessionStatus>,
418    pub since_ms: Option<u64>,
419}
420
421#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
422pub struct SessionPage {
423    pub rows: Vec<SessionRecord>,
424    pub total: usize,
425    pub next_offset: Option<usize>,
426}
427
428#[derive(Clone)]
429struct SpanTreeCacheEntry {
430    session_id: String,
431    last_event_seq: Option<u64>,
432    nodes: Vec<crate::store::span_tree::SpanNode>,
433}
434
435pub struct Store {
436    conn: Connection,
437    root: PathBuf,
438    hot_log: RefCell<Option<HotLog>>,
439    search_writer: RefCell<Option<crate::search::PendingWriter>>,
440    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
441    projector: RefCell<Projector>,
442}
443
444impl Store {
445    pub(crate) fn conn(&self) -> &Connection {
446        &self.conn
447    }
448
449    pub fn open(path: &Path) -> Result<Self> {
450        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
451    }
452
453    pub fn open_read_only(path: &Path) -> Result<Self> {
454        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
455    }
456
457    pub fn open_query(path: &Path) -> Result<Self> {
458        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
459    }
460
461    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
462        if let Some(parent) = path.parent() {
463            std::fs::create_dir_all(parent)?;
464        }
465        let conn = match mode {
466            StoreOpenMode::ReadWrite => Connection::open(path),
467            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
468                path,
469                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470            ),
471        }
472        .with_context(|| format!("open db: {}", path.display()))?;
473        apply_pragmas(&conn, mode)?;
474        if mode == StoreOpenMode::ReadWrite {
475            for sql in MIGRATIONS {
476                conn.execute_batch(sql)?;
477            }
478            ensure_schema_columns(&conn)?;
479        }
480        let store = Self {
481            conn,
482            root: path
483                .parent()
484                .unwrap_or_else(|| Path::new("."))
485                .to_path_buf(),
486            hot_log: RefCell::new(None),
487            search_writer: RefCell::new(None),
488            span_tree_cache: RefCell::new(None),
489            projector: RefCell::new(Projector::default()),
490        };
491        if mode == StoreOpenMode::ReadWrite {
492            store.warm_projector()?;
493        }
494        Ok(store)
495    }
496
497    fn invalidate_span_tree_cache(&self) {
498        *self.span_tree_cache.borrow_mut() = None;
499    }
500
501    fn warm_projector(&self) -> Result<()> {
502        let ids = self.running_session_ids()?;
503        let mut projector = self.projector.borrow_mut();
504        for id in ids {
505            for event in self.list_events_for_session(&id)? {
506                let _ = projector.apply(&event);
507            }
508        }
509        Ok(())
510    }
511
512    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
513        self.conn.execute(
514            "INSERT INTO sessions (
515                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
516                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
517                prompt_fingerprint, parent_session_id, agent_version, os, arch,
518                repo_file_count, repo_total_loc
519             )
520             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
521                ?16, ?17, ?18, ?19, ?20, ?21)
522             ON CONFLICT(id) DO UPDATE SET
523               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
524               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
525               status=excluded.status, trace_path=excluded.trace_path,
526               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
527               branch=excluded.branch, dirty_start=excluded.dirty_start,
528               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
529               prompt_fingerprint=excluded.prompt_fingerprint,
530               parent_session_id=excluded.parent_session_id,
531               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
532               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
533            params![
534                s.id,
535                s.agent,
536                s.model,
537                s.workspace,
538                s.started_at_ms as i64,
539                s.ended_at_ms.map(|v| v as i64),
540                format!("{:?}", s.status),
541                s.trace_path,
542                s.start_commit,
543                s.end_commit,
544                s.branch,
545                s.dirty_start.map(bool_to_i64),
546                s.dirty_end.map(bool_to_i64),
547                s.repo_binding_source.clone().unwrap_or_default(),
548                s.prompt_fingerprint.as_deref(),
549                s.parent_session_id.as_deref(),
550                s.agent_version.as_deref(),
551                s.os.as_deref(),
552                s.arch.as_deref(),
553                s.repo_file_count.map(|v| v as i64),
554                s.repo_total_loc.map(|v| v as i64),
555            ],
556        )?;
557        self.conn.execute(
558            "INSERT INTO session_repo_binding (
559                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
560             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
561             ON CONFLICT(session_id) DO UPDATE SET
562                start_commit=excluded.start_commit,
563                end_commit=excluded.end_commit,
564                branch=excluded.branch,
565                dirty_start=excluded.dirty_start,
566                dirty_end=excluded.dirty_end,
567                repo_binding_source=excluded.repo_binding_source",
568            params![
569                s.id,
570                s.start_commit,
571                s.end_commit,
572                s.branch,
573                s.dirty_start.map(bool_to_i64),
574                s.dirty_end.map(bool_to_i64),
575                s.repo_binding_source.clone().unwrap_or_default(),
576            ],
577        )?;
578        Ok(())
579    }
580
581    /// Insert a minimal session row if none exists. Used by hook ingestion when
582    /// the first observed event is not `SessionStart` (hooks installed mid-session).
583    pub fn ensure_session_stub(
584        &self,
585        id: &str,
586        agent: &str,
587        workspace: &str,
588        started_at_ms: u64,
589    ) -> Result<()> {
590        self.conn.execute(
591            "INSERT OR IGNORE INTO sessions (
592                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
593                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
594                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
595             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
596                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
597            params![id, agent, workspace, started_at_ms as i64],
598        )?;
599        Ok(())
600    }
601
602    /// Next `seq` for a new event in this session (0 when there are no events yet).
603    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
604        let n: i64 = self.conn.query_row(
605            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
606            [session_id],
607            |r| r.get(0),
608        )?;
609        Ok(n as u64)
610    }
611
612    pub fn append_event(&self, e: &Event) -> Result<()> {
613        self.append_event_with_sync(e, None)
614    }
615
    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
    ///
    /// Steps, in order:
    /// 1. Upsert the row into `events`, keyed on `(session_id, seq)` — appending an
    ///    existing `seq` overwrites that row.
    /// 2. Append to the hot log (unless disabled via `KAIZEN_HOT_LOG=0`).
    /// 3. Update derived state: a full rebuild in legacy projector mode, a session
    ///    replay when `seq` is not past the last stored one, otherwise an incremental
    ///    projector update.
    /// 4. Feed the search index (best-effort; failures are logged, not returned).
    /// 5. If `ctx` carries a complete sync configuration and the event passes sampling,
    ///    enqueue the redacted event and the session's tool spans to the outbox.
    ///
    /// The steps are not wrapped in a transaction by this method.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Captured BEFORE the insert so an out-of-order `seq` can be recognised below.
        // Not needed in legacy mode, which always rebuilds from scratch.
        let last_before = if projector_legacy_mode() {
            None
        } else {
            self.last_event_seq_for_session(&e.session_id)?
        };
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
             )
             ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // Enum values are stored as their `Debug` text.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // Nothing was written: skip every downstream side effect.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        self.append_hot_event(e)?;
        if projector_legacy_mode() {
            // Legacy path: re-derive index rows and rebuild all spans of the session.
            index_event_derived(&self.conn, e)?;
            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
            self.invalidate_span_tree_cache();
        } else if last_before.is_some_and(|last| e.seq <= last) {
            // `seq` is not newer than what was already stored (overwrite or late
            // arrival): replay the whole session through the projector instead of
            // applying the event incrementally.
            self.replay_projector_session(&e.session_id)?;
        } else {
            // Incremental path. Each `borrow_mut()` is a temporary released at the end
            // of its statement, before `apply_projector_events` runs.
            let deltas = self.projector.borrow_mut().apply(e);
            self.apply_projector_events(&deltas)?;
            let expired = self
                .projector
                .borrow_mut()
                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
            self.apply_projector_events(&expired)?;
            if is_stop_event(e) {
                // A stop event flushes whatever the projector still holds for the session.
                let flushed = self
                    .projector
                    .borrow_mut()
                    .flush_session(&e.session_id, e.ts_ms);
                self.apply_projector_events(&flushed)?;
            }
            self.invalidate_span_tree_cache();
        }
        // Best-effort: search failures are logged inside and never fail the append.
        self.append_search_event(e);
        // Everything below is outbox enqueueing; each guard returns Ok because the
        // event itself has already been stored.
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync counts as configured only when endpoint, token and team id are all set.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Sampling: with a rate below 1.0 the event is kept only when the random draw
        // does not exceed the rate.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Build the outbound form, then redact its payload before serialising.
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.outbox()?.append(&e.session_id, "events", &row)?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
739
740    fn append_hot_event(&self, e: &Event) -> Result<()> {
741        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
742            return Ok(());
743        }
744        let mut slot = self.hot_log.borrow_mut();
745        if slot.is_none() {
746            *slot = Some(HotLog::open(&self.root)?);
747        }
748        if let Some(log) = slot.as_mut() {
749            log.append(e)?;
750        }
751        Ok(())
752    }
753
754    fn append_search_event(&self, e: &Event) {
755        if let Err(err) = self.try_append_search_event(e) {
756            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
757            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
758        }
759    }
760
761    fn try_append_search_event(&self, e: &Event) -> Result<()> {
762        let Some(session) = self.get_session(&e.session_id)? else {
763            return Ok(());
764        };
765        let workspace = PathBuf::from(&session.workspace);
766        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
767        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
768        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
769            return Ok(());
770        };
771        let mut slot = self.search_writer.borrow_mut();
772        if slot.is_none() {
773            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
774        }
775        slot.as_mut().expect("writer").add(&doc)
776    }
777
778    pub fn flush_search(&self) -> Result<()> {
779        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
780            writer.commit()?;
781        }
782        Ok(())
783    }
784
785    fn outbox(&self) -> Result<Outbox> {
786        Outbox::open(&self.root)
787    }
788
789    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
790        if projector_legacy_mode() {
791            rebuild_tool_spans_for_session(&self.conn, session_id)?;
792            self.invalidate_span_tree_cache();
793            return Ok(());
794        }
795        let deltas = self
796            .projector
797            .borrow_mut()
798            .flush_session(session_id, now_ms);
799        if self.apply_projector_events(&deltas)? {
800            self.invalidate_span_tree_cache();
801        }
802        Ok(())
803    }
804
805    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
806        clear_session_spans(&self.conn, session_id)?;
807        self.projector.borrow_mut().reset_session(session_id);
808        let events = self.list_events_for_session(session_id)?;
809        let mut changed = false;
810        for event in &events {
811            let deltas = self.projector.borrow_mut().apply(event);
812            changed |= self.apply_projector_events(&deltas)?;
813        }
814        if self
815            .get_session(session_id)?
816            .is_some_and(|session| session.status == SessionStatus::Done)
817        {
818            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
819            let deltas = self
820                .projector
821                .borrow_mut()
822                .flush_session(session_id, now_ms);
823            changed |= self.apply_projector_events(&deltas)?;
824        }
825        if changed {
826            self.invalidate_span_tree_cache();
827        }
828        Ok(())
829    }
830
831    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
832        let mut changed = false;
833        for delta in deltas {
834            match delta {
835                ProjectorEvent::SpanClosed(span, sample) => {
836                    upsert_tool_span_record(&self.conn, span)?;
837                    tracing::debug!(
838                        session_id = %sample.session_id,
839                        span_id = %sample.span_id,
840                        tool = ?sample.tool,
841                        lead_time_ms = ?sample.lead_time_ms,
842                        tokens_in = ?sample.tokens_in,
843                        tokens_out = ?sample.tokens_out,
844                        reasoning_tokens = ?sample.reasoning_tokens,
845                        cost_usd_e6 = ?sample.cost_usd_e6,
846                        paths = ?sample.paths,
847                        "tool span closed"
848                    );
849                    changed = true;
850                }
851                ProjectorEvent::SpanPatched(span) => {
852                    upsert_tool_span_record(&self.conn, span)?;
853                    changed = true;
854                }
855                ProjectorEvent::FileTouched { session, path } => {
856                    self.conn.execute(
857                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
858                        params![session, path],
859                    )?;
860                    changed = true;
861                }
862                ProjectorEvent::SkillUsed { session, skill } => {
863                    self.conn.execute(
864                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
865                        params![session, skill],
866                    )?;
867                    changed = true;
868                }
869                ProjectorEvent::RuleUsed { session, rule } => {
870                    self.conn.execute(
871                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
872                        params![session, rule],
873                    )?;
874                    changed = true;
875                }
876            }
877        }
878        Ok(changed)
879    }
880
881    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
882        let rows = self.outbox()?.list_pending(limit)?;
883        if !rows.is_empty() {
884            return Ok(rows);
885        }
886        let mut stmt = self.conn.prepare(
887            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
888        )?;
889        let rows = stmt.query_map(params![limit as i64], |row| {
890            Ok((
891                row.get::<_, i64>(0)?,
892                row.get::<_, String>(1)?,
893                row.get::<_, String>(2)?,
894            ))
895        })?;
896        let mut out = Vec::new();
897        for r in rows {
898            out.push(r?);
899        }
900        Ok(out)
901    }
902
903    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
904        self.outbox()?.delete_ids(ids)?;
905        for id in ids {
906            self.conn
907                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
908        }
909        Ok(())
910    }
911
912    pub fn replace_outbox_rows(
913        &self,
914        owner_id: &str,
915        kind: &str,
916        payloads: &[String],
917    ) -> Result<()> {
918        self.outbox()?.replace(owner_id, kind, payloads)?;
919        self.conn.execute(
920            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
921            params![owner_id, kind],
922        )?;
923        for payload in payloads {
924            self.conn.execute(
925                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
926                params![owner_id, kind, payload],
927            )?;
928        }
929        Ok(())
930    }
931
932    pub fn outbox_pending_count(&self) -> Result<u64> {
933        let redb = self.outbox()?.pending_count()?;
934        if redb > 0 {
935            return Ok(redb);
936        }
937        let c: i64 =
938            self.conn
939                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
940                    r.get(0)
941                })?;
942        Ok(c as u64)
943    }
944
945    pub fn set_sync_state_ok(&self) -> Result<()> {
946        let now = now_ms().to_string();
947        self.conn.execute(
948            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
949             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
950            params![now],
951        )?;
952        self.conn.execute(
953            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
954             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
955            [],
956        )?;
957        self.conn
958            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
959        Ok(())
960    }
961
962    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
963        let prev: i64 = self
964            .conn
965            .query_row(
966                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
967                [],
968                |r| {
969                    let s: String = r.get(0)?;
970                    Ok(s.parse::<i64>().unwrap_or(0))
971                },
972            )
973            .optional()?
974            .unwrap_or(0);
975        let next = prev.saturating_add(1);
976        self.conn.execute(
977            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
978             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
979            params![msg],
980        )?;
981        self.conn.execute(
982            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
983             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
984            params![next.to_string()],
985        )?;
986        Ok(())
987    }
988
989    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
990        let pending_outbox = self.outbox_pending_count()?;
991        let last_success_ms = self
992            .conn
993            .query_row(
994                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
995                [],
996                |r| r.get::<_, String>(0),
997            )
998            .optional()?
999            .and_then(|s| s.parse().ok());
1000        let last_error = self
1001            .conn
1002            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1003                r.get::<_, String>(0)
1004            })
1005            .optional()?;
1006        let consecutive_failures = self
1007            .conn
1008            .query_row(
1009                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1010                [],
1011                |r| r.get::<_, String>(0),
1012            )
1013            .optional()?
1014            .and_then(|s| s.parse().ok())
1015            .unwrap_or(0);
1016        Ok(SyncStatusSnapshot {
1017            pending_outbox,
1018            last_success_ms,
1019            last_error,
1020            consecutive_failures,
1021        })
1022    }
1023
1024    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1025        let row: Option<String> = self
1026            .conn
1027            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1028                r.get::<_, String>(0)
1029            })
1030            .optional()?;
1031        Ok(row.and_then(|s| s.parse().ok()))
1032    }
1033
1034    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1035        self.conn.execute(
1036            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1037             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1038            params![key, v.to_string()],
1039        )?;
1040        Ok(())
1041    }
1042
1043    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
1044    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1045        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1046        let old_ids = old_session_ids(&tx, cutoff_ms)?;
1047        let sessions_to_remove: i64 = tx.query_row(
1048            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1049            params![cutoff_ms],
1050            |r| r.get(0),
1051        )?;
1052        let events_to_remove: i64 = tx.query_row(
1053            "SELECT COUNT(*) FROM events WHERE session_id IN \
1054             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1055            params![cutoff_ms],
1056            |r| r.get(0),
1057        )?;
1058
1059        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1060        tx.execute(
1061            &format!(
1062                "DELETE FROM tool_span_paths WHERE span_id IN \
1063                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1064            ),
1065            params![cutoff_ms],
1066        )?;
1067        tx.execute(
1068            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1069            params![cutoff_ms],
1070        )?;
1071        tx.execute(
1072            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1073            params![cutoff_ms],
1074        )?;
1075        tx.execute(
1076            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1077            params![cutoff_ms],
1078        )?;
1079        tx.execute(
1080            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1081            params![cutoff_ms],
1082        )?;
1083        tx.execute(
1084            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1085            params![cutoff_ms],
1086        )?;
1087        tx.execute(
1088            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1089            params![cutoff_ms],
1090        )?;
1091        tx.execute(
1092            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1093            params![cutoff_ms],
1094        )?;
1095        tx.execute(
1096            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1097            params![cutoff_ms],
1098        )?;
1099        tx.execute(
1100            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1101            params![cutoff_ms],
1102        )?;
1103        tx.execute(
1104            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1105            params![cutoff_ms],
1106        )?;
1107        tx.execute(
1108            "DELETE FROM sessions WHERE started_at_ms < ?1",
1109            params![cutoff_ms],
1110        )?;
1111        tx.commit()?;
1112        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1113            let _ = writer.commit();
1114        }
1115        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1116            tracing::warn!("search prune skipped: {err:#}");
1117            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1118        }
1119        self.invalidate_span_tree_cache();
1120        Ok(PruneStats {
1121            sessions_removed: sessions_to_remove as u64,
1122            events_removed: events_to_remove as u64,
1123        })
1124    }
1125
1126    /// Reclaim file space after large deletes (exclusive lock; can be slow).
1127    pub fn vacuum(&self) -> Result<()> {
1128        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1129        Ok(())
1130    }
1131
1132    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1133        Ok(self
1134            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1135            .rows)
1136    }
1137
1138    pub fn list_sessions_page(
1139        &self,
1140        workspace: &str,
1141        offset: usize,
1142        limit: usize,
1143        filter: SessionFilter,
1144    ) -> Result<SessionPage> {
1145        let (where_sql, args) = session_filter_sql(workspace, &filter);
1146        let total = self.query_session_page_count(&where_sql, &args)?;
1147        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1148        let next = offset.saturating_add(rows.len());
1149        Ok(SessionPage {
1150            rows,
1151            total,
1152            next_offset: (next < total).then_some(next),
1153        })
1154    }
1155
1156    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1157        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1158        let total: i64 = self
1159            .conn
1160            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1161        Ok(total as usize)
1162    }
1163
1164    fn query_session_page_rows(
1165        &self,
1166        where_sql: &str,
1167        args: &[Value],
1168        offset: usize,
1169        limit: usize,
1170    ) -> Result<Vec<SessionRecord>> {
1171        let sql = format!(
1172            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1173        );
1174        let mut values = args.to_vec();
1175        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1176        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1177        let mut stmt = self.conn.prepare(&sql)?;
1178        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1179        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1180    }
1181
1182    pub fn list_sessions_started_after(
1183        &self,
1184        workspace: &str,
1185        after_started_at_ms: u64,
1186    ) -> Result<Vec<SessionRecord>> {
1187        let mut stmt = self.conn.prepare(
1188            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1189                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1190                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1191                    repo_file_count, repo_total_loc
1192             FROM sessions
1193             WHERE workspace = ?1 AND started_at_ms > ?2
1194             ORDER BY started_at_ms DESC, id ASC",
1195        )?;
1196        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1197        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1198    }
1199
1200    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1201        if ids.is_empty() {
1202            return Ok(Vec::new());
1203        }
1204        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1205        let sql =
1206            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1207        let mut stmt = self.conn.prepare(&sql)?;
1208        let params: Vec<&dyn rusqlite::ToSql> =
1209            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1210        let rows = stmt.query_map(params.as_slice(), |r| {
1211            let status: String = r.get(1)?;
1212            Ok(SessionStatusRow {
1213                id: r.get(0)?,
1214                status: status_from_str(&status),
1215                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1216            })
1217        })?;
1218        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1219    }
1220
1221    fn running_session_ids(&self) -> Result<Vec<String>> {
1222        let mut stmt = self
1223            .conn
1224            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1225        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1226        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1227    }
1228
1229    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1230        let session_count: i64 = self.conn.query_row(
1231            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1232            params![workspace],
1233            |r| r.get(0),
1234        )?;
1235
1236        let total_cost: i64 = self.conn.query_row(
1237            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1238             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1239            params![workspace],
1240            |r| r.get(0),
1241        )?;
1242
1243        let mut stmt = self.conn.prepare(
1244            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1245        )?;
1246        let by_agent: Vec<(String, u64)> = stmt
1247            .query_map(params![workspace], |r| {
1248                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1249            })?
1250            .filter_map(|r| r.ok())
1251            .collect();
1252
1253        let mut stmt = self.conn.prepare(
1254            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1255        )?;
1256        let by_model: Vec<(String, u64)> = stmt
1257            .query_map(params![workspace], |r| {
1258                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1259            })?
1260            .filter_map(|r| r.ok())
1261            .collect();
1262
1263        let mut stmt = self.conn.prepare(
1264            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1265             WHERE s.workspace = ?1 AND tool IS NOT NULL
1266             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1267        )?;
1268        let top_tools: Vec<(String, u64)> = stmt
1269            .query_map(params![workspace], |r| {
1270                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1271            })?
1272            .filter_map(|r| r.ok())
1273            .collect();
1274
1275        Ok(SummaryStats {
1276            session_count: session_count as u64,
1277            total_cost_usd_e6: total_cost,
1278            by_agent,
1279            by_model,
1280            top_tools,
1281        })
1282    }
1283
1284    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1285        self.list_events_page(session_id, 0, i64::MAX as usize)
1286    }
1287
1288    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1289        let mut stmt = self.conn.prepare(
1290            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1291                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1292                    stop_reason, latency_ms, ttft_ms, retry_count,
1293                    context_used_tokens, context_max_tokens,
1294                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1295             FROM events WHERE session_id = ?1 AND seq = ?2",
1296        )?;
1297        stmt.query_row(params![session_id, seq as i64], event_row)
1298            .optional()
1299            .map_err(Into::into)
1300    }
1301
1302    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1303        let mut out = Vec::new();
1304        for session in self.list_sessions(workspace)? {
1305            for event in self.list_events_for_session(&session.id)? {
1306                out.push((session.clone(), event));
1307            }
1308        }
1309        out.sort_by(|a, b| {
1310            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1311        });
1312        Ok(out)
1313    }
1314
1315    pub fn list_events_page(
1316        &self,
1317        session_id: &str,
1318        after_seq: u64,
1319        limit: usize,
1320    ) -> Result<Vec<Event>> {
1321        let mut stmt = self.conn.prepare(
1322            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1323                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1324                    stop_reason, latency_ms, ttft_ms, retry_count,
1325                    context_used_tokens, context_max_tokens,
1326                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1327             FROM events
1328             WHERE session_id = ?1 AND seq >= ?2
1329             ORDER BY seq ASC LIMIT ?3",
1330        )?;
1331        let rows = stmt.query_map(
1332            params![
1333                session_id,
1334                after_seq as i64,
1335                limit.min(i64::MAX as usize) as i64
1336            ],
1337            event_row,
1338        )?;
1339        let mut events = Vec::new();
1340        for row in rows {
1341            events.push(row?);
1342        }
1343        Ok(events)
1344    }
1345
1346    /// Update only status for existing session.
1347    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1348        self.conn.execute(
1349            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1350            params![format!("{:?}", status), id],
1351        )?;
1352        Ok(())
1353    }
1354
1355    /// Workspace activity dashboard — feeds `cmd_insights`.
1356    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1357        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1358        Ok(InsightsStats {
1359            total_sessions: count_q(
1360                &self.conn,
1361                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1362                workspace,
1363            )?,
1364            running_sessions: count_q(
1365                &self.conn,
1366                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1367                workspace,
1368            )?,
1369            total_events: count_q(
1370                &self.conn,
1371                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1372                workspace,
1373            )?,
1374            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1375            recent: recent_sessions_3(&self.conn, workspace)?,
1376            top_tools: top_tools_5(&self.conn, workspace)?,
1377            total_cost_usd_e6,
1378            sessions_with_cost,
1379        })
1380    }
1381
1382    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1383    pub fn retro_events_in_window(
1384        &self,
1385        workspace: &str,
1386        start_ms: u64,
1387        end_ms: u64,
1388    ) -> Result<Vec<(SessionRecord, Event)>> {
1389        let mut stmt = self.conn.prepare(
1390            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1391                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1392                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1393                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1394                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1395                    s.repo_file_count, s.repo_total_loc,
1396                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1397                    e.context_used_tokens, e.context_max_tokens,
1398                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1399             FROM events e
1400             JOIN sessions s ON s.id = e.session_id
1401             WHERE s.workspace = ?1
1402               AND (
1403                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1404                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1405               )
1406             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1407        )?;
1408        let rows = stmt.query_map(
1409            params![
1410                workspace,
1411                start_ms as i64,
1412                end_ms as i64,
1413                SYNTHETIC_TS_CEILING_MS,
1414            ],
1415            |row| {
1416                let payload_str: String = row.get(12)?;
1417                let status_str: String = row.get(19)?;
1418                Ok((
1419                    SessionRecord {
1420                        id: row.get(13)?,
1421                        agent: row.get(14)?,
1422                        model: row.get(15)?,
1423                        workspace: row.get(16)?,
1424                        started_at_ms: row.get::<_, i64>(17)? as u64,
1425                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1426                        status: status_from_str(&status_str),
1427                        trace_path: row.get(20)?,
1428                        start_commit: row.get(21)?,
1429                        end_commit: row.get(22)?,
1430                        branch: row.get(23)?,
1431                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1432                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1433                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1434                        prompt_fingerprint: row.get(27)?,
1435                        parent_session_id: row.get(28)?,
1436                        agent_version: row.get(29)?,
1437                        os: row.get(30)?,
1438                        arch: row.get(31)?,
1439                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1440                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1441                    },
1442                    Event {
1443                        session_id: row.get(0)?,
1444                        seq: row.get::<_, i64>(1)? as u64,
1445                        ts_ms: row.get::<_, i64>(2)? as u64,
1446                        ts_exact: row.get::<_, i64>(3)? != 0,
1447                        kind: kind_from_str(&row.get::<_, String>(4)?),
1448                        source: source_from_str(&row.get::<_, String>(5)?),
1449                        tool: row.get(6)?,
1450                        tool_call_id: row.get(7)?,
1451                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1452                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1453                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1454                        cost_usd_e6: row.get(11)?,
1455                        payload: serde_json::from_str(&payload_str)
1456                            .unwrap_or(serde_json::Value::Null),
1457                        stop_reason: row.get(34)?,
1458                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1459                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1460                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1461                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1462                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1463                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1464                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1465                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1466                    },
1467                ))
1468            },
1469        )?;
1470
1471        let mut out = Vec::new();
1472        for r in rows {
1473            out.push(r?);
1474        }
1475        Ok(out)
1476    }
1477
    /// Computes one value of `metric` per session of `workspace` that matches the time window,
    /// returning each session record paired with its value.
    ///
    /// An event matches the window when its `ts_ms` lies in `[start_ms, end_ms]`, or when its
    /// `ts_ms` is below `SYNTHETIC_TS_CEILING_MS` and the owning session's `started_at_ms`
    /// lies in `[start_ms, end_ms]`. Aggregating metrics only see matching events.
    ///
    /// # Errors
    /// Fails if the statement cannot be prepared or if any row cannot be read or mapped.
    pub fn experiment_metric_values_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
        metric: crate::experiment::types::Metric,
    ) -> Result<Vec<(SessionRecord, f64)>> {
        use crate::experiment::types::Metric;
        // The 21 session columns (indices 0..=20); every query selects the metric value
        // immediately after them, so it is read back from column index 21 below.
        let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
            s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
            s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
            s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
        // Shared WHERE fragment: ?1 = workspace, ?2 = start, ?3 = end, ?4 = synthetic ceiling.
        let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
            OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
        let sql = match metric {
            // Sum of input, output and reasoning tokens over matching events (NULL counts as 0).
            Metric::TokensPerSession => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Sum of `cost_usd_e6` over matching events, divided by 1,000,000.
            Metric::CostPerSession => format!(
                "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // 0.0 when any matching event has kind 'Error', otherwise 1.0.
            Metric::SuccessRate => format!(
                "SELECT {session_cols},
                    CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // (ended_at_ms - started_at_ms) / 60000 for sessions that have an end time and at
            // least one matching event. There is no join on events here, so the window
            // condition is spelled out inside the EXISTS subquery instead of reusing `window`.
            Metric::DurationMinutes => format!(
                "SELECT {session_cols},
                    (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
                 FROM sessions s
                 WHERE s.workspace = ?1
                   AND s.ended_at_ms IS NOT NULL
                   AND EXISTS (
                     SELECT 1 FROM events e
                     WHERE e.session_id = s.id
                       AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
                   )"
            ),
            // Number of distinct `files_touched` paths of the session; the LEFT JOIN keeps
            // sessions without any touched file (their count is 0).
            Metric::FilesPerSession => format!(
                "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
                 FROM sessions s
                 JOIN events e ON e.session_id = s.id
                 LEFT JOIN files_touched ft ON ft.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // 1 - min(errors, messages) / messages over matching events; sessions without any
            // matching 'Message' event are dropped by the HAVING clause.
            Metric::SuccessRateByPrompt => format!(
                "SELECT {session_cols},
                    1.0 - (MIN(
                      SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
                      SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
                    ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // Cost (cost_usd_e6 / 1,000,000) divided by the number of matching 'Message'
            // events; sessions without any matching 'Message' event are dropped.
            Metric::CostByPrompt => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
                    SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // Number of matching 'ToolCall' events whose tool equals the tool of the previous
            // matching 'ToolCall' in the same session (ordered by ts_ms, then seq). The inner
            // join on `calls` means sessions without a matching tool call produce no row.
            Metric::ToolLoops => format!(
                "WITH calls AS (
                   SELECT e.session_id, e.tool,
                     LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
                   FROM events e JOIN sessions s ON s.id = e.session_id
                   WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
                 )
                 SELECT {session_cols},
                    SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN calls ON calls.session_id = s.id
                 GROUP BY s.id"
            ),
        };
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                SYNTHETIC_TS_CEILING_MS,
            ],
            // Columns 0..=20 form the session record; column 21 is the metric value.
            |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
        )?;
        // Row-level errors abort the whole call instead of being skipped.
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
1579
1580    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1581    pub fn files_touched_in_window(
1582        &self,
1583        workspace: &str,
1584        start_ms: u64,
1585        end_ms: u64,
1586    ) -> Result<Vec<(String, String)>> {
1587        let mut stmt = self.conn.prepare(
1588            "SELECT DISTINCT ft.session_id, ft.path
1589             FROM files_touched ft
1590             JOIN sessions s ON s.id = ft.session_id
1591             WHERE s.workspace = ?1
1592               AND EXISTS (
1593                 SELECT 1 FROM events e
1594                 JOIN sessions ss ON ss.id = e.session_id
1595                 WHERE e.session_id = ft.session_id
1596                   AND (
1597                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1598                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1599                   )
1600               )
1601             ORDER BY ft.session_id, ft.path",
1602        )?;
1603        let out: Vec<(String, String)> = stmt
1604            .query_map(
1605                params![
1606                    workspace,
1607                    start_ms as i64,
1608                    end_ms as i64,
1609                    SYNTHETIC_TS_CEILING_MS,
1610                ],
1611                |r| Ok((r.get(0)?, r.get(1)?)),
1612            )?
1613            .filter_map(|r| r.ok())
1614            .collect();
1615        Ok(out)
1616    }
1617
1618    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1619    /// (any session with an indexed skill row; join events optional — use row existence).
1620    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1621        let mut stmt = self.conn.prepare(
1622            "SELECT DISTINCT su.skill
1623             FROM skills_used su
1624             JOIN sessions s ON s.id = su.session_id
1625             WHERE s.workspace = ?1
1626               AND EXISTS (
1627                 SELECT 1 FROM events e
1628                 JOIN sessions ss ON ss.id = e.session_id
1629                 WHERE e.session_id = su.session_id
1630                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1631               )
1632             ORDER BY su.skill",
1633        )?;
1634        let out: Vec<String> = stmt
1635            .query_map(
1636                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1637                |r| r.get::<_, String>(0),
1638            )?
1639            .filter_map(|r| r.ok())
1640            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1641            .collect();
1642        Ok(out)
1643    }
1644
1645    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1646    pub fn skills_used_in_window(
1647        &self,
1648        workspace: &str,
1649        start_ms: u64,
1650        end_ms: u64,
1651    ) -> Result<Vec<(String, String)>> {
1652        let mut stmt = self.conn.prepare(
1653            "SELECT DISTINCT su.session_id, su.skill
1654             FROM skills_used su
1655             JOIN sessions s ON s.id = su.session_id
1656             WHERE s.workspace = ?1
1657               AND EXISTS (
1658                 SELECT 1 FROM events e
1659                 JOIN sessions ss ON ss.id = e.session_id
1660                 WHERE e.session_id = su.session_id
1661                   AND (
1662                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1663                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1664                   )
1665               )
1666             ORDER BY su.session_id, su.skill",
1667        )?;
1668        let out: Vec<(String, String)> = stmt
1669            .query_map(
1670                params![
1671                    workspace,
1672                    start_ms as i64,
1673                    end_ms as i64,
1674                    SYNTHETIC_TS_CEILING_MS,
1675                ],
1676                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1677            )?
1678            .filter_map(|r| r.ok())
1679            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1680            .collect();
1681        Ok(out)
1682    }
1683
1684    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1685    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1686        let mut stmt = self.conn.prepare(
1687            "SELECT DISTINCT ru.rule
1688             FROM rules_used ru
1689             JOIN sessions s ON s.id = ru.session_id
1690             WHERE s.workspace = ?1
1691               AND EXISTS (
1692                 SELECT 1 FROM events e
1693                 JOIN sessions ss ON ss.id = e.session_id
1694                 WHERE e.session_id = ru.session_id
1695                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1696               )
1697             ORDER BY ru.rule",
1698        )?;
1699        let out: Vec<String> = stmt
1700            .query_map(
1701                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1702                |r| r.get::<_, String>(0),
1703            )?
1704            .filter_map(|r| r.ok())
1705            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1706            .collect();
1707        Ok(out)
1708    }
1709
1710    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1711    pub fn rules_used_in_window(
1712        &self,
1713        workspace: &str,
1714        start_ms: u64,
1715        end_ms: u64,
1716    ) -> Result<Vec<(String, String)>> {
1717        let mut stmt = self.conn.prepare(
1718            "SELECT DISTINCT ru.session_id, ru.rule
1719             FROM rules_used ru
1720             JOIN sessions s ON s.id = ru.session_id
1721             WHERE s.workspace = ?1
1722               AND EXISTS (
1723                 SELECT 1 FROM events e
1724                 JOIN sessions ss ON ss.id = e.session_id
1725                 WHERE e.session_id = ru.session_id
1726                   AND (
1727                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1728                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1729                   )
1730               )
1731             ORDER BY ru.session_id, ru.rule",
1732        )?;
1733        let out: Vec<(String, String)> = stmt
1734            .query_map(
1735                params![
1736                    workspace,
1737                    start_ms as i64,
1738                    end_ms as i64,
1739                    SYNTHETIC_TS_CEILING_MS,
1740                ],
1741                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1742            )?
1743            .filter_map(|r| r.ok())
1744            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1745            .collect();
1746        Ok(out)
1747    }
1748
1749    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1750    pub fn sessions_active_in_window(
1751        &self,
1752        workspace: &str,
1753        start_ms: u64,
1754        end_ms: u64,
1755    ) -> Result<HashSet<String>> {
1756        let mut stmt = self.conn.prepare(
1757            "SELECT DISTINCT s.id
1758             FROM sessions s
1759             WHERE s.workspace = ?1
1760               AND EXISTS (
1761                 SELECT 1 FROM events e
1762                 WHERE e.session_id = s.id
1763                   AND (
1764                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1765                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1766                   )
1767               )",
1768        )?;
1769        let out: HashSet<String> = stmt
1770            .query_map(
1771                params![
1772                    workspace,
1773                    start_ms as i64,
1774                    end_ms as i64,
1775                    SYNTHETIC_TS_CEILING_MS,
1776                ],
1777                |r| r.get(0),
1778            )?
1779            .filter_map(|r| r.ok())
1780            .collect();
1781        Ok(out)
1782    }
1783
1784    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1785    pub fn session_costs_usd_e6_in_window(
1786        &self,
1787        workspace: &str,
1788        start_ms: u64,
1789        end_ms: u64,
1790    ) -> Result<HashMap<String, i64>> {
1791        let mut stmt = self.conn.prepare(
1792            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1793             FROM events e
1794             JOIN sessions s ON s.id = e.session_id
1795             WHERE s.workspace = ?1
1796               AND (
1797                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1798                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1799               )
1800             GROUP BY e.session_id",
1801        )?;
1802        let rows: Vec<(String, i64)> = stmt
1803            .query_map(
1804                params![
1805                    workspace,
1806                    start_ms as i64,
1807                    end_ms as i64,
1808                    SYNTHETIC_TS_CEILING_MS,
1809                ],
1810                |r| Ok((r.get(0)?, r.get(1)?)),
1811            )?
1812            .filter_map(|r| r.ok())
1813            .collect();
1814        Ok(rows.into_iter().collect())
1815    }
1816
1817    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1818    pub fn guidance_report(
1819        &self,
1820        workspace: &str,
1821        window_start_ms: u64,
1822        window_end_ms: u64,
1823        skill_slugs_on_disk: &HashSet<String>,
1824        rule_slugs_on_disk: &HashSet<String>,
1825    ) -> Result<GuidanceReport> {
1826        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1827        let denom = active.len() as u64;
1828        let costs =
1829            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1830
1831        let workspace_avg_cost_per_session_usd = if denom > 0 {
1832            let total_e6: i64 = active
1833                .iter()
1834                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1835                .sum();
1836            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1837        } else {
1838            None
1839        };
1840
1841        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1842        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1843            skill_sessions.entry(skill).or_default().insert(sid);
1844        }
1845        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1846        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1847            rule_sessions.entry(rule).or_default().insert(sid);
1848        }
1849
1850        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1851
1852        let mut push_row =
1853            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1854                let sessions = sids.len() as u64;
1855                let sessions_pct = if denom > 0 {
1856                    sessions as f64 * 100.0 / denom as f64
1857                } else {
1858                    0.0
1859                };
1860                let total_cost_usd_e6: i64 = sids
1861                    .iter()
1862                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1863                    .sum();
1864                let avg_cost_per_session_usd = if sessions > 0 {
1865                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1866                } else {
1867                    None
1868                };
1869                let vs_workspace_avg_cost_per_session_usd =
1870                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1871                        (Some(avg), Some(w)) => Some(avg - w),
1872                        _ => None,
1873                    };
1874                rows.push(GuidancePerfRow {
1875                    kind,
1876                    id,
1877                    sessions,
1878                    sessions_pct,
1879                    total_cost_usd_e6,
1880                    avg_cost_per_session_usd,
1881                    vs_workspace_avg_cost_per_session_usd,
1882                    on_disk,
1883                });
1884            };
1885
1886        let mut seen_skills: HashSet<String> = HashSet::new();
1887        for (id, sids) in &skill_sessions {
1888            seen_skills.insert(id.clone());
1889            push_row(
1890                GuidanceKind::Skill,
1891                id.clone(),
1892                sids,
1893                skill_slugs_on_disk.contains(id),
1894            );
1895        }
1896        for slug in skill_slugs_on_disk {
1897            if seen_skills.contains(slug) {
1898                continue;
1899            }
1900            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1901        }
1902
1903        let mut seen_rules: HashSet<String> = HashSet::new();
1904        for (id, sids) in &rule_sessions {
1905            seen_rules.insert(id.clone());
1906            push_row(
1907                GuidanceKind::Rule,
1908                id.clone(),
1909                sids,
1910                rule_slugs_on_disk.contains(id),
1911            );
1912        }
1913        for slug in rule_slugs_on_disk {
1914            if seen_rules.contains(slug) {
1915                continue;
1916            }
1917            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1918        }
1919
1920        rows.sort_by(|a, b| {
1921            b.sessions
1922                .cmp(&a.sessions)
1923                .then_with(|| a.kind.cmp(&b.kind))
1924                .then_with(|| a.id.cmp(&b.id))
1925        });
1926
1927        Ok(GuidanceReport {
1928            workspace: workspace.to_string(),
1929            window_start_ms,
1930            window_end_ms,
1931            sessions_in_window: denom,
1932            workspace_avg_cost_per_session_usd,
1933            rows,
1934        })
1935    }
1936
1937    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1938        let mut stmt = self.conn.prepare(
1939            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1940                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1941                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1942                    repo_file_count, repo_total_loc
1943             FROM sessions WHERE id = ?1",
1944        )?;
1945        let mut rows = stmt.query_map(params![id], |row| {
1946            Ok((
1947                row.get::<_, String>(0)?,
1948                row.get::<_, String>(1)?,
1949                row.get::<_, Option<String>>(2)?,
1950                row.get::<_, String>(3)?,
1951                row.get::<_, i64>(4)?,
1952                row.get::<_, Option<i64>>(5)?,
1953                row.get::<_, String>(6)?,
1954                row.get::<_, String>(7)?,
1955                row.get::<_, Option<String>>(8)?,
1956                row.get::<_, Option<String>>(9)?,
1957                row.get::<_, Option<String>>(10)?,
1958                row.get::<_, Option<i64>>(11)?,
1959                row.get::<_, Option<i64>>(12)?,
1960                row.get::<_, String>(13)?,
1961                row.get::<_, Option<String>>(14)?,
1962                row.get::<_, Option<String>>(15)?,
1963                row.get::<_, Option<String>>(16)?,
1964                row.get::<_, Option<String>>(17)?,
1965                row.get::<_, Option<String>>(18)?,
1966                row.get::<_, Option<i64>>(19)?,
1967                row.get::<_, Option<i64>>(20)?,
1968            ))
1969        })?;
1970
1971        if let Some(row) = rows.next() {
1972            let (
1973                id,
1974                agent,
1975                model,
1976                workspace,
1977                started,
1978                ended,
1979                status_str,
1980                trace,
1981                start_commit,
1982                end_commit,
1983                branch,
1984                dirty_start,
1985                dirty_end,
1986                source,
1987                prompt_fingerprint,
1988                parent_session_id,
1989                agent_version,
1990                os,
1991                arch,
1992                repo_file_count,
1993                repo_total_loc,
1994            ) = row?;
1995            Ok(Some(SessionRecord {
1996                id,
1997                agent,
1998                model,
1999                workspace,
2000                started_at_ms: started as u64,
2001                ended_at_ms: ended.map(|v| v as u64),
2002                status: status_from_str(&status_str),
2003                trace_path: trace,
2004                start_commit,
2005                end_commit,
2006                branch,
2007                dirty_start: dirty_start.map(i64_to_bool),
2008                dirty_end: dirty_end.map(i64_to_bool),
2009                repo_binding_source: empty_to_none(source),
2010                prompt_fingerprint,
2011                parent_session_id,
2012                agent_version,
2013                os,
2014                arch,
2015                repo_file_count: repo_file_count.map(|v| v as u32),
2016                repo_total_loc: repo_total_loc.map(|v| v as u64),
2017            }))
2018        } else {
2019            Ok(None)
2020        }
2021    }
2022
2023    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2024        let mut stmt = self.conn.prepare(
2025            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2026                    indexed_at_ms, dirty, graph_path
2027             FROM repo_snapshots WHERE workspace = ?1
2028             ORDER BY indexed_at_ms DESC LIMIT 1",
2029        )?;
2030        let mut rows = stmt.query_map(params![workspace], |row| {
2031            Ok(RepoSnapshotRecord {
2032                id: row.get(0)?,
2033                workspace: row.get(1)?,
2034                head_commit: row.get(2)?,
2035                dirty_fingerprint: row.get(3)?,
2036                analyzer_version: row.get(4)?,
2037                indexed_at_ms: row.get::<_, i64>(5)? as u64,
2038                dirty: row.get::<_, i64>(6)? != 0,
2039                graph_path: row.get(7)?,
2040            })
2041        })?;
2042        Ok(rows.next().transpose()?)
2043    }
2044
2045    pub fn save_repo_snapshot(
2046        &self,
2047        snapshot: &RepoSnapshotRecord,
2048        facts: &[FileFact],
2049        edges: &[RepoEdge],
2050    ) -> Result<()> {
2051        self.conn.execute(
2052            "INSERT INTO repo_snapshots (
2053                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2054                indexed_at_ms, dirty, graph_path
2055             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2056             ON CONFLICT(id) DO UPDATE SET
2057                workspace=excluded.workspace,
2058                head_commit=excluded.head_commit,
2059                dirty_fingerprint=excluded.dirty_fingerprint,
2060                analyzer_version=excluded.analyzer_version,
2061                indexed_at_ms=excluded.indexed_at_ms,
2062                dirty=excluded.dirty,
2063                graph_path=excluded.graph_path",
2064            params![
2065                snapshot.id,
2066                snapshot.workspace,
2067                snapshot.head_commit,
2068                snapshot.dirty_fingerprint,
2069                snapshot.analyzer_version,
2070                snapshot.indexed_at_ms as i64,
2071                bool_to_i64(snapshot.dirty),
2072                snapshot.graph_path,
2073            ],
2074        )?;
2075        self.conn.execute(
2076            "DELETE FROM file_facts WHERE snapshot_id = ?1",
2077            params![snapshot.id],
2078        )?;
2079        self.conn.execute(
2080            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2081            params![snapshot.id],
2082        )?;
2083        for fact in facts {
2084            self.conn.execute(
2085                "INSERT INTO file_facts (
2086                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2087                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2088                    churn_30d, churn_90d, authors_90d, last_changed_ms
2089                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2090                params![
2091                    fact.snapshot_id,
2092                    fact.path,
2093                    fact.language,
2094                    fact.bytes as i64,
2095                    fact.loc as i64,
2096                    fact.sloc as i64,
2097                    fact.complexity_total as i64,
2098                    fact.max_fn_complexity as i64,
2099                    fact.symbol_count as i64,
2100                    fact.import_count as i64,
2101                    fact.fan_in as i64,
2102                    fact.fan_out as i64,
2103                    fact.churn_30d as i64,
2104                    fact.churn_90d as i64,
2105                    fact.authors_90d as i64,
2106                    fact.last_changed_ms.map(|v| v as i64),
2107                ],
2108            )?;
2109        }
2110        for edge in edges {
2111            self.conn.execute(
2112                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2113                 VALUES (?1, ?2, ?3, ?4, ?5)
2114                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2115                 DO UPDATE SET weight = weight + excluded.weight",
2116                params![
2117                    snapshot.id,
2118                    edge.from_path,
2119                    edge.to_path,
2120                    edge.kind,
2121                    edge.weight as i64,
2122                ],
2123            )?;
2124        }
2125        Ok(())
2126    }
2127
2128    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2129        let mut stmt = self.conn.prepare(
2130            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2131                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2132                    churn_30d, churn_90d, authors_90d, last_changed_ms
2133             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2134        )?;
2135        let rows = stmt.query_map(params![snapshot_id], |row| {
2136            Ok(FileFact {
2137                snapshot_id: row.get(0)?,
2138                path: row.get(1)?,
2139                language: row.get(2)?,
2140                bytes: row.get::<_, i64>(3)? as u64,
2141                loc: row.get::<_, i64>(4)? as u32,
2142                sloc: row.get::<_, i64>(5)? as u32,
2143                complexity_total: row.get::<_, i64>(6)? as u32,
2144                max_fn_complexity: row.get::<_, i64>(7)? as u32,
2145                symbol_count: row.get::<_, i64>(8)? as u32,
2146                import_count: row.get::<_, i64>(9)? as u32,
2147                fan_in: row.get::<_, i64>(10)? as u32,
2148                fan_out: row.get::<_, i64>(11)? as u32,
2149                churn_30d: row.get::<_, i64>(12)? as u32,
2150                churn_90d: row.get::<_, i64>(13)? as u32,
2151                authors_90d: row.get::<_, i64>(14)? as u32,
2152                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2153            })
2154        })?;
2155        Ok(rows.filter_map(|row| row.ok()).collect())
2156    }
2157
2158    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2159        let mut stmt = self.conn.prepare(
2160            "SELECT from_id, to_id, kind, weight
2161             FROM repo_edges WHERE snapshot_id = ?1
2162             ORDER BY kind, from_id, to_id",
2163        )?;
2164        let rows = stmt.query_map(params![snapshot_id], |row| {
2165            Ok(RepoEdge {
2166                from_path: row.get(0)?,
2167                to_path: row.get(1)?,
2168                kind: row.get(2)?,
2169                weight: row.get::<_, i64>(3)? as u32,
2170            })
2171        })?;
2172        Ok(rows.filter_map(|row| row.ok()).collect())
2173    }
2174
2175    pub fn tool_spans_in_window(
2176        &self,
2177        workspace: &str,
2178        start_ms: u64,
2179        end_ms: u64,
2180    ) -> Result<Vec<ToolSpanView>> {
2181        let mut stmt = self.conn.prepare(
2182            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2183                    reasoning_tokens, cost_usd_e6, paths_json,
2184                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2185             FROM (
2186                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2187                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2188                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2189                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2190                        ts.started_at_ms AS sort_ms
2191                 FROM tool_spans ts
2192                 JOIN sessions s ON s.id = ts.session_id
2193                 WHERE s.workspace = ?1
2194                   AND ts.started_at_ms >= ?2
2195                   AND ts.started_at_ms <= ?3
2196                 UNION ALL
2197                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2198                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2199                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2200                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2201                        ts.ended_at_ms AS sort_ms
2202                 FROM tool_spans ts
2203                 JOIN sessions s ON s.id = ts.session_id
2204                 WHERE s.workspace = ?1
2205                   AND ts.started_at_ms IS NULL
2206                   AND ts.ended_at_ms >= ?2
2207                   AND ts.ended_at_ms <= ?3
2208             )
2209             ORDER BY sort_ms DESC",
2210        )?;
2211        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2212            let paths_json: String = row.get(8)?;
2213            Ok(ToolSpanView {
2214                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2215                tool: row
2216                    .get::<_, Option<String>>(1)?
2217                    .unwrap_or_else(|| "unknown".into()),
2218                status: row.get(2)?,
2219                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2220                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2221                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2222                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2223                cost_usd_e6: row.get(7)?,
2224                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2225                parent_span_id: row.get(9)?,
2226                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2227                subtree_cost_usd_e6: row.get(11)?,
2228                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2229            })
2230        })?;
2231        Ok(rows.filter_map(|row| row.ok()).collect())
2232    }
2233
2234    pub fn session_span_tree(
2235        &self,
2236        session_id: &str,
2237    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2238        let last_event_seq = self.last_event_seq_for_session(session_id)?;
2239        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2240            && entry.session_id == session_id
2241            && entry.last_event_seq == last_event_seq
2242        {
2243            return Ok(entry.nodes.clone());
2244        }
2245        let mut stmt = self.conn.prepare(
2246            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2247                    reasoning_tokens, cost_usd_e6, paths_json,
2248                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2249             FROM tool_spans
2250             WHERE session_id = ?1
2251             ORDER BY depth ASC, started_at_ms ASC",
2252        )?;
2253        let rows = stmt.query_map(params![session_id], |row| {
2254            let paths_json: String = row.get(8)?;
2255            Ok(crate::metrics::types::ToolSpanView {
2256                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2257                tool: row
2258                    .get::<_, Option<String>>(1)?
2259                    .unwrap_or_else(|| "unknown".into()),
2260                status: row.get(2)?,
2261                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2262                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2263                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2264                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2265                cost_usd_e6: row.get(7)?,
2266                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2267                parent_span_id: row.get(9)?,
2268                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2269                subtree_cost_usd_e6: row.get(11)?,
2270                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2271            })
2272        })?;
2273        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2274        let nodes = crate::store::span_tree::build_tree(spans);
2275        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2276            session_id: session_id.to_string(),
2277            last_event_seq,
2278            nodes: nodes.clone(),
2279        });
2280        Ok(nodes)
2281    }
2282
2283    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2284        let seq = self
2285            .conn
2286            .query_row(
2287                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2288                params![session_id],
2289                |r| r.get::<_, Option<i64>>(0),
2290            )?
2291            .map(|v| v as u64);
2292        Ok(seq)
2293    }
2294
2295    /// Sync-shaped tool spans whose session falls in `[start_ms, end_ms]`. Mirrors
2296    /// `retro_events_in_window` for the spans table so `kaizen telemetry push` can ship
2297    /// `IngestExportBatch::ToolSpans` next to the events batch. Window matches on
2298    /// `started_at_ms` first, falling back to `ended_at_ms` for spans that never started a
2299    /// timer (status-only rows). Workspace filter joins through `sessions.workspace`.
2300    pub fn tool_spans_sync_rows_in_window(
2301        &self,
2302        workspace: &str,
2303        start_ms: u64,
2304        end_ms: u64,
2305    ) -> Result<Vec<ToolSpanSyncRow>> {
2306        let mut stmt = self.conn.prepare(
2307            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
2308                    lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2309             FROM (
2310                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2311                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2312                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2313                        ts.started_at_ms AS sort_ms
2314                 FROM tool_spans ts
2315                 JOIN sessions s ON s.id = ts.session_id
2316                 WHERE s.workspace = ?1
2317                   AND ts.started_at_ms IS NOT NULL
2318                   AND ts.started_at_ms >= ?2
2319                   AND ts.started_at_ms <= ?3
2320                 UNION ALL
2321                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2322                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2323                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2324                        ts.ended_at_ms AS sort_ms
2325                 FROM tool_spans ts
2326                 JOIN sessions s ON s.id = ts.session_id
2327                 WHERE s.workspace = ?1
2328                   AND ts.started_at_ms IS NULL
2329                   AND ts.ended_at_ms IS NOT NULL
2330                   AND ts.ended_at_ms >= ?2
2331                   AND ts.ended_at_ms <= ?3
2332             )
2333             ORDER BY sort_ms ASC, span_id ASC",
2334        )?;
2335        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2336            let paths_json: String = row.get(12)?;
2337            Ok(ToolSpanSyncRow {
2338                span_id: row.get(0)?,
2339                session_id: row.get(1)?,
2340                tool: row.get(2)?,
2341                tool_call_id: row.get(3)?,
2342                status: row.get(4)?,
2343                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2344                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2345                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2346                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2347                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2348                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2349                cost_usd_e6: row.get(11)?,
2350                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2351            })
2352        })?;
2353        Ok(rows.filter_map(|row| row.ok()).collect())
2354    }
2355
2356    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2357        let mut stmt = self.conn.prepare(
2358            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2359                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2360             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2361        )?;
2362        let rows = stmt.query_map(params![session_id], |row| {
2363            let paths_json: String = row.get(12)?;
2364            Ok(ToolSpanSyncRow {
2365                span_id: row.get(0)?,
2366                session_id: row.get(1)?,
2367                tool: row.get(2)?,
2368                tool_call_id: row.get(3)?,
2369                status: row.get(4)?,
2370                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2371                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2372                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2373                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2374                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2375                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2376                cost_usd_e6: row.get(11)?,
2377                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2378            })
2379        })?;
2380        Ok(rows.filter_map(|row| row.ok()).collect())
2381    }
2382
2383    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2384        self.conn.execute(
2385            "INSERT OR REPLACE INTO session_evals
2386             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2387             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2388            rusqlite::params![
2389                eval.id,
2390                eval.session_id,
2391                eval.judge_model,
2392                eval.rubric_id,
2393                eval.score,
2394                eval.rationale,
2395                eval.flagged as i64,
2396                eval.created_at_ms as i64,
2397            ],
2398        )?;
2399        Ok(())
2400    }
2401
2402    pub fn list_evals_in_window(
2403        &self,
2404        start_ms: u64,
2405        end_ms: u64,
2406    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2407        let mut stmt = self.conn.prepare(
2408            "SELECT id, session_id, judge_model, rubric_id, score,
2409                    rationale, flagged, created_at_ms
2410             FROM session_evals
2411             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2412             ORDER BY created_at_ms ASC",
2413        )?;
2414        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2415            Ok(crate::eval::types::EvalRow {
2416                id: r.get(0)?,
2417                session_id: r.get(1)?,
2418                judge_model: r.get(2)?,
2419                rubric_id: r.get(3)?,
2420                score: r.get(4)?,
2421                rationale: r.get(5)?,
2422                flagged: r.get::<_, i64>(6)? != 0,
2423                created_at_ms: r.get::<_, i64>(7)? as u64,
2424            })
2425        })?;
2426        rows.collect()
2427    }
2428
2429    pub fn list_evals_for_session(
2430        &self,
2431        session_id: &str,
2432    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2433        let mut stmt = self.conn.prepare(
2434            "SELECT id, session_id, judge_model, rubric_id, score,
2435                    rationale, flagged, created_at_ms
2436             FROM session_evals
2437             WHERE session_id = ?1
2438             ORDER BY created_at_ms DESC",
2439        )?;
2440        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2441            Ok(crate::eval::types::EvalRow {
2442                id: r.get(0)?,
2443                session_id: r.get(1)?,
2444                judge_model: r.get(2)?,
2445                rubric_id: r.get(3)?,
2446                score: r.get(4)?,
2447                rationale: r.get(5)?,
2448                flagged: r.get::<_, i64>(6)? != 0,
2449                created_at_ms: r.get::<_, i64>(7)? as u64,
2450            })
2451        })?;
2452        rows.collect()
2453    }
2454
2455    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2456        use crate::feedback::types::FeedbackLabel;
2457        self.conn.execute(
2458            "INSERT OR REPLACE INTO session_feedback
2459             (id, session_id, score, label, note, created_at_ms)
2460             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2461            rusqlite::params![
2462                r.id,
2463                r.session_id,
2464                r.score.as_ref().map(|s| s.0 as i64),
2465                r.label.as_ref().map(FeedbackLabel::to_db_str),
2466                r.note,
2467                r.created_at_ms as i64,
2468            ],
2469        )?;
2470        let payload = serde_json::to_string(r).unwrap_or_default();
2471        self.conn.execute(
2472            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2473             VALUES (?1, 'session_feedback', ?2, 0)",
2474            rusqlite::params![r.session_id, payload],
2475        )?;
2476        Ok(())
2477    }
2478
2479    pub fn list_feedback_in_window(
2480        &self,
2481        start_ms: u64,
2482        end_ms: u64,
2483    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2484        let mut stmt = self.conn.prepare(
2485            "SELECT id, session_id, score, label, note, created_at_ms
2486             FROM session_feedback
2487             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2488             ORDER BY created_at_ms ASC",
2489        )?;
2490        let rows = stmt.query_map(
2491            rusqlite::params![start_ms as i64, end_ms as i64],
2492            feedback_row,
2493        )?;
2494        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2495    }
2496
2497    pub fn feedback_for_sessions(
2498        &self,
2499        ids: &[String],
2500    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2501        if ids.is_empty() {
2502            return Ok(std::collections::HashMap::new());
2503        }
2504        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2505        let sql = format!(
2506            "SELECT id, session_id, score, label, note, created_at_ms
2507             FROM session_feedback WHERE session_id IN ({placeholders})
2508             ORDER BY created_at_ms DESC"
2509        );
2510        let mut stmt = self.conn.prepare(&sql)?;
2511        let params: Vec<&dyn rusqlite::ToSql> =
2512            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2513        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2514        let mut map = std::collections::HashMap::new();
2515        for row in rows {
2516            let r = row?;
2517            map.entry(r.session_id.clone()).or_insert(r);
2518        }
2519        Ok(map)
2520    }
2521
2522    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2523        self.conn.execute(
2524            "INSERT INTO session_outcomes (
2525                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2526                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2527            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2528            ON CONFLICT(session_id) DO UPDATE SET
2529                test_passed=excluded.test_passed,
2530                test_failed=excluded.test_failed,
2531                test_skipped=excluded.test_skipped,
2532                build_ok=excluded.build_ok,
2533                lint_errors=excluded.lint_errors,
2534                revert_lines_14d=excluded.revert_lines_14d,
2535                pr_open=excluded.pr_open,
2536                ci_ok=excluded.ci_ok,
2537                measured_at_ms=excluded.measured_at_ms,
2538                measure_error=excluded.measure_error",
2539            params![
2540                row.session_id,
2541                row.test_passed,
2542                row.test_failed,
2543                row.test_skipped,
2544                row.build_ok.map(bool_to_i64),
2545                row.lint_errors,
2546                row.revert_lines_14d,
2547                row.pr_open,
2548                row.ci_ok.map(bool_to_i64),
2549                row.measured_at_ms as i64,
2550                row.measure_error.as_deref(),
2551            ],
2552        )?;
2553        Ok(())
2554    }
2555
2556    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2557        let mut stmt = self.conn.prepare(
2558            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2559                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2560             FROM session_outcomes WHERE session_id = ?1",
2561        )?;
2562        let row = stmt
2563            .query_row(params![session_id], outcome_row)
2564            .optional()?;
2565        Ok(row)
2566    }
2567
2568    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2569    pub fn list_session_outcomes_in_window(
2570        &self,
2571        workspace: &str,
2572        start_ms: u64,
2573        end_ms: u64,
2574    ) -> Result<Vec<SessionOutcomeRow>> {
2575        let mut stmt = self.conn.prepare(
2576            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2577                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2578             FROM session_outcomes o
2579             JOIN sessions s ON s.id = o.session_id
2580             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2581             ORDER BY o.measured_at_ms ASC",
2582        )?;
2583        let rows = stmt.query_map(
2584            params![workspace, start_ms as i64, end_ms as i64],
2585            outcome_row,
2586        )?;
2587        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2588    }
2589
2590    pub fn append_session_sample(
2591        &self,
2592        session_id: &str,
2593        ts_ms: u64,
2594        pid: u32,
2595        cpu_percent: Option<f64>,
2596        rss_bytes: Option<u64>,
2597    ) -> Result<()> {
2598        self.conn.execute(
2599            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2600             VALUES (?1, ?2, ?3, ?4, ?5)",
2601            params![
2602                session_id,
2603                ts_ms as i64,
2604                pid as i64,
2605                cpu_percent,
2606                rss_bytes.map(|b| b as i64)
2607            ],
2608        )?;
2609        Ok(())
2610    }
2611
2612    /// Per-session maxima for retro heuristics.
2613    pub fn list_session_sample_aggs_in_window(
2614        &self,
2615        workspace: &str,
2616        start_ms: u64,
2617        end_ms: u64,
2618    ) -> Result<Vec<SessionSampleAgg>> {
2619        let mut stmt = self.conn.prepare(
2620            "SELECT ss.session_id, COUNT(*) AS n,
2621                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2622             FROM session_samples ss
2623             JOIN sessions s ON s.id = ss.session_id
2624             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2625             GROUP BY ss.session_id",
2626        )?;
2627        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2628            let sid: String = r.get(0)?;
2629            let n: i64 = r.get(1)?;
2630            let max_cpu: Option<f64> = r.get(2)?;
2631            let max_rss: Option<i64> = r.get(3)?;
2632            Ok(SessionSampleAgg {
2633                session_id: sid,
2634                sample_count: n as u64,
2635                max_cpu_percent: max_cpu.unwrap_or(0.0),
2636                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2637            })
2638        })?;
2639        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2640    }
2641
2642    pub fn list_sessions_for_eval(
2643        &self,
2644        since_ms: u64,
2645        min_cost_usd: f64,
2646    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2647        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2648        let mut stmt = self.conn.prepare(
2649            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2650                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2651                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2652                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2653             FROM sessions s
2654             WHERE s.started_at_ms >= ?1
2655               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2656               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2657             ORDER BY s.started_at_ms DESC",
2658        )?;
2659        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2660            Ok((
2661                r.get::<_, String>(0)?,
2662                r.get::<_, String>(1)?,
2663                r.get::<_, Option<String>>(2)?,
2664                r.get::<_, String>(3)?,
2665                r.get::<_, i64>(4)?,
2666                r.get::<_, Option<i64>>(5)?,
2667                r.get::<_, String>(6)?,
2668                r.get::<_, String>(7)?,
2669                r.get::<_, Option<String>>(8)?,
2670                r.get::<_, Option<String>>(9)?,
2671                r.get::<_, Option<String>>(10)?,
2672                r.get::<_, Option<i64>>(11)?,
2673                r.get::<_, Option<i64>>(12)?,
2674                r.get::<_, Option<String>>(13)?,
2675                r.get::<_, Option<String>>(14)?,
2676                r.get::<_, Option<String>>(15)?,
2677                r.get::<_, Option<String>>(16)?,
2678                r.get::<_, Option<String>>(17)?,
2679                r.get::<_, Option<String>>(18)?,
2680                r.get::<_, Option<i64>>(19)?,
2681                r.get::<_, Option<i64>>(20)?,
2682            ))
2683        })?;
2684        let mut out = Vec::new();
2685        for row in rows {
2686            let (
2687                id,
2688                agent,
2689                model,
2690                workspace,
2691                started,
2692                ended,
2693                status_str,
2694                trace,
2695                start_commit,
2696                end_commit,
2697                branch,
2698                dirty_start,
2699                dirty_end,
2700                source,
2701                prompt_fingerprint,
2702                parent_session_id,
2703                agent_version,
2704                os,
2705                arch,
2706                repo_file_count,
2707                repo_total_loc,
2708            ) = row?;
2709            out.push(crate::core::event::SessionRecord {
2710                id,
2711                agent,
2712                model,
2713                workspace,
2714                started_at_ms: started as u64,
2715                ended_at_ms: ended.map(|v| v as u64),
2716                status: status_from_str(&status_str),
2717                trace_path: trace,
2718                start_commit,
2719                end_commit,
2720                branch,
2721                dirty_start: dirty_start.map(i64_to_bool),
2722                dirty_end: dirty_end.map(i64_to_bool),
2723                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2724                prompt_fingerprint,
2725                parent_session_id,
2726                agent_version,
2727                os,
2728                arch,
2729                repo_file_count: repo_file_count.map(|v| v as u32),
2730                repo_total_loc: repo_total_loc.map(|v| v as u64),
2731            });
2732        }
2733        Ok(out)
2734    }
2735
2736    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2737        self.conn.execute(
2738            "INSERT OR IGNORE INTO prompt_snapshots
2739             (fingerprint, captured_at_ms, files_json, total_bytes)
2740             VALUES (?1, ?2, ?3, ?4)",
2741            params![
2742                snap.fingerprint,
2743                snap.captured_at_ms as i64,
2744                snap.files_json,
2745                snap.total_bytes as i64
2746            ],
2747        )?;
2748        Ok(())
2749    }
2750
2751    pub fn get_prompt_snapshot(
2752        &self,
2753        fingerprint: &str,
2754    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2755        self.conn
2756            .query_row(
2757                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2758                 FROM prompt_snapshots WHERE fingerprint = ?1",
2759                params![fingerprint],
2760                |r| {
2761                    Ok(crate::prompt::PromptSnapshot {
2762                        fingerprint: r.get(0)?,
2763                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2764                        files_json: r.get(2)?,
2765                        total_bytes: r.get::<_, i64>(3)? as u64,
2766                    })
2767                },
2768            )
2769            .optional()
2770            .map_err(Into::into)
2771    }
2772
2773    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2774        let mut stmt = self.conn.prepare(
2775            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2776             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2777        )?;
2778        let rows = stmt.query_map([], |r| {
2779            Ok(crate::prompt::PromptSnapshot {
2780                fingerprint: r.get(0)?,
2781                captured_at_ms: r.get::<_, i64>(1)? as u64,
2782                files_json: r.get(2)?,
2783                total_bytes: r.get::<_, i64>(3)? as u64,
2784            })
2785        })?;
2786        Ok(rows.filter_map(|r| r.ok()).collect())
2787    }
2788
2789    /// Sessions with a non-null prompt_fingerprint in the given window.
2790    pub fn sessions_with_prompt_fingerprint(
2791        &self,
2792        workspace: &str,
2793        start_ms: u64,
2794        end_ms: u64,
2795    ) -> Result<Vec<(String, String)>> {
2796        let mut stmt = self.conn.prepare(
2797            "SELECT id, prompt_fingerprint FROM sessions
2798             WHERE workspace = ?1
2799               AND started_at_ms >= ?2 AND started_at_ms < ?3
2800               AND prompt_fingerprint IS NOT NULL",
2801        )?;
2802        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2803            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2804        })?;
2805        Ok(rows.filter_map(|r| r.ok()).collect())
2806    }
2807}
2808
2809impl Drop for Store {
2810    fn drop(&mut self) {
2811        if let Some(writer) = self.search_writer.get_mut().as_mut() {
2812            let _ = writer.commit();
2813        }
2814    }
2815}
2816
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
2823
2824fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2825    let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2826    let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2827    Ok(rows.filter_map(|r| r.ok()).collect())
2828}
2829
2830fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2831    raw.and_then(|s| s.trim().parse::<u64>().ok())
2832        .unwrap_or(DEFAULT_MMAP_MB)
2833        .saturating_mul(1024)
2834        .saturating_mul(1024)
2835        .min(i64::MAX as u64) as i64
2836}
2837
2838fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2839    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2840    conn.execute_batch(&format!(
2841        "
2842        PRAGMA journal_mode=WAL;
2843        PRAGMA busy_timeout=5000;
2844        PRAGMA synchronous=NORMAL;
2845        PRAGMA cache_size=-65536;
2846        PRAGMA mmap_size={mmap_size};
2847        PRAGMA temp_store=MEMORY;
2848        PRAGMA wal_autocheckpoint=1000;
2849        "
2850    ))?;
2851    if mode == StoreOpenMode::ReadOnlyQuery {
2852        conn.execute_batch("PRAGMA query_only=ON;")?;
2853    }
2854    Ok(())
2855}
2856
2857fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2858    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2859}
2860
2861fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2862    let status_str: String = row.get(6)?;
2863    Ok(SessionRecord {
2864        id: row.get(0)?,
2865        agent: row.get(1)?,
2866        model: row.get(2)?,
2867        workspace: row.get(3)?,
2868        started_at_ms: row.get::<_, i64>(4)? as u64,
2869        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2870        status: status_from_str(&status_str),
2871        trace_path: row.get(7)?,
2872        start_commit: row.get(8)?,
2873        end_commit: row.get(9)?,
2874        branch: row.get(10)?,
2875        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2876        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2877        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2878        prompt_fingerprint: row.get(14)?,
2879        parent_session_id: row.get(15)?,
2880        agent_version: row.get(16)?,
2881        os: row.get(17)?,
2882        arch: row.get(18)?,
2883        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2884        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2885    })
2886}
2887
2888fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2889    let payload_str: String = row.get(12)?;
2890    Ok(Event {
2891        session_id: row.get(0)?,
2892        seq: row.get::<_, i64>(1)? as u64,
2893        ts_ms: row.get::<_, i64>(2)? as u64,
2894        ts_exact: row.get::<_, i64>(3)? != 0,
2895        kind: kind_from_str(&row.get::<_, String>(4)?),
2896        source: source_from_str(&row.get::<_, String>(5)?),
2897        tool: row.get(6)?,
2898        tool_call_id: row.get(7)?,
2899        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2900        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2901        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2902        cost_usd_e6: row.get(11)?,
2903        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2904        stop_reason: row.get(13)?,
2905        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2906        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2907        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2908        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2909        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2910        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2911        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2912        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2913    })
2914}
2915
2916fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2917    let mut clauses = vec!["workspace = ?".to_string()];
2918    let mut args = vec![Value::Text(workspace.to_string())];
2919    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2920        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2921        args.push(Value::Text(format!("{}%", escape_like(prefix))));
2922    }
2923    if let Some(status) = &filter.status {
2924        clauses.push("status = ?".to_string());
2925        args.push(Value::Text(format!("{status:?}")));
2926    }
2927    if let Some(since_ms) = filter.since_ms {
2928        clauses.push("started_at_ms >= ?".to_string());
2929        args.push(Value::Integer(since_ms as i64));
2930    }
2931    (format!("WHERE {}", clauses.join(" AND ")), args)
2932}
2933
/// Lowercases `raw` and backslash-escapes the characters that are special in a
/// `LIKE ... ESCAPE '\'` pattern: `\`, `%` and `_`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2940
2941fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2942    let cost: i64 = conn.query_row(
2943        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2944        params![workspace], |r| r.get(0),
2945    )?;
2946    let with_cost: i64 = conn.query_row(
2947        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2948        params![workspace], |r| r.get(0),
2949    )?;
2950    Ok((cost, with_cost as u64))
2951}
2952
2953fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2954    let build_raw: Option<i64> = r.get(4)?;
2955    let ci_raw: Option<i64> = r.get(8)?;
2956    Ok(SessionOutcomeRow {
2957        session_id: r.get(0)?,
2958        test_passed: r.get(1)?,
2959        test_failed: r.get(2)?,
2960        test_skipped: r.get(3)?,
2961        build_ok: build_raw.map(|v| v != 0),
2962        lint_errors: r.get(5)?,
2963        revert_lines_14d: r.get(6)?,
2964        pr_open: r.get(7)?,
2965        ci_ok: ci_raw.map(|v| v != 0),
2966        measured_at_ms: r.get::<_, i64>(9)? as u64,
2967        measure_error: r.get(10)?,
2968    })
2969}
2970
2971fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2972    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2973    let score = r
2974        .get::<_, Option<i64>>(2)?
2975        .and_then(|v| FeedbackScore::new(v as u8));
2976    let label = r
2977        .get::<_, Option<String>>(3)?
2978        .and_then(|s| FeedbackLabel::from_str_opt(&s));
2979    Ok(FeedbackRecord {
2980        id: r.get(0)?,
2981        session_id: r.get(1)?,
2982        score,
2983        label,
2984        note: r.get(4)?,
2985        created_at_ms: r.get::<_, i64>(5)? as u64,
2986    })
2987}
2988
/// Returns the three-letter English weekday label for a day index.
///
/// Index 0 maps to `"Thu"` and the labels cycle every seven days. This matches
/// day indices counted from the Unix epoch (1970-01-01 was a Thursday), which is
/// how `sessions_by_day_7` derives them from millisecond timestamps.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce modulo 7 before adding the offset so indices close to `u64::MAX`
    // cannot overflow the addition.
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2992
2993fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2994    let week_ago = now.saturating_sub(7 * 86_400_000);
2995    let mut stmt = conn
2996        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2997    let days: Vec<u64> = stmt
2998        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2999        .filter_map(|r| r.ok())
3000        .map(|v| v as u64 / 86_400_000)
3001        .collect();
3002    let today = now / 86_400_000;
3003    Ok((0u64..7)
3004        .map(|i| {
3005            let d = today.saturating_sub(6 - i);
3006            (
3007                day_label(d).to_string(),
3008                days.iter().filter(|&&x| x == d).count() as u64,
3009            )
3010        })
3011        .collect())
3012}
3013
3014fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3015    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3016               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3017               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3018               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3019               COUNT(e.id) FROM sessions s \
3020               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3021               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3022    let mut stmt = conn.prepare(sql)?;
3023    let out: Vec<(SessionRecord, u64)> = stmt
3024        .query_map(params![workspace], |r| {
3025            let st: String = r.get(6)?;
3026            Ok((
3027                SessionRecord {
3028                    id: r.get(0)?,
3029                    agent: r.get(1)?,
3030                    model: r.get(2)?,
3031                    workspace: r.get(3)?,
3032                    started_at_ms: r.get::<_, i64>(4)? as u64,
3033                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3034                    status: status_from_str(&st),
3035                    trace_path: r.get(7)?,
3036                    start_commit: r.get(8)?,
3037                    end_commit: r.get(9)?,
3038                    branch: r.get(10)?,
3039                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3040                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3041                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3042                    prompt_fingerprint: r.get(14)?,
3043                    parent_session_id: r.get(15)?,
3044                    agent_version: r.get(16)?,
3045                    os: r.get(17)?,
3046                    arch: r.get(18)?,
3047                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3048                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3049                },
3050                r.get::<_, i64>(21)? as u64,
3051            ))
3052        })?
3053        .filter_map(|r| r.ok())
3054        .collect();
3055    Ok(out)
3056}
3057
3058fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3059    let mut stmt = conn.prepare(
3060        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3061         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3062    )?;
3063    let out: Vec<(String, u64)> = stmt
3064        .query_map(params![workspace], |r| {
3065            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3066        })?
3067        .filter_map(|r| r.ok())
3068        .collect();
3069    Ok(out)
3070}
3071
3072fn status_from_str(s: &str) -> SessionStatus {
3073    match s {
3074        "Running" => SessionStatus::Running,
3075        "Waiting" => SessionStatus::Waiting,
3076        "Idle" => SessionStatus::Idle,
3077        _ => SessionStatus::Done,
3078    }
3079}
3080
/// Reports whether the `KAIZEN_PROJECTOR` environment variable is set to exactly
/// `legacy`. An unset, non-Unicode or differently valued variable yields `false`.
fn projector_legacy_mode() -> bool {
    matches!(
        std::env::var("KAIZEN_PROJECTOR").as_deref(),
        Ok("legacy")
    )
}
3084
3085fn is_stop_event(e: &Event) -> bool {
3086    if !matches!(e.kind, EventKind::Hook) {
3087        return false;
3088    }
3089    e.payload
3090        .get("event")
3091        .and_then(|v| v.as_str())
3092        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3093        == Some("Stop")
3094}
3095
3096fn kind_from_str(s: &str) -> EventKind {
3097    match s {
3098        "ToolCall" => EventKind::ToolCall,
3099        "ToolResult" => EventKind::ToolResult,
3100        "Message" => EventKind::Message,
3101        "Error" => EventKind::Error,
3102        "Cost" => EventKind::Cost,
3103        "Hook" => EventKind::Hook,
3104        "Lifecycle" => EventKind::Lifecycle,
3105        _ => EventKind::Hook,
3106    }
3107}
3108
3109fn source_from_str(s: &str) -> EventSource {
3110    match s {
3111        "Tail" => EventSource::Tail,
3112        "Hook" => EventSource::Hook,
3113        _ => EventSource::Proxy,
3114    }
3115}
3116
3117fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3118    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3119    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3120    ensure_column(conn, "sessions", "branch", "TEXT")?;
3121    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3122    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3123    ensure_column(
3124        conn,
3125        "sessions",
3126        "repo_binding_source",
3127        "TEXT NOT NULL DEFAULT ''",
3128    )?;
3129    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3130    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3131    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3132    ensure_column(conn, "events", "stop_reason", "TEXT")?;
3133    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3134    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3135    ensure_column(conn, "events", "retry_count", "INTEGER")?;
3136    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3137    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3138    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3139    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3140    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3141    ensure_column(
3142        conn,
3143        "sync_outbox",
3144        "kind",
3145        "TEXT NOT NULL DEFAULT 'events'",
3146    )?;
3147    ensure_column(
3148        conn,
3149        "experiments",
3150        "state",
3151        "TEXT NOT NULL DEFAULT 'Draft'",
3152    )?;
3153    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3154    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3155    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3156    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3157    ensure_column(conn, "sessions", "os", "TEXT")?;
3158    ensure_column(conn, "sessions", "arch", "TEXT")?;
3159    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3160    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3161    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3162    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3163    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3164    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3165    conn.execute_batch(
3166        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3167         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3168    )?;
3169    Ok(())
3170}
3171
3172fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3173    if has_column(conn, table, column)? {
3174        return Ok(());
3175    }
3176    conn.execute(
3177        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3178        [],
3179    )?;
3180    Ok(())
3181}
3182
3183fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3184    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3185    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3186    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3187}
3188
/// Encodes a boolean as an integer: `true` becomes 1, `false` becomes 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3192
/// Decodes an integer flag: 0 is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3196
/// Turns an empty string into `None` and passes any other string through as `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3200
// Unit tests for the store. Every test opens its own database file inside a
// fresh temporary directory, so tests do not share state.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    // Builds a session fixture with the given id: agent "cursor", workspace "/ws",
    // started at 1000 ms, status `Done`, and every optional field left as `None`.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    // Builds a `ToolCall` event fixture from the `Tail` source for tool "read_file".
    // The timestamp is `1000 + seq * 100`, the tool call id is `call_{seq}`, the
    // payload is an empty JSON object and all metric fields are `None`.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    // A freshly opened store reports the "wal" journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    // Opening the store leaves the connection with the expected pragma values, and
    // the mmap helper converts "64" (MB) into 64 * 1024 * 1024 bytes.
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }

    // A read-only handle on an existing database has `PRAGMA query_only` set to 1.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        // Create the database first so the read-only open has a file to attach to.
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }

    // Each of the named indexes appears exactly once in `sqlite_master` after open.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }

    // A session written with `upsert_session` can be read back by id.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    // Appending events succeeds and the owning session is still listed.
    // NOTE(review): despite the name, this test only asserts on the session
    // listing; the appended events are never read back here.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    // Paging reports the total row count and the next offset, and both the page
    // and the full listing come back in a fixed order.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // "a" and "b" share a start time; "c" started earlier.
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        // Insert in the reverse of the expected order so insertion order cannot
        // explain the result.
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        // The expectation is consistent with newest-first ordering and ties
        // broken by id — TODO confirm against the query's ORDER BY.
        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    // Agent prefix, status and start-time filters combine to select one session.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        // Mixed-case agent name; the filter below uses the lowercase prefix "cur".
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }

    // `list_sessions_started_after` returns only the later session, and
    // `session_statuses` reflects a status update made afterwards.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }

    // An empty database yields zero sessions and zero total cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    // Two sessions from the same agent are counted once overall and grouped
    // under a single `by_agent` entry.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    // Appended events are returned for their session in ascending `seq` order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    // The `seq` argument of `list_events_page` is inclusive: starting at 0 returns
    // seq 0, and the next page is requested at "last seen + 1".
    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }

    // Appending the same (session, seq) twice stores a single event.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Duplicate — should be silently ignored
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    // `session_span_tree` populates `span_tree_cache` (even for an empty result)
    // and every `append_event` clears it again.
    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // Looking up an unknown session still fills the cache.
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        // A tool call without its result produces no span in the tree yet.
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        // Pair the call with a result that carries the same tool call id.
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }

    // Window matching uses `started_at_ms` when present and falls back to
    // `ended_at_ms` only when the start is NULL.
    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        // The queried window is 100..300. "started_wins" has an end inside the
        // window but a start outside it, and is expected to be excluded.
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }

    // The sync-row variant applies the same window filter and carries the
    // owning session id on every returned row.
    #[test]
    fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s1")).unwrap();
        for (id, started, ended) in [
            ("inside_started", Some(150_i64), None),
            ("inside_ended_only", None, Some(220_i64)),
            ("after_window", Some(400_i64), None),
            ("before_window", None, Some(50_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store
            .tool_spans_sync_rows_in_window("/ws", 100, 300)
            .unwrap();
        let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
        assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
        assert!(rows.iter().all(|r| r.session_id == "s1"));
    }

    // Upserting the same id twice succeeds and the second write's status is
    // the one read back.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    // A path found under `input.path` in the event payload shows up in
    // `files_touched_in_window` for the event's session.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    // `update_session_status` changes the status returned by `get_session`.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    // Pruning removes sessions started before the cutoff together with their
    // events, reports both counts, and leaves later sessions in place.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    // A `.cursor/rules/<name>.mdc` path in the payload is recorded as rule
    // `<name>` for the event's session.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    // Skill and rule references embedded in free text are both attributed to
    // the session, and slugs passed in the on-disk sets are flagged `on_disk`.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    // Pruning a session also removes the `rules_used` rows derived from its events.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}