Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::types::Value;
15use rusqlite::{
16    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
17};
18use std::cell::RefCell;
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
// mmap budget in MiB — presumably consumed by `apply_pragmas` (not visible in this chunk); confirm.
const DEFAULT_MMAP_MB: u64 = 256;
// Canonical `sessions` column list shared by SELECT sites so row mapping stays in one order.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";

// Idempotent DDL, executed in order on every read-write open via `execute_batch`
// (so a single entry may hold several `;`-separated statements).
// NOTE(review): some columns used elsewhere in this file (`events.ts_exact`,
// `events.tool_call_id`, `sync_outbox.kind`, `sessions.start_commit`, …) are not
// created here — presumably added by `ensure_schema_columns`; confirm before
// treating this list as the complete schema.
const MIGRATIONS: &[&str] = &[
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the upsert key used by append_event_with_sync's ON CONFLICT(session_id, seq).
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the singleton row; OR IGNORE keeps this idempotent across opens.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement entry — relies on execute_batch (a plain execute would stop at the first ';').
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
273
/// Per-workspace activity dashboard stats.
#[derive(Clone)]
pub struct InsightsStats {
    pub total_sessions: u64,
    pub running_sessions: u64,
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost in micro-USD (same unit as `events.cost_usd_e6`).
    pub total_cost_usd_e6: i64,
    /// Number of sessions that contributed any cost data.
    pub sessions_with_cost: u64,
}
289
/// Sync daemon / outbox status for `kaizen sync status`.
pub struct SyncStatusSnapshot {
    /// Rows in `sync_outbox` with `sent = 0`.
    pub pending_outbox: u64,
    /// `sync_state.last_success_ms`, when recorded and parseable.
    pub last_success_ms: Option<u64>,
    /// `sync_state.last_error`, when present.
    pub last_error: Option<String>,
    /// `sync_state.consecutive_failures` (0 when missing or unparseable).
    pub consecutive_failures: u32,
}
297
/// Aggregate stats across sessions + events for a workspace.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    pub session_count: u64,
    /// Summed cost in micro-USD.
    pub total_cost_usd_e6: i64,
    /// (agent name, session count) pairs.
    pub by_agent: Vec<(String, u64)>,
    /// (model name, session count) pairs.
    pub by_model: Vec<(String, u64)>,
    /// (tool name, event count) pairs.
    pub top_tools: Vec<(String, u64)>,
}
307
/// Skill vs Cursor rule for [`GuidancePerfRow`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// Tracked via the `skills_used` table.
    Skill,
    /// Tracked via the `rules_used` table.
    Rule,
}
315
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    pub kind: GuidanceKind,
    /// Skill or rule identifier.
    pub id: String,
    /// Sessions in the window that referenced this guidance.
    pub sessions: u64,
    /// `sessions` as a fraction/percentage of all sessions in the window.
    pub sessions_pct: f64,
    /// Summed cost (micro-USD) of the referencing sessions.
    pub total_cost_usd_e6: i64,
    /// `None` when no cost data was available.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Delta against the workspace-wide average; `None` when either side is unknown.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the skill/rule file still exists on disk.
    pub on_disk: bool,
}
328
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    pub workspace: String,
    /// Window bounds in epoch milliseconds (inclusive semantics decided by the query).
    pub window_start_ms: u64,
    pub window_end_ms: u64,
    pub sessions_in_window: u64,
    /// `None` when no session in the window had cost data.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    pub rows: Vec<GuidancePerfRow>,
}
339
/// Result of [`Store::prune_sessions_started_before`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Sessions deleted by the prune.
    pub sessions_removed: u64,
    /// Events deleted along with those sessions.
    pub events_removed: u64,
}
346
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// All measurement fields are `Option`: `None` means "not measured", matching
/// the nullable columns in the table.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    /// When the measurement was taken (epoch ms).
    pub measured_at_ms: u64,
    /// Set when measurement itself failed (stored in `measure_error`).
    pub measure_error: Option<String>,
}
362
/// Aggregated process samples for retro (Tier D).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    /// Number of `session_samples` rows aggregated.
    pub sample_count: u64,
    /// Peak CPU percentage observed across samples.
    pub max_cpu_percent: f64,
    /// Peak resident-set size in bytes observed across samples.
    pub max_rss_bytes: u64,
}
371
/// `sync_state` keys for agent rescan throttling and auto-prune.
/// Values are millisecond timestamps stored as text (see `sync_state_set_u64`).
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
375
/// Flattened `tool_spans` row plus its associated paths — presumably the shape
/// handed to `enqueue_tool_spans_for_session` for outbound sync (confirm in
/// `sync::smart`).
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    pub cost_usd_e6: Option<i64>,
    /// Paths touched by the span (from `tool_span_paths` / `paths_json`).
    pub paths: Vec<String>,
}
391
/// How to open the SQLite store.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Runs migrations and schema-column backfills on open.
    ReadWrite,
    /// Skips migrations for query-only callers. NOTE(review): the file handle
    /// itself is still opened with read-write flags — see `open_with_mode`.
    ReadOnlyQuery,
}
397
/// Minimal `(id, status, ended_at_ms)` projection of a session row.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    pub id: String,
    pub status: SessionStatus,
    pub ended_at_ms: Option<u64>,
}
404
/// Optional predicates for session listing; `None` fields leave that dimension
/// unfiltered.
#[derive(Debug, Clone, Default)]
pub struct SessionFilter {
    pub agent_prefix: Option<String>,
    pub status: Option<SessionStatus>,
    pub since_ms: Option<u64>,
}
411
/// One page of session rows, the total match count, and the offset of the next
/// page (`None` when this is the last page).
#[derive(Debug, Clone)]
pub struct SessionPage {
    pub rows: Vec<SessionRecord>,
    pub total: usize,
    pub next_offset: Option<usize>,
}
418
/// Cached span tree for a single session; dropped whenever an event write lands
/// (see `invalidate_span_tree_cache`).
#[derive(Clone)]
struct SpanTreeCacheEntry {
    // Session the cached nodes were built from.
    session_id: String,
    // Presumably the highest event seq at build time, used as a staleness check
    // — TODO confirm against the cache-fill site (not in this chunk).
    last_event_seq: Option<u64>,
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
425
/// Handle over the kaizen SQLite database plus a one-session span-tree cache.
pub struct Store {
    conn: Connection,
    // RefCell: the cache is mutated from &self methods (e.g. invalidated after
    // event writes) without requiring &mut Store.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
}
430
431impl Store {
    /// Crate-internal access to the raw connection for sibling store modules.
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
435
    /// Open read-write; runs migrations. See [`Store::open_with_mode`].
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }
439
    /// Open for queries only; skips migrations. See [`Store::open_with_mode`].
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
443
    /// Open the store at `path`, creating parent directories as needed.
    ///
    /// `ReadWrite` applies pragmas, runs all migrations, and backfills columns;
    /// `ReadOnlyQuery` applies pragmas only.
    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = match mode {
            StoreOpenMode::ReadWrite => Connection::open(path),
            // NOTE(review): despite the mode name this opens with READ_WRITE
            // flags — possibly deliberate so WAL readers can create/recover the
            // -wal/-shm files; confirm, or use SQLITE_OPEN_READ_ONLY if a true
            // read-only handle is intended.
            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
                path,
                OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
            ),
        }
        .with_context(|| format!("open db: {}", path.display()))?;
        apply_pragmas(&conn, mode)?;
        if mode == StoreOpenMode::ReadWrite {
            // Migrations are idempotent (IF NOT EXISTS) and re-run on every open.
            for sql in MIGRATIONS {
                conn.execute_batch(sql)?;
            }
            // Adds columns newer code paths reference but MIGRATIONS does not
            // create (presumably — definition not in this chunk; confirm).
            ensure_schema_columns(&conn)?;
        }
        Ok(Self {
            conn,
            span_tree_cache: RefCell::new(None),
        })
    }
468
    /// Drop the cached span tree; called after every accepted event write.
    fn invalidate_span_tree_cache(&self) {
        *self.span_tree_cache.borrow_mut() = None;
    }
472
    /// Insert or fully overwrite a session row, mirroring the repo-binding
    /// columns into `session_repo_binding`.
    ///
    /// NOTE(review): the two upserts are separate auto-committed statements,
    /// not one transaction — a concurrent reader can observe the session row
    /// without its binding row. Confirm this is acceptable or wrap in a txn.
    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch,
                repo_file_count, repo_total_loc
             )
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
                ?16, ?17, ?18, ?19, ?20, ?21)
             ON CONFLICT(id) DO UPDATE SET
               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
               status=excluded.status, trace_path=excluded.trace_path,
               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
               branch=excluded.branch, dirty_start=excluded.dirty_start,
               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
               prompt_fingerprint=excluded.prompt_fingerprint,
               parent_session_id=excluded.parent_session_id,
               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
            params![
                s.id,
                s.agent,
                s.model,
                s.workspace,
                s.started_at_ms as i64,
                s.ended_at_ms.map(|v| v as i64),
                // Status is stored as its Debug rendering (e.g. "Running"),
                // matching the literal used by ensure_session_stub.
                format!("{:?}", s.status),
                s.trace_path,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                // Binding source is stored as '' rather than NULL when absent.
                s.repo_binding_source.clone().unwrap_or_default(),
                s.prompt_fingerprint.as_deref(),
                s.parent_session_id.as_deref(),
                s.agent_version.as_deref(),
                s.os.as_deref(),
                s.arch.as_deref(),
                s.repo_file_count.map(|v| v as i64),
                s.repo_total_loc.map(|v| v as i64),
            ],
        )?;
        // Keep the denormalized binding table in lockstep with the session row.
        self.conn.execute(
            "INSERT INTO session_repo_binding (
                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
             ON CONFLICT(session_id) DO UPDATE SET
                start_commit=excluded.start_commit,
                end_commit=excluded.end_commit,
                branch=excluded.branch,
                dirty_start=excluded.dirty_start,
                dirty_end=excluded.dirty_end,
                repo_binding_source=excluded.repo_binding_source",
            params![
                s.id,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                s.repo_binding_source.clone().unwrap_or_default(),
            ],
        )?;
        Ok(())
    }
541
    /// Insert a minimal session row if none exists. Used by hook ingestion when
    /// the first observed event is not `SessionStart` (hooks installed mid-session).
    pub fn ensure_session_stub(
        &self,
        id: &str,
        agent: &str,
        workspace: &str,
        started_at_ms: u64,
    ) -> Result<()> {
        // INSERT OR IGNORE: never clobber a richer row already written by
        // upsert_session. Stub defaults: status 'Running', empty trace_path,
        // everything else NULL (repo_binding_source '' to match upsert_session).
        self.conn.execute(
            "INSERT OR IGNORE INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
            params![id, agent, workspace, started_at_ms as i64],
        )?;
        Ok(())
    }
562
563    /// Next `seq` for a new event in this session (0 when there are no events yet).
564    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
565        let n: i64 = self.conn.query_row(
566            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
567            [session_id],
568            |r| r.get(0),
569        )?;
570        Ok(n as u64)
571    }
572
    /// Append an event with no sync context (no outbox row is enqueued).
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
576
    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
    ///
    /// Upserts on `(session_id, seq)`, so re-ingesting the same event replaces
    /// the stored row. After a write it refreshes derived indexes and the
    /// session's tool spans, drops the span-tree cache, then walks a series of
    /// early-return guards before enqueueing the redacted sync copy.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
             )
             ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // kind/source stored as their Debug renderings.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // No row changed: skip index maintenance and sync enqueueing entirely.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        index_event_derived(&self.conn, e)?;
        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
        self.invalidate_span_tree_cache();
        // --- Sync enqueue path below; every guard returns Ok (ingest succeeded). ---
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync is unconfigured unless all three of endpoint/token/team are set.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Probabilistic sampling: drop (1 - sample_rate) of events from the outbox.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        // The outbound record needs session metadata; without it we can't build one.
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        // Strip/hash workspace-relative details before anything leaves the machine.
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
            params![e.session_id, row],
        )?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
676
677    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
678        let mut stmt = self.conn.prepare(
679            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
680        )?;
681        let rows = stmt.query_map(params![limit as i64], |row| {
682            Ok((
683                row.get::<_, i64>(0)?,
684                row.get::<_, String>(1)?,
685                row.get::<_, String>(2)?,
686            ))
687        })?;
688        let mut out = Vec::new();
689        for r in rows {
690            out.push(r?);
691        }
692        Ok(out)
693    }
694
695    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
696        for id in ids {
697            self.conn
698                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
699        }
700        Ok(())
701    }
702
703    pub fn replace_outbox_rows(
704        &self,
705        owner_id: &str,
706        kind: &str,
707        payloads: &[String],
708    ) -> Result<()> {
709        self.conn.execute(
710            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
711            params![owner_id, kind],
712        )?;
713        for payload in payloads {
714            self.conn.execute(
715                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
716                params![owner_id, kind, payload],
717            )?;
718        }
719        Ok(())
720    }
721
722    pub fn outbox_pending_count(&self) -> Result<u64> {
723        let c: i64 =
724            self.conn
725                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
726                    r.get(0)
727                })?;
728        Ok(c as u64)
729    }
730
731    pub fn set_sync_state_ok(&self) -> Result<()> {
732        let now = now_ms().to_string();
733        self.conn.execute(
734            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
735             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
736            params![now],
737        )?;
738        self.conn.execute(
739            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
740             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
741            [],
742        )?;
743        self.conn
744            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
745        Ok(())
746    }
747
748    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
749        let prev: i64 = self
750            .conn
751            .query_row(
752                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
753                [],
754                |r| {
755                    let s: String = r.get(0)?;
756                    Ok(s.parse::<i64>().unwrap_or(0))
757                },
758            )
759            .optional()?
760            .unwrap_or(0);
761        let next = prev.saturating_add(1);
762        self.conn.execute(
763            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
764             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
765            params![msg],
766        )?;
767        self.conn.execute(
768            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
769             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
770            params![next.to_string()],
771        )?;
772        Ok(())
773    }
774
775    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
776        let pending_outbox = self.outbox_pending_count()?;
777        let last_success_ms = self
778            .conn
779            .query_row(
780                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
781                [],
782                |r| r.get::<_, String>(0),
783            )
784            .optional()?
785            .and_then(|s| s.parse().ok());
786        let last_error = self
787            .conn
788            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
789                r.get::<_, String>(0)
790            })
791            .optional()?;
792        let consecutive_failures = self
793            .conn
794            .query_row(
795                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
796                [],
797                |r| r.get::<_, String>(0),
798            )
799            .optional()?
800            .and_then(|s| s.parse().ok())
801            .unwrap_or(0);
802        Ok(SyncStatusSnapshot {
803            pending_outbox,
804            last_success_ms,
805            last_error,
806            consecutive_failures,
807        })
808    }
809
810    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
811        let row: Option<String> = self
812            .conn
813            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
814                r.get::<_, String>(0)
815            })
816            .optional()?;
817        Ok(row.and_then(|s| s.parse().ok()))
818    }
819
820    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
821        self.conn.execute(
822            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
823             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
824            params![key, v.to_string()],
825        )?;
826        Ok(())
827    }
828
829    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
830    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
831        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
832        let sessions_to_remove: i64 = tx.query_row(
833            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
834            params![cutoff_ms],
835            |r| r.get(0),
836        )?;
837        let events_to_remove: i64 = tx.query_row(
838            "SELECT COUNT(*) FROM events WHERE session_id IN \
839             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
840            params![cutoff_ms],
841            |r| r.get(0),
842        )?;
843
844        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
845        tx.execute(
846            &format!(
847                "DELETE FROM tool_span_paths WHERE span_id IN \
848                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
849            ),
850            params![cutoff_ms],
851        )?;
852        tx.execute(
853            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
854            params![cutoff_ms],
855        )?;
856        tx.execute(
857            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
858            params![cutoff_ms],
859        )?;
860        tx.execute(
861            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
862            params![cutoff_ms],
863        )?;
864        tx.execute(
865            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
866            params![cutoff_ms],
867        )?;
868        tx.execute(
869            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
870            params![cutoff_ms],
871        )?;
872        tx.execute(
873            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
874            params![cutoff_ms],
875        )?;
876        tx.execute(
877            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
878            params![cutoff_ms],
879        )?;
880        tx.execute(
881            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
882            params![cutoff_ms],
883        )?;
884        tx.execute(
885            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
886            params![cutoff_ms],
887        )?;
888        tx.execute(
889            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
890            params![cutoff_ms],
891        )?;
892        tx.execute(
893            "DELETE FROM sessions WHERE started_at_ms < ?1",
894            params![cutoff_ms],
895        )?;
896        tx.commit()?;
897        self.invalidate_span_tree_cache();
898        Ok(PruneStats {
899            sessions_removed: sessions_to_remove as u64,
900            events_removed: events_to_remove as u64,
901        })
902    }
903
904    /// Reclaim file space after large deletes (exclusive lock; can be slow).
905    pub fn vacuum(&self) -> Result<()> {
906        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
907        Ok(())
908    }
909
910    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
911        Ok(self
912            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
913            .rows)
914    }
915
916    pub fn list_sessions_page(
917        &self,
918        workspace: &str,
919        offset: usize,
920        limit: usize,
921        filter: SessionFilter,
922    ) -> Result<SessionPage> {
923        let (where_sql, args) = session_filter_sql(workspace, &filter);
924        let total = self.query_session_page_count(&where_sql, &args)?;
925        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
926        let next = offset.saturating_add(rows.len());
927        Ok(SessionPage {
928            rows,
929            total,
930            next_offset: (next < total).then_some(next),
931        })
932    }
933
934    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
935        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
936        let total: i64 = self
937            .conn
938            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
939        Ok(total as usize)
940    }
941
942    fn query_session_page_rows(
943        &self,
944        where_sql: &str,
945        args: &[Value],
946        offset: usize,
947        limit: usize,
948    ) -> Result<Vec<SessionRecord>> {
949        let sql = format!(
950            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
951        );
952        let mut values = args.to_vec();
953        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
954        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
955        let mut stmt = self.conn.prepare(&sql)?;
956        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
957        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
958    }
959
960    pub fn list_sessions_started_after(
961        &self,
962        workspace: &str,
963        after_started_at_ms: u64,
964    ) -> Result<Vec<SessionRecord>> {
965        let mut stmt = self.conn.prepare(
966            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
967                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
968                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
969                    repo_file_count, repo_total_loc
970             FROM sessions
971             WHERE workspace = ?1 AND started_at_ms > ?2
972             ORDER BY started_at_ms DESC, id ASC",
973        )?;
974        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
975        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
976    }
977
978    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
979        if ids.is_empty() {
980            return Ok(Vec::new());
981        }
982        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
983        let sql =
984            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
985        let mut stmt = self.conn.prepare(&sql)?;
986        let params: Vec<&dyn rusqlite::ToSql> =
987            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
988        let rows = stmt.query_map(params.as_slice(), |r| {
989            let status: String = r.get(1)?;
990            Ok(SessionStatusRow {
991                id: r.get(0)?,
992                status: status_from_str(&status),
993                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
994            })
995        })?;
996        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
997    }
998
999    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1000        let session_count: i64 = self.conn.query_row(
1001            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1002            params![workspace],
1003            |r| r.get(0),
1004        )?;
1005
1006        let total_cost: i64 = self.conn.query_row(
1007            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1008             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1009            params![workspace],
1010            |r| r.get(0),
1011        )?;
1012
1013        let mut stmt = self.conn.prepare(
1014            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1015        )?;
1016        let by_agent: Vec<(String, u64)> = stmt
1017            .query_map(params![workspace], |r| {
1018                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1019            })?
1020            .filter_map(|r| r.ok())
1021            .collect();
1022
1023        let mut stmt = self.conn.prepare(
1024            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1025        )?;
1026        let by_model: Vec<(String, u64)> = stmt
1027            .query_map(params![workspace], |r| {
1028                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1029            })?
1030            .filter_map(|r| r.ok())
1031            .collect();
1032
1033        let mut stmt = self.conn.prepare(
1034            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1035             WHERE s.workspace = ?1 AND tool IS NOT NULL
1036             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1037        )?;
1038        let top_tools: Vec<(String, u64)> = stmt
1039            .query_map(params![workspace], |r| {
1040                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1041            })?
1042            .filter_map(|r| r.ok())
1043            .collect();
1044
1045        Ok(SummaryStats {
1046            session_count: session_count as u64,
1047            total_cost_usd_e6: total_cost,
1048            by_agent,
1049            by_model,
1050            top_tools,
1051        })
1052    }
1053
1054    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1055        self.list_events_page(session_id, 0, i64::MAX as usize)
1056    }
1057
1058    pub fn list_events_page(
1059        &self,
1060        session_id: &str,
1061        after_seq: u64,
1062        limit: usize,
1063    ) -> Result<Vec<Event>> {
1064        let mut stmt = self.conn.prepare(
1065            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1066                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1067                    stop_reason, latency_ms, ttft_ms, retry_count,
1068                    context_used_tokens, context_max_tokens,
1069                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1070             FROM events
1071             WHERE session_id = ?1 AND seq >= ?2
1072             ORDER BY seq ASC LIMIT ?3",
1073        )?;
1074        let rows = stmt.query_map(
1075            params![
1076                session_id,
1077                after_seq as i64,
1078                limit.min(i64::MAX as usize) as i64
1079            ],
1080            event_row,
1081        )?;
1082        let mut events = Vec::new();
1083        for row in rows {
1084            events.push(row?);
1085        }
1086        Ok(events)
1087    }
1088
1089    /// Update only status for existing session.
1090    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1091        self.conn.execute(
1092            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1093            params![format!("{:?}", status), id],
1094        )?;
1095        Ok(())
1096    }
1097
1098    /// Workspace activity dashboard — feeds `cmd_insights`.
1099    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1100        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1101        Ok(InsightsStats {
1102            total_sessions: count_q(
1103                &self.conn,
1104                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1105                workspace,
1106            )?,
1107            running_sessions: count_q(
1108                &self.conn,
1109                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1110                workspace,
1111            )?,
1112            total_events: count_q(
1113                &self.conn,
1114                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1115                workspace,
1116            )?,
1117            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1118            recent: recent_sessions_3(&self.conn, workspace)?,
1119            top_tools: top_tools_5(&self.conn, workspace)?,
1120            total_cost_usd_e6,
1121            sessions_with_cost,
1122        })
1123    }
1124
    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
    ///
    /// Window matching: an event qualifies when its own `ts_ms` falls in the
    /// window, OR when its timestamp is synthetic (`ts_ms` below
    /// `SYNTHETIC_TS_CEILING_MS`) and the owning session's `started_at_ms`
    /// falls in the window instead.
    pub fn retro_events_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(SessionRecord, Event)>> {
        // Column layout by index: 0..=12 event core, 13..=33 session columns,
        // 34..=42 event extras. The positional `row.get(n)` calls below must
        // stay in sync with this SELECT list.
        let mut stmt = self.conn.prepare(
            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
                    s.repo_file_count, s.repo_total_loc,
                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
                    e.context_used_tokens, e.context_max_tokens,
                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
             FROM events e
             JOIN sessions s ON s.id = e.session_id
             WHERE s.workspace = ?1
               AND (
                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
               )
             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
        )?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                SYNTHETIC_TS_CEILING_MS,
            ],
            |row| {
                let payload_str: String = row.get(12)?;
                let status_str: String = row.get(19)?;
                Ok((
                    // Session columns (indices 13..=33).
                    SessionRecord {
                        id: row.get(13)?,
                        agent: row.get(14)?,
                        model: row.get(15)?,
                        workspace: row.get(16)?,
                        started_at_ms: row.get::<_, i64>(17)? as u64,
                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
                        status: status_from_str(&status_str),
                        trace_path: row.get(20)?,
                        start_commit: row.get(21)?,
                        end_commit: row.get(22)?,
                        branch: row.get(23)?,
                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
                        prompt_fingerprint: row.get(27)?,
                        parent_session_id: row.get(28)?,
                        agent_version: row.get(29)?,
                        os: row.get(30)?,
                        arch: row.get(31)?,
                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
                    },
                    // Event columns (0..=12 core, 34..=42 extras). A payload
                    // that fails to parse as JSON degrades to `Null` rather
                    // than failing the whole query.
                    Event {
                        session_id: row.get(0)?,
                        seq: row.get::<_, i64>(1)? as u64,
                        ts_ms: row.get::<_, i64>(2)? as u64,
                        ts_exact: row.get::<_, i64>(3)? != 0,
                        kind: kind_from_str(&row.get::<_, String>(4)?),
                        source: source_from_str(&row.get::<_, String>(5)?),
                        tool: row.get(6)?,
                        tool_call_id: row.get(7)?,
                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
                        cost_usd_e6: row.get(11)?,
                        payload: serde_json::from_str(&payload_str)
                            .unwrap_or(serde_json::Value::Null),
                        stop_reason: row.get(34)?,
                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
                    },
                ))
            },
        )?;

        let mut out = Vec::new();
        for r in rows {
            out.push(r?);
        }
        Ok(out)
    }
1220
1221    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1222    pub fn files_touched_in_window(
1223        &self,
1224        workspace: &str,
1225        start_ms: u64,
1226        end_ms: u64,
1227    ) -> Result<Vec<(String, String)>> {
1228        let mut stmt = self.conn.prepare(
1229            "SELECT DISTINCT ft.session_id, ft.path
1230             FROM files_touched ft
1231             JOIN sessions s ON s.id = ft.session_id
1232             WHERE s.workspace = ?1
1233               AND EXISTS (
1234                 SELECT 1 FROM events e
1235                 JOIN sessions ss ON ss.id = e.session_id
1236                 WHERE e.session_id = ft.session_id
1237                   AND (
1238                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1239                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1240                   )
1241               )
1242             ORDER BY ft.session_id, ft.path",
1243        )?;
1244        let out: Vec<(String, String)> = stmt
1245            .query_map(
1246                params![
1247                    workspace,
1248                    start_ms as i64,
1249                    end_ms as i64,
1250                    SYNTHETIC_TS_CEILING_MS,
1251                ],
1252                |r| Ok((r.get(0)?, r.get(1)?)),
1253            )?
1254            .filter_map(|r| r.ok())
1255            .collect();
1256        Ok(out)
1257    }
1258
1259    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1260    /// (any session with an indexed skill row; join events optional — use row existence).
1261    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1262        let mut stmt = self.conn.prepare(
1263            "SELECT DISTINCT su.skill
1264             FROM skills_used su
1265             JOIN sessions s ON s.id = su.session_id
1266             WHERE s.workspace = ?1
1267               AND EXISTS (
1268                 SELECT 1 FROM events e
1269                 JOIN sessions ss ON ss.id = e.session_id
1270                 WHERE e.session_id = su.session_id
1271                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1272               )
1273             ORDER BY su.skill",
1274        )?;
1275        let out: Vec<String> = stmt
1276            .query_map(
1277                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1278                |r| r.get::<_, String>(0),
1279            )?
1280            .filter_map(|r| r.ok())
1281            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1282            .collect();
1283        Ok(out)
1284    }
1285
1286    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1287    pub fn skills_used_in_window(
1288        &self,
1289        workspace: &str,
1290        start_ms: u64,
1291        end_ms: u64,
1292    ) -> Result<Vec<(String, String)>> {
1293        let mut stmt = self.conn.prepare(
1294            "SELECT DISTINCT su.session_id, su.skill
1295             FROM skills_used su
1296             JOIN sessions s ON s.id = su.session_id
1297             WHERE s.workspace = ?1
1298               AND EXISTS (
1299                 SELECT 1 FROM events e
1300                 JOIN sessions ss ON ss.id = e.session_id
1301                 WHERE e.session_id = su.session_id
1302                   AND (
1303                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1304                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1305                   )
1306               )
1307             ORDER BY su.session_id, su.skill",
1308        )?;
1309        let out: Vec<(String, String)> = stmt
1310            .query_map(
1311                params![
1312                    workspace,
1313                    start_ms as i64,
1314                    end_ms as i64,
1315                    SYNTHETIC_TS_CEILING_MS,
1316                ],
1317                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1318            )?
1319            .filter_map(|r| r.ok())
1320            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1321            .collect();
1322        Ok(out)
1323    }
1324
1325    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1326    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1327        let mut stmt = self.conn.prepare(
1328            "SELECT DISTINCT ru.rule
1329             FROM rules_used ru
1330             JOIN sessions s ON s.id = ru.session_id
1331             WHERE s.workspace = ?1
1332               AND EXISTS (
1333                 SELECT 1 FROM events e
1334                 JOIN sessions ss ON ss.id = e.session_id
1335                 WHERE e.session_id = ru.session_id
1336                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1337               )
1338             ORDER BY ru.rule",
1339        )?;
1340        let out: Vec<String> = stmt
1341            .query_map(
1342                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1343                |r| r.get::<_, String>(0),
1344            )?
1345            .filter_map(|r| r.ok())
1346            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1347            .collect();
1348        Ok(out)
1349    }
1350
1351    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1352    pub fn rules_used_in_window(
1353        &self,
1354        workspace: &str,
1355        start_ms: u64,
1356        end_ms: u64,
1357    ) -> Result<Vec<(String, String)>> {
1358        let mut stmt = self.conn.prepare(
1359            "SELECT DISTINCT ru.session_id, ru.rule
1360             FROM rules_used ru
1361             JOIN sessions s ON s.id = ru.session_id
1362             WHERE s.workspace = ?1
1363               AND EXISTS (
1364                 SELECT 1 FROM events e
1365                 JOIN sessions ss ON ss.id = e.session_id
1366                 WHERE e.session_id = ru.session_id
1367                   AND (
1368                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1369                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1370                   )
1371               )
1372             ORDER BY ru.session_id, ru.rule",
1373        )?;
1374        let out: Vec<(String, String)> = stmt
1375            .query_map(
1376                params![
1377                    workspace,
1378                    start_ms as i64,
1379                    end_ms as i64,
1380                    SYNTHETIC_TS_CEILING_MS,
1381                ],
1382                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1383            )?
1384            .filter_map(|r| r.ok())
1385            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1386            .collect();
1387        Ok(out)
1388    }
1389
1390    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1391    pub fn sessions_active_in_window(
1392        &self,
1393        workspace: &str,
1394        start_ms: u64,
1395        end_ms: u64,
1396    ) -> Result<HashSet<String>> {
1397        let mut stmt = self.conn.prepare(
1398            "SELECT DISTINCT s.id
1399             FROM sessions s
1400             WHERE s.workspace = ?1
1401               AND EXISTS (
1402                 SELECT 1 FROM events e
1403                 WHERE e.session_id = s.id
1404                   AND (
1405                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1406                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1407                   )
1408               )",
1409        )?;
1410        let out: HashSet<String> = stmt
1411            .query_map(
1412                params![
1413                    workspace,
1414                    start_ms as i64,
1415                    end_ms as i64,
1416                    SYNTHETIC_TS_CEILING_MS,
1417                ],
1418                |r| r.get(0),
1419            )?
1420            .filter_map(|r| r.ok())
1421            .collect();
1422        Ok(out)
1423    }
1424
1425    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1426    pub fn session_costs_usd_e6_in_window(
1427        &self,
1428        workspace: &str,
1429        start_ms: u64,
1430        end_ms: u64,
1431    ) -> Result<HashMap<String, i64>> {
1432        let mut stmt = self.conn.prepare(
1433            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1434             FROM events e
1435             JOIN sessions s ON s.id = e.session_id
1436             WHERE s.workspace = ?1
1437               AND (
1438                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1439                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1440               )
1441             GROUP BY e.session_id",
1442        )?;
1443        let rows: Vec<(String, i64)> = stmt
1444            .query_map(
1445                params![
1446                    workspace,
1447                    start_ms as i64,
1448                    end_ms as i64,
1449                    SYNTHETIC_TS_CEILING_MS,
1450                ],
1451                |r| Ok((r.get(0)?, r.get(1)?)),
1452            )?
1453            .filter_map(|r| r.ok())
1454            .collect();
1455        Ok(rows.into_iter().collect())
1456    }
1457
    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
    ///
    /// Builds one row per skill/rule seen in the window, plus one zero-usage
    /// row for every on-disk slug never observed. Each row carries session
    /// adoption counts and a cost-per-session delta vs the workspace average.
    pub fn guidance_report(
        &self,
        workspace: &str,
        window_start_ms: u64,
        window_end_ms: u64,
        skill_slugs_on_disk: &HashSet<String>,
        rule_slugs_on_disk: &HashSet<String>,
    ) -> Result<GuidanceReport> {
        // Adoption denominator: sessions with any event in the window.
        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
        let denom = active.len() as u64;
        let costs =
            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;

        // Workspace average cost per active session, converted from e6
        // fixed-point to USD; None when there are no active sessions.
        let workspace_avg_cost_per_session_usd = if denom > 0 {
            let total_e6: i64 = active
                .iter()
                .map(|sid| costs.get(sid).copied().unwrap_or(0))
                .sum();
            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
        } else {
            None
        };

        // Group window usage into slug -> set of session ids.
        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
            skill_sessions.entry(skill).or_default().insert(sid);
        }
        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
            rule_sessions.entry(rule).or_default().insert(sid);
        }

        let mut rows: Vec<GuidancePerfRow> = Vec::new();

        // Shared row builder for both kinds; captures `rows` mutably plus
        // `costs`/`denom`/the workspace average by reference.
        let mut push_row =
            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
                let sessions = sids.len() as u64;
                let sessions_pct = if denom > 0 {
                    sessions as f64 * 100.0 / denom as f64
                } else {
                    0.0
                };
                let total_cost_usd_e6: i64 = sids
                    .iter()
                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
                    .sum();
                let avg_cost_per_session_usd = if sessions > 0 {
                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
                } else {
                    None
                };
                // Delta vs workspace average; only defined when both sides exist.
                let vs_workspace_avg_cost_per_session_usd =
                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
                        (Some(avg), Some(w)) => Some(avg - w),
                        _ => None,
                    };
                rows.push(GuidancePerfRow {
                    kind,
                    id,
                    sessions,
                    sessions_pct,
                    total_cost_usd_e6,
                    avg_cost_per_session_usd,
                    vs_workspace_avg_cost_per_session_usd,
                    on_disk,
                });
            };

        // Observed skills first, then zero-usage rows for on-disk skills never seen.
        let mut seen_skills: HashSet<String> = HashSet::new();
        for (id, sids) in &skill_sessions {
            seen_skills.insert(id.clone());
            push_row(
                GuidanceKind::Skill,
                id.clone(),
                sids,
                skill_slugs_on_disk.contains(id),
            );
        }
        for slug in skill_slugs_on_disk {
            if seen_skills.contains(slug) {
                continue;
            }
            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
        }

        // Same pattern for rules.
        let mut seen_rules: HashSet<String> = HashSet::new();
        for (id, sids) in &rule_sessions {
            seen_rules.insert(id.clone());
            push_row(
                GuidanceKind::Rule,
                id.clone(),
                sids,
                rule_slugs_on_disk.contains(id),
            );
        }
        for slug in rule_slugs_on_disk {
            if seen_rules.contains(slug) {
                continue;
            }
            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
        }

        // Most-adopted first; kind then id as deterministic tie-breakers.
        rows.sort_by(|a, b| {
            b.sessions
                .cmp(&a.sessions)
                .then_with(|| a.kind.cmp(&b.kind))
                .then_with(|| a.id.cmp(&b.id))
        });

        Ok(GuidanceReport {
            workspace: workspace.to_string(),
            window_start_ms,
            window_end_ms,
            sessions_in_window: denom,
            workspace_avg_cost_per_session_usd,
            rows,
        })
    }
1577
1578    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1579        let mut stmt = self.conn.prepare(
1580            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1581                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1582                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1583                    repo_file_count, repo_total_loc
1584             FROM sessions WHERE id = ?1",
1585        )?;
1586        let mut rows = stmt.query_map(params![id], |row| {
1587            Ok((
1588                row.get::<_, String>(0)?,
1589                row.get::<_, String>(1)?,
1590                row.get::<_, Option<String>>(2)?,
1591                row.get::<_, String>(3)?,
1592                row.get::<_, i64>(4)?,
1593                row.get::<_, Option<i64>>(5)?,
1594                row.get::<_, String>(6)?,
1595                row.get::<_, String>(7)?,
1596                row.get::<_, Option<String>>(8)?,
1597                row.get::<_, Option<String>>(9)?,
1598                row.get::<_, Option<String>>(10)?,
1599                row.get::<_, Option<i64>>(11)?,
1600                row.get::<_, Option<i64>>(12)?,
1601                row.get::<_, String>(13)?,
1602                row.get::<_, Option<String>>(14)?,
1603                row.get::<_, Option<String>>(15)?,
1604                row.get::<_, Option<String>>(16)?,
1605                row.get::<_, Option<String>>(17)?,
1606                row.get::<_, Option<String>>(18)?,
1607                row.get::<_, Option<i64>>(19)?,
1608                row.get::<_, Option<i64>>(20)?,
1609            ))
1610        })?;
1611
1612        if let Some(row) = rows.next() {
1613            let (
1614                id,
1615                agent,
1616                model,
1617                workspace,
1618                started,
1619                ended,
1620                status_str,
1621                trace,
1622                start_commit,
1623                end_commit,
1624                branch,
1625                dirty_start,
1626                dirty_end,
1627                source,
1628                prompt_fingerprint,
1629                parent_session_id,
1630                agent_version,
1631                os,
1632                arch,
1633                repo_file_count,
1634                repo_total_loc,
1635            ) = row?;
1636            Ok(Some(SessionRecord {
1637                id,
1638                agent,
1639                model,
1640                workspace,
1641                started_at_ms: started as u64,
1642                ended_at_ms: ended.map(|v| v as u64),
1643                status: status_from_str(&status_str),
1644                trace_path: trace,
1645                start_commit,
1646                end_commit,
1647                branch,
1648                dirty_start: dirty_start.map(i64_to_bool),
1649                dirty_end: dirty_end.map(i64_to_bool),
1650                repo_binding_source: empty_to_none(source),
1651                prompt_fingerprint,
1652                parent_session_id,
1653                agent_version,
1654                os,
1655                arch,
1656                repo_file_count: repo_file_count.map(|v| v as u32),
1657                repo_total_loc: repo_total_loc.map(|v| v as u64),
1658            }))
1659        } else {
1660            Ok(None)
1661        }
1662    }
1663
1664    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1665        let mut stmt = self.conn.prepare(
1666            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1667                    indexed_at_ms, dirty, graph_path
1668             FROM repo_snapshots WHERE workspace = ?1
1669             ORDER BY indexed_at_ms DESC LIMIT 1",
1670        )?;
1671        let mut rows = stmt.query_map(params![workspace], |row| {
1672            Ok(RepoSnapshotRecord {
1673                id: row.get(0)?,
1674                workspace: row.get(1)?,
1675                head_commit: row.get(2)?,
1676                dirty_fingerprint: row.get(3)?,
1677                analyzer_version: row.get(4)?,
1678                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1679                dirty: row.get::<_, i64>(6)? != 0,
1680                graph_path: row.get(7)?,
1681            })
1682        })?;
1683        Ok(rows.next().transpose()?)
1684    }
1685
1686    pub fn save_repo_snapshot(
1687        &self,
1688        snapshot: &RepoSnapshotRecord,
1689        facts: &[FileFact],
1690        edges: &[RepoEdge],
1691    ) -> Result<()> {
1692        self.conn.execute(
1693            "INSERT INTO repo_snapshots (
1694                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1695                indexed_at_ms, dirty, graph_path
1696             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1697             ON CONFLICT(id) DO UPDATE SET
1698                workspace=excluded.workspace,
1699                head_commit=excluded.head_commit,
1700                dirty_fingerprint=excluded.dirty_fingerprint,
1701                analyzer_version=excluded.analyzer_version,
1702                indexed_at_ms=excluded.indexed_at_ms,
1703                dirty=excluded.dirty,
1704                graph_path=excluded.graph_path",
1705            params![
1706                snapshot.id,
1707                snapshot.workspace,
1708                snapshot.head_commit,
1709                snapshot.dirty_fingerprint,
1710                snapshot.analyzer_version,
1711                snapshot.indexed_at_ms as i64,
1712                bool_to_i64(snapshot.dirty),
1713                snapshot.graph_path,
1714            ],
1715        )?;
1716        self.conn.execute(
1717            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1718            params![snapshot.id],
1719        )?;
1720        self.conn.execute(
1721            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1722            params![snapshot.id],
1723        )?;
1724        for fact in facts {
1725            self.conn.execute(
1726                "INSERT INTO file_facts (
1727                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1728                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1729                    churn_30d, churn_90d, authors_90d, last_changed_ms
1730                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1731                params![
1732                    fact.snapshot_id,
1733                    fact.path,
1734                    fact.language,
1735                    fact.bytes as i64,
1736                    fact.loc as i64,
1737                    fact.sloc as i64,
1738                    fact.complexity_total as i64,
1739                    fact.max_fn_complexity as i64,
1740                    fact.symbol_count as i64,
1741                    fact.import_count as i64,
1742                    fact.fan_in as i64,
1743                    fact.fan_out as i64,
1744                    fact.churn_30d as i64,
1745                    fact.churn_90d as i64,
1746                    fact.authors_90d as i64,
1747                    fact.last_changed_ms.map(|v| v as i64),
1748                ],
1749            )?;
1750        }
1751        for edge in edges {
1752            self.conn.execute(
1753                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1754                 VALUES (?1, ?2, ?3, ?4, ?5)
1755                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1756                 DO UPDATE SET weight = weight + excluded.weight",
1757                params![
1758                    snapshot.id,
1759                    edge.from_path,
1760                    edge.to_path,
1761                    edge.kind,
1762                    edge.weight as i64,
1763                ],
1764            )?;
1765        }
1766        Ok(())
1767    }
1768
1769    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1770        let mut stmt = self.conn.prepare(
1771            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1772                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1773                    churn_30d, churn_90d, authors_90d, last_changed_ms
1774             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1775        )?;
1776        let rows = stmt.query_map(params![snapshot_id], |row| {
1777            Ok(FileFact {
1778                snapshot_id: row.get(0)?,
1779                path: row.get(1)?,
1780                language: row.get(2)?,
1781                bytes: row.get::<_, i64>(3)? as u64,
1782                loc: row.get::<_, i64>(4)? as u32,
1783                sloc: row.get::<_, i64>(5)? as u32,
1784                complexity_total: row.get::<_, i64>(6)? as u32,
1785                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1786                symbol_count: row.get::<_, i64>(8)? as u32,
1787                import_count: row.get::<_, i64>(9)? as u32,
1788                fan_in: row.get::<_, i64>(10)? as u32,
1789                fan_out: row.get::<_, i64>(11)? as u32,
1790                churn_30d: row.get::<_, i64>(12)? as u32,
1791                churn_90d: row.get::<_, i64>(13)? as u32,
1792                authors_90d: row.get::<_, i64>(14)? as u32,
1793                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1794            })
1795        })?;
1796        Ok(rows.filter_map(|row| row.ok()).collect())
1797    }
1798
1799    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1800        let mut stmt = self.conn.prepare(
1801            "SELECT from_id, to_id, kind, weight
1802             FROM repo_edges WHERE snapshot_id = ?1
1803             ORDER BY kind, from_id, to_id",
1804        )?;
1805        let rows = stmt.query_map(params![snapshot_id], |row| {
1806            Ok(RepoEdge {
1807                from_path: row.get(0)?,
1808                to_path: row.get(1)?,
1809                kind: row.get(2)?,
1810                weight: row.get::<_, i64>(3)? as u32,
1811            })
1812        })?;
1813        Ok(rows.filter_map(|row| row.ok()).collect())
1814    }
1815
1816    pub fn tool_spans_in_window(
1817        &self,
1818        workspace: &str,
1819        start_ms: u64,
1820        end_ms: u64,
1821    ) -> Result<Vec<ToolSpanView>> {
1822        let mut stmt = self.conn.prepare(
1823            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
1824                    reasoning_tokens, cost_usd_e6, paths_json,
1825                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
1826             FROM (
1827                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
1828                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
1829                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
1830                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
1831                        ts.started_at_ms AS sort_ms
1832                 FROM tool_spans ts
1833                 JOIN sessions s ON s.id = ts.session_id
1834                 WHERE s.workspace = ?1
1835                   AND ts.started_at_ms >= ?2
1836                   AND ts.started_at_ms <= ?3
1837                 UNION ALL
1838                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
1839                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
1840                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
1841                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
1842                        ts.ended_at_ms AS sort_ms
1843                 FROM tool_spans ts
1844                 JOIN sessions s ON s.id = ts.session_id
1845                 WHERE s.workspace = ?1
1846                   AND ts.started_at_ms IS NULL
1847                   AND ts.ended_at_ms >= ?2
1848                   AND ts.ended_at_ms <= ?3
1849             )
1850             ORDER BY sort_ms DESC",
1851        )?;
1852        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1853            let paths_json: String = row.get(8)?;
1854            Ok(ToolSpanView {
1855                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
1856                tool: row
1857                    .get::<_, Option<String>>(1)?
1858                    .unwrap_or_else(|| "unknown".into()),
1859                status: row.get(2)?,
1860                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
1861                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1862                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1863                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
1864                cost_usd_e6: row.get(7)?,
1865                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1866                parent_span_id: row.get(9)?,
1867                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
1868                subtree_cost_usd_e6: row.get(11)?,
1869                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
1870            })
1871        })?;
1872        Ok(rows.filter_map(|row| row.ok()).collect())
1873    }
1874
    /// Build (or reuse from the single-entry cache) the tool-span tree for
    /// one session.
    pub fn session_span_tree(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
        // The session's highest event seq is used as a cheap version stamp:
        // the cached tree is reused only while no new events have landed.
        let last_event_seq = self.last_event_seq_for_session(session_id)?;
        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
            && entry.session_id == session_id
            && entry.last_event_seq == last_event_seq
        {
            return Ok(entry.nodes.clone());
        }
        // NOTE(review): ordering is depth ASC then started_at_ms ASC —
        // presumably parents-first for build_tree; confirm against span_tree.
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                    reasoning_tokens, cost_usd_e6, paths_json,
                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
             FROM tool_spans
             WHERE session_id = ?1
             ORDER BY depth ASC, started_at_ms ASC",
        )?;
        let rows = stmt.query_map(params![session_id], |row| {
            let paths_json: String = row.get(8)?;
            Ok(crate::metrics::types::ToolSpanView {
                // Missing span id / tool name are normalized instead of
                // failing the row.
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                // paths_json that fails to parse degrades to an empty list.
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        // Best-effort read: rows that fail to decode are silently skipped.
        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
        let nodes = crate::store::span_tree::build_tree(spans);
        // Overwrite the single-entry cache with the freshly built tree.
        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
            session_id: session_id.to_string(),
            last_event_seq,
            nodes: nodes.clone(),
        });
        Ok(nodes)
    }
1923
1924    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
1925        let seq = self
1926            .conn
1927            .query_row(
1928                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
1929                params![session_id],
1930                |r| r.get::<_, Option<i64>>(0),
1931            )?
1932            .map(|v| v as u64);
1933        Ok(seq)
1934    }
1935
1936    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1937        let mut stmt = self.conn.prepare(
1938            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1939                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1940             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1941        )?;
1942        let rows = stmt.query_map(params![session_id], |row| {
1943            let paths_json: String = row.get(12)?;
1944            Ok(ToolSpanSyncRow {
1945                span_id: row.get(0)?,
1946                session_id: row.get(1)?,
1947                tool: row.get(2)?,
1948                tool_call_id: row.get(3)?,
1949                status: row.get(4)?,
1950                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1951                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1952                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1953                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1954                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1955                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1956                cost_usd_e6: row.get(11)?,
1957                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1958            })
1959        })?;
1960        Ok(rows.filter_map(|row| row.ok()).collect())
1961    }
1962
1963    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1964        self.conn.execute(
1965            "INSERT OR REPLACE INTO session_evals
1966             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1967             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1968            rusqlite::params![
1969                eval.id,
1970                eval.session_id,
1971                eval.judge_model,
1972                eval.rubric_id,
1973                eval.score,
1974                eval.rationale,
1975                eval.flagged as i64,
1976                eval.created_at_ms as i64,
1977            ],
1978        )?;
1979        Ok(())
1980    }
1981
1982    pub fn list_evals_in_window(
1983        &self,
1984        start_ms: u64,
1985        end_ms: u64,
1986    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1987        let mut stmt = self.conn.prepare(
1988            "SELECT id, session_id, judge_model, rubric_id, score,
1989                    rationale, flagged, created_at_ms
1990             FROM session_evals
1991             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1992             ORDER BY created_at_ms ASC",
1993        )?;
1994        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1995            Ok(crate::eval::types::EvalRow {
1996                id: r.get(0)?,
1997                session_id: r.get(1)?,
1998                judge_model: r.get(2)?,
1999                rubric_id: r.get(3)?,
2000                score: r.get(4)?,
2001                rationale: r.get(5)?,
2002                flagged: r.get::<_, i64>(6)? != 0,
2003                created_at_ms: r.get::<_, i64>(7)? as u64,
2004            })
2005        })?;
2006        rows.collect()
2007    }
2008
2009    pub fn list_evals_for_session(
2010        &self,
2011        session_id: &str,
2012    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2013        let mut stmt = self.conn.prepare(
2014            "SELECT id, session_id, judge_model, rubric_id, score,
2015                    rationale, flagged, created_at_ms
2016             FROM session_evals
2017             WHERE session_id = ?1
2018             ORDER BY created_at_ms DESC",
2019        )?;
2020        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2021            Ok(crate::eval::types::EvalRow {
2022                id: r.get(0)?,
2023                session_id: r.get(1)?,
2024                judge_model: r.get(2)?,
2025                rubric_id: r.get(3)?,
2026                score: r.get(4)?,
2027                rationale: r.get(5)?,
2028                flagged: r.get::<_, i64>(6)? != 0,
2029                created_at_ms: r.get::<_, i64>(7)? as u64,
2030            })
2031        })?;
2032        rows.collect()
2033    }
2034
2035    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2036        use crate::feedback::types::FeedbackLabel;
2037        self.conn.execute(
2038            "INSERT OR REPLACE INTO session_feedback
2039             (id, session_id, score, label, note, created_at_ms)
2040             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2041            rusqlite::params![
2042                r.id,
2043                r.session_id,
2044                r.score.as_ref().map(|s| s.0 as i64),
2045                r.label.as_ref().map(FeedbackLabel::to_db_str),
2046                r.note,
2047                r.created_at_ms as i64,
2048            ],
2049        )?;
2050        let payload = serde_json::to_string(r).unwrap_or_default();
2051        self.conn.execute(
2052            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2053             VALUES (?1, 'session_feedback', ?2, 0)",
2054            rusqlite::params![r.session_id, payload],
2055        )?;
2056        Ok(())
2057    }
2058
2059    pub fn list_feedback_in_window(
2060        &self,
2061        start_ms: u64,
2062        end_ms: u64,
2063    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2064        let mut stmt = self.conn.prepare(
2065            "SELECT id, session_id, score, label, note, created_at_ms
2066             FROM session_feedback
2067             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2068             ORDER BY created_at_ms ASC",
2069        )?;
2070        let rows = stmt.query_map(
2071            rusqlite::params![start_ms as i64, end_ms as i64],
2072            feedback_row,
2073        )?;
2074        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2075    }
2076
2077    pub fn feedback_for_sessions(
2078        &self,
2079        ids: &[String],
2080    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2081        if ids.is_empty() {
2082            return Ok(std::collections::HashMap::new());
2083        }
2084        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2085        let sql = format!(
2086            "SELECT id, session_id, score, label, note, created_at_ms
2087             FROM session_feedback WHERE session_id IN ({placeholders})
2088             ORDER BY created_at_ms DESC"
2089        );
2090        let mut stmt = self.conn.prepare(&sql)?;
2091        let params: Vec<&dyn rusqlite::ToSql> =
2092            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2093        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2094        let mut map = std::collections::HashMap::new();
2095        for row in rows {
2096            let r = row?;
2097            map.entry(r.session_id.clone()).or_insert(r);
2098        }
2099        Ok(map)
2100    }
2101
    /// Insert or update the measured outcome row for a session
    /// (one row per session, keyed on `session_id`).
    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
        self.conn.execute(
            "INSERT INTO session_outcomes (
                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
            ON CONFLICT(session_id) DO UPDATE SET
                test_passed=excluded.test_passed,
                test_failed=excluded.test_failed,
                test_skipped=excluded.test_skipped,
                build_ok=excluded.build_ok,
                lint_errors=excluded.lint_errors,
                revert_lines_14d=excluded.revert_lines_14d,
                pr_open=excluded.pr_open,
                ci_ok=excluded.ci_ok,
                measured_at_ms=excluded.measured_at_ms,
                measure_error=excluded.measure_error",
            params![
                row.session_id,
                row.test_passed,
                row.test_failed,
                row.test_skipped,
                // Optional booleans stored as nullable INTEGER 0/1.
                row.build_ok.map(bool_to_i64),
                row.lint_errors,
                row.revert_lines_14d,
                row.pr_open,
                row.ci_ok.map(bool_to_i64),
                row.measured_at_ms as i64,
                row.measure_error.as_deref(),
            ],
        )?;
        Ok(())
    }
2135
2136    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2137        let mut stmt = self.conn.prepare(
2138            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2139                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2140             FROM session_outcomes WHERE session_id = ?1",
2141        )?;
2142        let row = stmt
2143            .query_row(params![session_id], outcome_row)
2144            .optional()?;
2145        Ok(row)
2146    }
2147
2148    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2149    pub fn list_session_outcomes_in_window(
2150        &self,
2151        workspace: &str,
2152        start_ms: u64,
2153        end_ms: u64,
2154    ) -> Result<Vec<SessionOutcomeRow>> {
2155        let mut stmt = self.conn.prepare(
2156            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2157                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2158             FROM session_outcomes o
2159             JOIN sessions s ON s.id = o.session_id
2160             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2161             ORDER BY o.measured_at_ms ASC",
2162        )?;
2163        let rows = stmt.query_map(
2164            params![workspace, start_ms as i64, end_ms as i64],
2165            outcome_row,
2166        )?;
2167        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2168    }
2169
    /// Record one resource-usage sample for a session process.
    ///
    /// `INSERT OR REPLACE` keyed by the table's constraints means a sample
    /// at the same key overwrites the previous one rather than duplicating.
    pub fn append_session_sample(
        &self,
        session_id: &str,
        ts_ms: u64,
        pid: u32,
        cpu_percent: Option<f64>,
        rss_bytes: Option<u64>,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                session_id,
                ts_ms as i64,
                pid as i64,
                cpu_percent,
                // NULL when the sampler could not read RSS.
                rss_bytes.map(|b| b as i64)
            ],
        )?;
        Ok(())
    }
2191
2192    /// Per-session maxima for retro heuristics.
2193    pub fn list_session_sample_aggs_in_window(
2194        &self,
2195        workspace: &str,
2196        start_ms: u64,
2197        end_ms: u64,
2198    ) -> Result<Vec<SessionSampleAgg>> {
2199        let mut stmt = self.conn.prepare(
2200            "SELECT ss.session_id, COUNT(*) AS n,
2201                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2202             FROM session_samples ss
2203             JOIN sessions s ON s.id = ss.session_id
2204             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2205             GROUP BY ss.session_id",
2206        )?;
2207        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2208            let sid: String = r.get(0)?;
2209            let n: i64 = r.get(1)?;
2210            let max_cpu: Option<f64> = r.get(2)?;
2211            let max_rss: Option<i64> = r.get(3)?;
2212            Ok(SessionSampleAgg {
2213                session_id: sid,
2214                sample_count: n as u64,
2215                max_cpu_percent: max_cpu.unwrap_or(0.0),
2216                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2217            })
2218        })?;
2219        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2220    }
2221
2222    pub fn list_sessions_for_eval(
2223        &self,
2224        since_ms: u64,
2225        min_cost_usd: f64,
2226    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2227        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2228        let mut stmt = self.conn.prepare(
2229            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2230                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2231                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2232                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2233             FROM sessions s
2234             WHERE s.started_at_ms >= ?1
2235               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2236               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2237             ORDER BY s.started_at_ms DESC",
2238        )?;
2239        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2240            Ok((
2241                r.get::<_, String>(0)?,
2242                r.get::<_, String>(1)?,
2243                r.get::<_, Option<String>>(2)?,
2244                r.get::<_, String>(3)?,
2245                r.get::<_, i64>(4)?,
2246                r.get::<_, Option<i64>>(5)?,
2247                r.get::<_, String>(6)?,
2248                r.get::<_, String>(7)?,
2249                r.get::<_, Option<String>>(8)?,
2250                r.get::<_, Option<String>>(9)?,
2251                r.get::<_, Option<String>>(10)?,
2252                r.get::<_, Option<i64>>(11)?,
2253                r.get::<_, Option<i64>>(12)?,
2254                r.get::<_, Option<String>>(13)?,
2255                r.get::<_, Option<String>>(14)?,
2256                r.get::<_, Option<String>>(15)?,
2257                r.get::<_, Option<String>>(16)?,
2258                r.get::<_, Option<String>>(17)?,
2259                r.get::<_, Option<String>>(18)?,
2260                r.get::<_, Option<i64>>(19)?,
2261                r.get::<_, Option<i64>>(20)?,
2262            ))
2263        })?;
2264        let mut out = Vec::new();
2265        for row in rows {
2266            let (
2267                id,
2268                agent,
2269                model,
2270                workspace,
2271                started,
2272                ended,
2273                status_str,
2274                trace,
2275                start_commit,
2276                end_commit,
2277                branch,
2278                dirty_start,
2279                dirty_end,
2280                source,
2281                prompt_fingerprint,
2282                parent_session_id,
2283                agent_version,
2284                os,
2285                arch,
2286                repo_file_count,
2287                repo_total_loc,
2288            ) = row?;
2289            out.push(crate::core::event::SessionRecord {
2290                id,
2291                agent,
2292                model,
2293                workspace,
2294                started_at_ms: started as u64,
2295                ended_at_ms: ended.map(|v| v as u64),
2296                status: status_from_str(&status_str),
2297                trace_path: trace,
2298                start_commit,
2299                end_commit,
2300                branch,
2301                dirty_start: dirty_start.map(i64_to_bool),
2302                dirty_end: dirty_end.map(i64_to_bool),
2303                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2304                prompt_fingerprint,
2305                parent_session_id,
2306                agent_version,
2307                os,
2308                arch,
2309                repo_file_count: repo_file_count.map(|v| v as u32),
2310                repo_total_loc: repo_total_loc.map(|v| v as u64),
2311            });
2312        }
2313        Ok(out)
2314    }
2315
2316    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2317        self.conn.execute(
2318            "INSERT OR IGNORE INTO prompt_snapshots
2319             (fingerprint, captured_at_ms, files_json, total_bytes)
2320             VALUES (?1, ?2, ?3, ?4)",
2321            params![
2322                snap.fingerprint,
2323                snap.captured_at_ms as i64,
2324                snap.files_json,
2325                snap.total_bytes as i64
2326            ],
2327        )?;
2328        Ok(())
2329    }
2330
2331    pub fn get_prompt_snapshot(
2332        &self,
2333        fingerprint: &str,
2334    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2335        self.conn
2336            .query_row(
2337                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2338                 FROM prompt_snapshots WHERE fingerprint = ?1",
2339                params![fingerprint],
2340                |r| {
2341                    Ok(crate::prompt::PromptSnapshot {
2342                        fingerprint: r.get(0)?,
2343                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2344                        files_json: r.get(2)?,
2345                        total_bytes: r.get::<_, i64>(3)? as u64,
2346                    })
2347                },
2348            )
2349            .optional()
2350            .map_err(Into::into)
2351    }
2352
2353    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2354        let mut stmt = self.conn.prepare(
2355            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2356             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2357        )?;
2358        let rows = stmt.query_map([], |r| {
2359            Ok(crate::prompt::PromptSnapshot {
2360                fingerprint: r.get(0)?,
2361                captured_at_ms: r.get::<_, i64>(1)? as u64,
2362                files_json: r.get(2)?,
2363                total_bytes: r.get::<_, i64>(3)? as u64,
2364            })
2365        })?;
2366        Ok(rows.filter_map(|r| r.ok()).collect())
2367    }
2368
2369    /// Sessions with a non-null prompt_fingerprint in the given window.
2370    pub fn sessions_with_prompt_fingerprint(
2371        &self,
2372        workspace: &str,
2373        start_ms: u64,
2374        end_ms: u64,
2375    ) -> Result<Vec<(String, String)>> {
2376        let mut stmt = self.conn.prepare(
2377            "SELECT id, prompt_fingerprint FROM sessions
2378             WHERE workspace = ?1
2379               AND started_at_ms >= ?2 AND started_at_ms < ?3
2380               AND prompt_fingerprint IS NOT NULL",
2381        )?;
2382        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2383            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2384        })?;
2385        Ok(rows.filter_map(|r| r.ok()).collect())
2386    }
2387}
2388
/// Current wall-clock time as milliseconds since the Unix epoch (0 if the
/// system clock reads earlier than the epoch).
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
2395
2396fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2397    raw.and_then(|s| s.trim().parse::<u64>().ok())
2398        .unwrap_or(DEFAULT_MMAP_MB)
2399        .saturating_mul(1024)
2400        .saturating_mul(1024)
2401        .min(i64::MAX as u64) as i64
2402}
2403
2404fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2405    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2406    conn.execute_batch(&format!(
2407        "
2408        PRAGMA journal_mode=WAL;
2409        PRAGMA busy_timeout=5000;
2410        PRAGMA synchronous=NORMAL;
2411        PRAGMA cache_size=-65536;
2412        PRAGMA mmap_size={mmap_size};
2413        PRAGMA temp_store=MEMORY;
2414        PRAGMA wal_autocheckpoint=1000;
2415        "
2416    ))?;
2417    if mode == StoreOpenMode::ReadOnlyQuery {
2418        conn.execute_batch("PRAGMA query_only=ON;")?;
2419    }
2420    Ok(())
2421}
2422
2423fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2424    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2425}
2426
/// Decode one row of `SESSION_SELECT` (21 columns, fixed order) into a
/// `SessionRecord`. Any query using this mapper must select exactly these
/// columns in exactly this order.
fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
    let status_str: String = row.get(6)?;
    Ok(SessionRecord {
        id: row.get(0)?,
        agent: row.get(1)?,
        model: row.get(2)?,
        workspace: row.get(3)?,
        started_at_ms: row.get::<_, i64>(4)? as u64,
        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
        // Unknown status strings fall back to Done (see `status_from_str`).
        status: status_from_str(&status_str),
        trace_path: row.get(7)?,
        start_commit: row.get(8)?,
        end_commit: row.get(9)?,
        branch: row.get(10)?,
        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
        // Stored as NOT NULL DEFAULT '' in the schema; '' means "unset".
        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
        prompt_fingerprint: row.get(14)?,
        parent_session_id: row.get(15)?,
        agent_version: row.get(16)?,
        os: row.get(17)?,
        arch: row.get(18)?,
        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
    })
}
2453
/// Decode one 22-column events row into an `Event`. Column order is fixed;
/// numeric columns are stored as SQLite INTEGER and narrowed on read.
fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
    let payload_str: String = row.get(12)?;
    Ok(Event {
        session_id: row.get(0)?,
        seq: row.get::<_, i64>(1)? as u64,
        ts_ms: row.get::<_, i64>(2)? as u64,
        // ts_exact is stored as an INTEGER flag (0/1).
        ts_exact: row.get::<_, i64>(3)? != 0,
        kind: kind_from_str(&row.get::<_, String>(4)?),
        source: source_from_str(&row.get::<_, String>(5)?),
        tool: row.get(6)?,
        tool_call_id: row.get(7)?,
        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
        cost_usd_e6: row.get(11)?,
        // Malformed payload JSON degrades to Null rather than failing the row.
        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
        stop_reason: row.get(13)?,
        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
    })
}
2481
2482fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2483    let mut clauses = vec!["workspace = ?".to_string()];
2484    let mut args = vec![Value::Text(workspace.to_string())];
2485    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2486        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2487        args.push(Value::Text(format!("{}%", escape_like(prefix))));
2488    }
2489    if let Some(status) = &filter.status {
2490        clauses.push("status = ?".to_string());
2491        args.push(Value::Text(format!("{status:?}")));
2492    }
2493    if let Some(since_ms) = filter.since_ms {
2494        clauses.push("started_at_ms >= ?".to_string());
2495        args.push(Value::Integer(since_ms as i64));
2496    }
2497    (format!("WHERE {}", clauses.join(" AND ")), args)
2498}
2499
/// Lowercase `raw` and backslash-escape the SQL LIKE metacharacters
/// (`\`, `%`, `_`) so the value can be used with `LIKE ... ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut out = String::with_capacity(lowered.len());
    for c in lowered.chars() {
        if matches!(c, '\\' | '%' | '_') {
            out.push('\\');
        }
        out.push(c);
    }
    out
}
2506
2507fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2508    let cost: i64 = conn.query_row(
2509        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2510        params![workspace], |r| r.get(0),
2511    )?;
2512    let with_cost: i64 = conn.query_row(
2513        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2514        params![workspace], |r| r.get(0),
2515    )?;
2516    Ok((cost, with_cost as u64))
2517}
2518
2519fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2520    let build_raw: Option<i64> = r.get(4)?;
2521    let ci_raw: Option<i64> = r.get(8)?;
2522    Ok(SessionOutcomeRow {
2523        session_id: r.get(0)?,
2524        test_passed: r.get(1)?,
2525        test_failed: r.get(2)?,
2526        test_skipped: r.get(3)?,
2527        build_ok: build_raw.map(|v| v != 0),
2528        lint_errors: r.get(5)?,
2529        revert_lines_14d: r.get(6)?,
2530        pr_open: r.get(7)?,
2531        ci_ok: ci_raw.map(|v| v != 0),
2532        measured_at_ms: r.get::<_, i64>(9)? as u64,
2533        measure_error: r.get(10)?,
2534    })
2535}
2536
2537fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2538    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2539    let score = r
2540        .get::<_, Option<i64>>(2)?
2541        .and_then(|v| FeedbackScore::new(v as u8));
2542    let label = r
2543        .get::<_, Option<String>>(3)?
2544        .and_then(|s| FeedbackLabel::from_str_opt(&s));
2545    Ok(FeedbackRecord {
2546        id: r.get(0)?,
2547        session_id: r.get(1)?,
2548        score,
2549        label,
2550        note: r.get(4)?,
2551        created_at_ms: r.get::<_, i64>(5)? as u64,
2552    })
2553}
2554
/// Weekday abbreviation for a days-since-epoch index.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Unix day 0 (1970-01-01) was a Thursday, hence the +4 offset.
    NAMES[((day_idx + 4) % 7) as usize]
}
2558
2559fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2560    let week_ago = now.saturating_sub(7 * 86_400_000);
2561    let mut stmt = conn
2562        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2563    let days: Vec<u64> = stmt
2564        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2565        .filter_map(|r| r.ok())
2566        .map(|v| v as u64 / 86_400_000)
2567        .collect();
2568    let today = now / 86_400_000;
2569    Ok((0u64..7)
2570        .map(|i| {
2571            let d = today.saturating_sub(6 - i);
2572            (
2573                day_label(d).to_string(),
2574                days.iter().filter(|&&x| x == d).count() as u64,
2575            )
2576        })
2577        .collect())
2578}
2579
2580fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2581    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2582               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2583               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2584               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2585               COUNT(e.id) FROM sessions s \
2586               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2587               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2588    let mut stmt = conn.prepare(sql)?;
2589    let out: Vec<(SessionRecord, u64)> = stmt
2590        .query_map(params![workspace], |r| {
2591            let st: String = r.get(6)?;
2592            Ok((
2593                SessionRecord {
2594                    id: r.get(0)?,
2595                    agent: r.get(1)?,
2596                    model: r.get(2)?,
2597                    workspace: r.get(3)?,
2598                    started_at_ms: r.get::<_, i64>(4)? as u64,
2599                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2600                    status: status_from_str(&st),
2601                    trace_path: r.get(7)?,
2602                    start_commit: r.get(8)?,
2603                    end_commit: r.get(9)?,
2604                    branch: r.get(10)?,
2605                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2606                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2607                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2608                    prompt_fingerprint: r.get(14)?,
2609                    parent_session_id: r.get(15)?,
2610                    agent_version: r.get(16)?,
2611                    os: r.get(17)?,
2612                    arch: r.get(18)?,
2613                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2614                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2615                },
2616                r.get::<_, i64>(21)? as u64,
2617            ))
2618        })?
2619        .filter_map(|r| r.ok())
2620        .collect();
2621    Ok(out)
2622}
2623
2624fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2625    let mut stmt = conn.prepare(
2626        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2627         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2628    )?;
2629    let out: Vec<(String, u64)> = stmt
2630        .query_map(params![workspace], |r| {
2631            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2632        })?
2633        .filter_map(|r| r.ok())
2634        .collect();
2635    Ok(out)
2636}
2637
/// Parse a stored session status string (the enum's Debug representation).
/// Unrecognized values fall back to `Done`.
fn status_from_str(s: &str) -> SessionStatus {
    match s {
        "Running" => SessionStatus::Running,
        "Waiting" => SessionStatus::Waiting,
        "Idle" => SessionStatus::Idle,
        _ => SessionStatus::Done,
    }
}
2646
/// Parse a stored event-kind string. Unrecognized values fall back to `Hook`.
fn kind_from_str(s: &str) -> EventKind {
    match s {
        "ToolCall" => EventKind::ToolCall,
        "ToolResult" => EventKind::ToolResult,
        "Message" => EventKind::Message,
        "Error" => EventKind::Error,
        "Cost" => EventKind::Cost,
        "Hook" => EventKind::Hook,
        "Lifecycle" => EventKind::Lifecycle,
        _ => EventKind::Hook,
    }
}
2659
/// Parse a stored event-source string. Unrecognized values fall back to
/// `Proxy`.
fn source_from_str(s: &str) -> EventSource {
    match s {
        "Tail" => EventSource::Tail,
        "Hook" => EventSource::Hook,
        _ => EventSource::Proxy,
    }
}
2667
/// Additive schema migration: backfill columns added after the base
/// `MIGRATIONS` DDL, plus the tool-span indexes. Every call is idempotent
/// (`ensure_column` only ALTERs when the column is missing), so this runs
/// safely on both fresh and old databases.
fn ensure_schema_columns(conn: &Connection) -> Result<()> {
    // Git/repo binding metadata on sessions.
    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
    ensure_column(conn, "sessions", "branch", "TEXT")?;
    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
    ensure_column(
        conn,
        "sessions",
        "repo_binding_source",
        "TEXT NOT NULL DEFAULT ''",
    )?;
    // Event timing/token/cost telemetry columns.
    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
    ensure_column(conn, "events", "stop_reason", "TEXT")?;
    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
    ensure_column(conn, "events", "retry_count", "INTEGER")?;
    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
    ensure_column(
        conn,
        "sync_outbox",
        "kind",
        "TEXT NOT NULL DEFAULT 'events'",
    )?;
    ensure_column(
        conn,
        "experiments",
        "state",
        "TEXT NOT NULL DEFAULT 'Draft'",
    )?;
    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
    // Session provenance / environment columns.
    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
    ensure_column(conn, "sessions", "os", "TEXT")?;
    ensure_column(conn, "sessions", "arch", "TEXT")?;
    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
    // Tool-span tree columns and their lookup indexes.
    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
    conn.execute_batch(
        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
    )?;
    Ok(())
}
2722
2723fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2724    if has_column(conn, table, column)? {
2725        return Ok(());
2726    }
2727    conn.execute(
2728        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2729        [],
2730    )?;
2731    Ok(())
2732}
2733
2734fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2735    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2736    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2737    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2738}
2739
/// Encode a bool for SQLite storage (true → 1, false → 0).
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2743
/// Decode a SQLite integer flag: any non-zero value is true.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
2747
/// Map the schema's `''` sentinel to `None`, keeping non-empty strings owned.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|v| !v.is_empty())
}
2751
2752#[cfg(test)]
2753mod tests {
2754    use super::*;
2755    use serde_json::json;
2756    use std::collections::HashSet;
2757    use tempfile::TempDir;
2758
    // Minimal completed-session fixture in workspace "/ws"; only `id` varies
    // between tests, all optional fields are None.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }
2784
    // Minimal ToolCall event fixture; timestamp and call id derive from `seq`
    // so consecutive events are distinct and ordered.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }
2811
    // A freshly opened store must be in WAL journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }
2822
    // Opening a store applies the phase-0 PRAGMA set from `apply_pragmas`:
    // synchronous=NORMAL (1), cache_size=-65536, temp_store=MEMORY (2),
    // wal_autocheckpoint=1000; also sanity-checks the MB→bytes conversion.
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }
2849
    // Read-only opens must set PRAGMA query_only so writes are rejected.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }
2862
    // The phase-0 performance indexes must all exist after schema setup.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }
2885
    // Session round trip: upsert then fetch by id preserves key fields.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }
2897
2898    #[test]
2899    fn append_and_list_events_round_trip() {
2900        let dir = TempDir::new().unwrap();
2901        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2902        let s = make_session("s2");
2903        store.upsert_session(&s).unwrap();
2904        store.append_event(&make_event("s2", 0)).unwrap();
2905        store.append_event(&make_event("s2", 1)).unwrap();
2906
2907        let sessions = store.list_sessions("/ws").unwrap();
2908        assert_eq!(sessions.len(), 1);
2909        assert_eq!(sessions[0].id, "s2");
2910    }
2911
    // Paging: rows come back newest-first (ties broken by id), `total` counts
    // all matches, and `next_offset` points at the next page.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }
2942
    // All three SessionFilter predicates applied together (case-insensitive
    // agent prefix, status, since-timestamp) select exactly one session.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }
2972
    // Incremental sync helpers: `list_sessions_started_after` only returns
    // rows past the cutoff, and `session_statuses` reflects status updates.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }
2996
    // Summary stats on an empty workspace are all zero.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }
3005
    // Summary stats count sessions and group them by agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }
3018
    // Appended events come back from `list_events_for_session` in seq order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }
3031
3032    #[test]
3033    fn list_events_page_uses_inclusive_seq_cursor() {
3034        let dir = TempDir::new().unwrap();
3035        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3036        store.upsert_session(&make_session("paged")).unwrap();
3037        for seq in 0..5 {
3038            store.append_event(&make_event("paged", seq)).unwrap();
3039        }
3040        let first = store.list_events_page("paged", 0, 2).unwrap();
3041        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3042        let second = store
3043            .list_events_page("paged", first[1].seq + 1, 2)
3044            .unwrap();
3045        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3046    }
3047
3048    #[test]
3049    fn append_event_dedup() {
3050        let dir = TempDir::new().unwrap();
3051        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3052        store.upsert_session(&make_session("s5")).unwrap();
3053        store.append_event(&make_event("s5", 0)).unwrap();
3054        // Duplicate — should be silently ignored
3055        store.append_event(&make_event("s5", 0)).unwrap();
3056        let events = store.list_events_for_session("s5").unwrap();
3057        assert_eq!(events.len(), 1);
3058    }
3059
3060    #[test]
3061    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3062        let dir = TempDir::new().unwrap();
3063        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3064        assert!(store.session_span_tree("missing").unwrap().is_empty());
3065        assert!(store.span_tree_cache.borrow().is_some());
3066
3067        store.upsert_session(&make_session("tree")).unwrap();
3068        store.append_event(&make_event("tree", 0)).unwrap();
3069        assert!(store.span_tree_cache.borrow().is_none());
3070        let first = store.session_span_tree("tree").unwrap();
3071        assert_eq!(first.len(), 1);
3072        assert!(store.span_tree_cache.borrow().is_some());
3073        store.append_event(&make_event("tree", 1)).unwrap();
3074        assert!(store.span_tree_cache.borrow().is_none());
3075    }
3076
3077    #[test]
3078    fn tool_spans_in_window_uses_started_then_ended_fallback() {
3079        let dir = TempDir::new().unwrap();
3080        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3081        store.upsert_session(&make_session("spans")).unwrap();
3082        for (id, started, ended) in [
3083            ("started", Some(200_i64), None),
3084            ("fallback", None, Some(250_i64)),
3085            ("outside", Some(400_i64), None),
3086            ("too_old", None, Some(50_i64)),
3087            ("started_wins", Some(500_i64), Some(200_i64)),
3088        ] {
3089            store
3090                .conn
3091                .execute(
3092                    "INSERT INTO tool_spans
3093                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3094                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3095                    params![id, started, ended],
3096                )
3097                .unwrap();
3098        }
3099        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3100        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3101        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3102    }
3103
3104    #[test]
3105    fn upsert_idempotent() {
3106        let dir = TempDir::new().unwrap();
3107        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3108        let mut s = make_session("s3");
3109        store.upsert_session(&s).unwrap();
3110        s.status = SessionStatus::Running;
3111        store.upsert_session(&s).unwrap();
3112
3113        let got = store.get_session("s3").unwrap().unwrap();
3114        assert_eq!(got.status, SessionStatus::Running);
3115    }
3116
3117    #[test]
3118    fn append_event_indexes_path_from_payload() {
3119        let dir = TempDir::new().unwrap();
3120        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3121        store.upsert_session(&make_session("sx")).unwrap();
3122        let mut ev = make_event("sx", 0);
3123        ev.payload = json!({"input": {"path": "src/lib.rs"}});
3124        store.append_event(&ev).unwrap();
3125        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
3126        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
3127    }
3128
3129    #[test]
3130    fn update_session_status_changes_status() {
3131        let dir = TempDir::new().unwrap();
3132        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3133        store.upsert_session(&make_session("s6")).unwrap();
3134        store
3135            .update_session_status("s6", SessionStatus::Running)
3136            .unwrap();
3137        let got = store.get_session("s6").unwrap().unwrap();
3138        assert_eq!(got.status, SessionStatus::Running);
3139    }
3140
3141    #[test]
3142    fn prune_sessions_removes_old_rows_and_keeps_recent() {
3143        let dir = TempDir::new().unwrap();
3144        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3145        let mut old = make_session("old");
3146        old.started_at_ms = 1_000;
3147        let mut new = make_session("new");
3148        new.started_at_ms = 9_000_000_000_000;
3149        store.upsert_session(&old).unwrap();
3150        store.upsert_session(&new).unwrap();
3151        store.append_event(&make_event("old", 0)).unwrap();
3152
3153        let stats = store.prune_sessions_started_before(5_000).unwrap();
3154        assert_eq!(
3155            stats,
3156            PruneStats {
3157                sessions_removed: 1,
3158                events_removed: 1,
3159            }
3160        );
3161        assert!(store.get_session("old").unwrap().is_none());
3162        assert!(store.get_session("new").unwrap().is_some());
3163        let sessions = store.list_sessions("/ws").unwrap();
3164        assert_eq!(sessions.len(), 1);
3165        assert_eq!(sessions[0].id, "new");
3166    }
3167
3168    #[test]
3169    fn append_event_indexes_rules_from_payload() {
3170        let dir = TempDir::new().unwrap();
3171        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3172        store.upsert_session(&make_session("sr")).unwrap();
3173        let mut ev = make_event("sr", 0);
3174        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
3175        store.append_event(&ev).unwrap();
3176        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
3177        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
3178    }
3179
3180    #[test]
3181    fn guidance_report_counts_skill_and_rule_sessions() {
3182        let dir = TempDir::new().unwrap();
3183        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3184        store.upsert_session(&make_session("sx")).unwrap();
3185        let mut ev = make_event("sx", 0);
3186        ev.payload =
3187            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
3188        ev.cost_usd_e6 = Some(500_000);
3189        store.append_event(&ev).unwrap();
3190
3191        let mut skill_slugs = HashSet::new();
3192        skill_slugs.insert("tdd".into());
3193        let mut rule_slugs = HashSet::new();
3194        rule_slugs.insert("style".into());
3195
3196        let rep = store
3197            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
3198            .unwrap();
3199        assert_eq!(rep.sessions_in_window, 1);
3200        let tdd = rep
3201            .rows
3202            .iter()
3203            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
3204            .unwrap();
3205        assert_eq!(tdd.sessions, 1);
3206        assert!(tdd.on_disk);
3207        let style = rep
3208            .rows
3209            .iter()
3210            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
3211            .unwrap();
3212        assert_eq!(style.sessions, 1);
3213        assert!(style.on_disk);
3214    }
3215
3216    #[test]
3217    fn prune_sessions_removes_rules_used_rows() {
3218        let dir = TempDir::new().unwrap();
3219        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3220        let mut old = make_session("old_r");
3221        old.started_at_ms = 1_000;
3222        store.upsert_session(&old).unwrap();
3223        let mut ev = make_event("old_r", 0);
3224        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
3225        store.append_event(&ev).unwrap();
3226
3227        store.prune_sessions_started_before(5_000).unwrap();
3228        let n: i64 = store
3229            .conn
3230            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
3231            .unwrap();
3232        assert_eq!(n, 0);
3233    }
3234}