Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Threshold on `ts_ms` separating synthetic, transcript-only timings (seq-based
/// fallbacks) from real wall-clock timestamps.
///
/// `10^12` ms is about September 2001, so genuine capture times lie above it.
/// Rows below this use `sessions.started_at_ms` for time-window matching.
const SYNTHETIC_TS_CEILING_MS: i64 = 10_i64.pow(12);
21
/// Ordered schema statements replayed by `Store::open` on *every* open.
///
/// Each entry is passed to `Connection::execute_batch`, so a single entry may hold
/// several `;`-separated statements (see the `session_evals` entry). Because the list
/// runs on every open, every statement must be idempotent (`IF NOT EXISTS` /
/// `INSERT OR IGNORE`). Only append new entries at the end: later entries (indexes)
/// depend on tables created by earlier ones.
///
/// NOTE(review): several tables here are narrower than what the code writes:
/// - `events` lacks `ts_exact`, `tool_call_id` and `reasoning_tokens`;
/// - `sync_outbox` lacks `kind`;
/// - `sessions` lacks the repo-binding and `prompt_fingerprint` columns.
///
/// Presumably `ensure_schema_columns` (run by `Store::open` after this list) adds
/// them. Do not reference such a column from a new entry in this list without
/// confirming that ordering.
const MIGRATIONS: &[&str] = &[
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Conflict target of the event upsert in `Store::append_event_with_sync`
    // (`ON CONFLICT(session_id, seq)`).
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    // At most one row per (session, path) and per (session, skill).
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single state row; a no-op on every later open.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement entry (table + two indexes), run as one `execute_batch`.
    // NOTE(review): `ON DELETE CASCADE` is only enforced when `PRAGMA foreign_keys=ON`.
    // `Store::open` sets only `journal_mode` and `busy_timeout`, so unless SQLite was
    // built with foreign keys enabled by default, deleting a session does not by itself
    // remove its evals.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
];
222
/// Per-workspace activity dashboard stats.
///
/// Plain data carrier; it is populated outside this chunk of the file.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions in the workspace.
    pub total_sessions: u64,
    /// Sessions currently in the running state — TODO confirm the exact status filter.
    pub running_sessions: u64,
    /// Number of events across the workspace's sessions.
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    /// Summed `cost_usd_e6` (presumably USD × 10^6, per the `_e6` suffix).
    pub total_cost_usd_e6: i64,
    /// Number of sessions that have a recorded cost — confirm against the producer.
    pub sessions_with_cost: u64,
}
233    /// Top tools by event count, max 5
234    pub top_tools: Vec<(String, u64)>,
235    pub total_cost_usd_e6: i64,
236    pub sessions_with_cost: u64,
237}
238
239/// Sync daemon / outbox status for `kaizen sync status`.
240pub struct SyncStatusSnapshot {
241    pub pending_outbox: u64,
242    pub last_success_ms: Option<u64>,
243    pub last_error: Option<String>,
244    pub consecutive_failures: u32,
245}
246
247/// Aggregate stats across sessions + events for a workspace.
248#[derive(serde::Serialize)]
249pub struct SummaryStats {
250    pub session_count: u64,
251    pub total_cost_usd_e6: i64,
252    pub by_agent: Vec<(String, u64)>,
253    pub by_model: Vec<(String, u64)>,
254    pub top_tools: Vec<(String, u64)>,
255}
256
257/// Skill vs Cursor rule for [`GuidancePerfRow`].
258#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
259#[serde(rename_all = "lowercase")]
260pub enum GuidanceKind {
261    Skill,
262    Rule,
263}
264
265/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
266#[derive(Clone, Debug, serde::Serialize)]
267pub struct GuidancePerfRow {
268    pub kind: GuidanceKind,
269    pub id: String,
270    pub sessions: u64,
271    pub sessions_pct: f64,
272    pub total_cost_usd_e6: i64,
273    pub avg_cost_per_session_usd: Option<f64>,
274    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
275    pub on_disk: bool,
276}
277
278/// Aggregated skill/rule adoption and cost proxy for a time window.
279#[derive(Clone, Debug, serde::Serialize)]
280pub struct GuidanceReport {
281    pub workspace: String,
282    pub window_start_ms: u64,
283    pub window_end_ms: u64,
284    pub sessions_in_window: u64,
285    pub workspace_avg_cost_per_session_usd: Option<f64>,
286    pub rows: Vec<GuidancePerfRow>,
287}
288
289/// Result of [`Store::prune_sessions_started_before`].
290#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
291pub struct PruneStats {
292    pub sessions_removed: u64,
293    pub events_removed: u64,
294}
295
296/// `sync_state` keys for agent rescan throttling and auto-prune.
297pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
298pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
299
300pub struct ToolSpanSyncRow {
301    pub span_id: String,
302    pub session_id: String,
303    pub tool: Option<String>,
304    pub tool_call_id: Option<String>,
305    pub status: String,
306    pub started_at_ms: Option<u64>,
307    pub ended_at_ms: Option<u64>,
308    pub lead_time_ms: Option<u64>,
309    pub tokens_in: Option<u32>,
310    pub tokens_out: Option<u32>,
311    pub reasoning_tokens: Option<u32>,
312    pub cost_usd_e6: Option<i64>,
313    pub paths: Vec<String>,
314}
315
316pub struct Store {
317    conn: Connection,
318}
319
320impl Store {
321    pub(crate) fn conn(&self) -> &Connection {
322        &self.conn
323    }
324
325    pub fn open(path: &Path) -> Result<Self> {
326        if let Some(parent) = path.parent() {
327            std::fs::create_dir_all(parent)?;
328        }
329        let conn =
330            Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
331        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
332        for sql in MIGRATIONS {
333            conn.execute_batch(sql)?;
334        }
335        ensure_schema_columns(&conn)?;
336        Ok(Self { conn })
337    }
338
339    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
340        self.conn.execute(
341            "INSERT INTO sessions (
342                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
343                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
344                prompt_fingerprint
345             )
346             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
347             ON CONFLICT(id) DO UPDATE SET
348               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
349               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
350               status=excluded.status, trace_path=excluded.trace_path,
351               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
352               branch=excluded.branch, dirty_start=excluded.dirty_start,
353               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
354               prompt_fingerprint=excluded.prompt_fingerprint",
355            params![
356                s.id,
357                s.agent,
358                s.model,
359                s.workspace,
360                s.started_at_ms as i64,
361                s.ended_at_ms.map(|v| v as i64),
362                format!("{:?}", s.status),
363                s.trace_path,
364                s.start_commit,
365                s.end_commit,
366                s.branch,
367                s.dirty_start.map(bool_to_i64),
368                s.dirty_end.map(bool_to_i64),
369                s.repo_binding_source.clone().unwrap_or_default(),
370                s.prompt_fingerprint.as_deref(),
371            ],
372        )?;
373        self.conn.execute(
374            "INSERT INTO session_repo_binding (
375                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
376             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
377             ON CONFLICT(session_id) DO UPDATE SET
378                start_commit=excluded.start_commit,
379                end_commit=excluded.end_commit,
380                branch=excluded.branch,
381                dirty_start=excluded.dirty_start,
382                dirty_end=excluded.dirty_end,
383                repo_binding_source=excluded.repo_binding_source",
384            params![
385                s.id,
386                s.start_commit,
387                s.end_commit,
388                s.branch,
389                s.dirty_start.map(bool_to_i64),
390                s.dirty_end.map(bool_to_i64),
391                s.repo_binding_source.clone().unwrap_or_default(),
392            ],
393        )?;
394        Ok(())
395    }
396
397    /// Insert a minimal session row if none exists. Used by hook ingestion when
398    /// the first observed event is not `SessionStart` (hooks installed mid-session).
399    pub fn ensure_session_stub(
400        &self,
401        id: &str,
402        agent: &str,
403        workspace: &str,
404        started_at_ms: u64,
405    ) -> Result<()> {
406        self.conn.execute(
407            "INSERT OR IGNORE INTO sessions (
408                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
409                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
410             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
411            params![id, agent, workspace, started_at_ms as i64],
412        )?;
413        Ok(())
414    }
415
416    /// Next `seq` for a new event in this session (0 when there are no events yet).
417    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
418        let n: i64 = self.conn.query_row(
419            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
420            [session_id],
421            |r| r.get(0),
422        )?;
423        Ok(n as u64)
424    }
425
426    pub fn append_event(&self, e: &Event) -> Result<()> {
427        self.append_event_with_sync(e, None)
428    }
429
430    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
431    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
432        let payload = serde_json::to_string(&e.payload)?;
433        self.conn.execute(
434            "INSERT INTO events (
435                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
436                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
437             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
438             ON CONFLICT(session_id, seq) DO UPDATE SET
439                ts_ms = excluded.ts_ms,
440                ts_exact = excluded.ts_exact,
441                kind = excluded.kind,
442                source = excluded.source,
443                tool = excluded.tool,
444                tool_call_id = excluded.tool_call_id,
445                tokens_in = excluded.tokens_in,
446                tokens_out = excluded.tokens_out,
447                reasoning_tokens = excluded.reasoning_tokens,
448                cost_usd_e6 = excluded.cost_usd_e6,
449                payload = excluded.payload",
450            params![
451                e.session_id,
452                e.seq as i64,
453                e.ts_ms as i64,
454                bool_to_i64(e.ts_exact),
455                format!("{:?}", e.kind),
456                format!("{:?}", e.source),
457                e.tool,
458                e.tool_call_id,
459                e.tokens_in.map(|v| v as i64),
460                e.tokens_out.map(|v| v as i64),
461                e.reasoning_tokens.map(|v| v as i64),
462                e.cost_usd_e6,
463                payload,
464            ],
465        )?;
466        if self.conn.changes() == 0 {
467            return Ok(());
468        }
469        index_event_derived(&self.conn, e)?;
470        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
471        let Some(ctx) = ctx else {
472            return Ok(());
473        };
474        let sync = &ctx.sync;
475        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
476            return Ok(());
477        }
478        let Some(salt) = try_team_salt(sync) else {
479            tracing::warn!(
480                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
481            );
482            return Ok(());
483        };
484        if sync.sample_rate < 1.0 {
485            let u: f64 = rand::random();
486            if u > sync.sample_rate {
487                return Ok(());
488            }
489        }
490        let Some(session) = self.get_session(&e.session_id)? else {
491            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
492            return Ok(());
493        };
494        let mut outbound = outbound_event_from_row(e, &session, &salt);
495        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
496        let row = serde_json::to_string(&outbound)?;
497        self.conn.execute(
498            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
499            params![e.session_id, row],
500        )?;
501        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
502        Ok(())
503    }
504
505    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
506        let mut stmt = self.conn.prepare(
507            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
508        )?;
509        let rows = stmt.query_map(params![limit as i64], |row| {
510            Ok((
511                row.get::<_, i64>(0)?,
512                row.get::<_, String>(1)?,
513                row.get::<_, String>(2)?,
514            ))
515        })?;
516        let mut out = Vec::new();
517        for r in rows {
518            out.push(r?);
519        }
520        Ok(out)
521    }
522
523    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
524        for id in ids {
525            self.conn
526                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
527        }
528        Ok(())
529    }
530
531    pub fn replace_outbox_rows(
532        &self,
533        owner_id: &str,
534        kind: &str,
535        payloads: &[String],
536    ) -> Result<()> {
537        self.conn.execute(
538            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
539            params![owner_id, kind],
540        )?;
541        for payload in payloads {
542            self.conn.execute(
543                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
544                params![owner_id, kind, payload],
545            )?;
546        }
547        Ok(())
548    }
549
550    pub fn outbox_pending_count(&self) -> Result<u64> {
551        let c: i64 =
552            self.conn
553                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
554                    r.get(0)
555                })?;
556        Ok(c as u64)
557    }
558
559    pub fn set_sync_state_ok(&self) -> Result<()> {
560        let now = now_ms().to_string();
561        self.conn.execute(
562            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
563             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
564            params![now],
565        )?;
566        self.conn.execute(
567            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
568             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
569            [],
570        )?;
571        self.conn
572            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
573        Ok(())
574    }
575
576    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
577        let prev: i64 = self
578            .conn
579            .query_row(
580                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
581                [],
582                |r| {
583                    let s: String = r.get(0)?;
584                    Ok(s.parse::<i64>().unwrap_or(0))
585                },
586            )
587            .optional()?
588            .unwrap_or(0);
589        let next = prev.saturating_add(1);
590        self.conn.execute(
591            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
592             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
593            params![msg],
594        )?;
595        self.conn.execute(
596            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
597             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
598            params![next.to_string()],
599        )?;
600        Ok(())
601    }
602
603    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
604        let pending_outbox = self.outbox_pending_count()?;
605        let last_success_ms = self
606            .conn
607            .query_row(
608                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
609                [],
610                |r| r.get::<_, String>(0),
611            )
612            .optional()?
613            .and_then(|s| s.parse().ok());
614        let last_error = self
615            .conn
616            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
617                r.get::<_, String>(0)
618            })
619            .optional()?;
620        let consecutive_failures = self
621            .conn
622            .query_row(
623                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
624                [],
625                |r| r.get::<_, String>(0),
626            )
627            .optional()?
628            .and_then(|s| s.parse().ok())
629            .unwrap_or(0);
630        Ok(SyncStatusSnapshot {
631            pending_outbox,
632            last_success_ms,
633            last_error,
634            consecutive_failures,
635        })
636    }
637
638    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
639        let row: Option<String> = self
640            .conn
641            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
642                r.get::<_, String>(0)
643            })
644            .optional()?;
645        Ok(row.and_then(|s| s.parse().ok()))
646    }
647
648    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
649        self.conn.execute(
650            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
651             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
652            params![key, v.to_string()],
653        )?;
654        Ok(())
655    }
656
657    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
658    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
659        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
660        let sessions_to_remove: i64 = tx.query_row(
661            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
662            params![cutoff_ms],
663            |r| r.get(0),
664        )?;
665        let events_to_remove: i64 = tx.query_row(
666            "SELECT COUNT(*) FROM events WHERE session_id IN \
667             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
668            params![cutoff_ms],
669            |r| r.get(0),
670        )?;
671
672        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
673        tx.execute(
674            &format!(
675                "DELETE FROM tool_span_paths WHERE span_id IN \
676                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
677            ),
678            params![cutoff_ms],
679        )?;
680        tx.execute(
681            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
682            params![cutoff_ms],
683        )?;
684        tx.execute(
685            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
686            params![cutoff_ms],
687        )?;
688        tx.execute(
689            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
690            params![cutoff_ms],
691        )?;
692        tx.execute(
693            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
694            params![cutoff_ms],
695        )?;
696        tx.execute(
697            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
698            params![cutoff_ms],
699        )?;
700        tx.execute(
701            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
702            params![cutoff_ms],
703        )?;
704        tx.execute(
705            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
706            params![cutoff_ms],
707        )?;
708        tx.execute(
709            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
710            params![cutoff_ms],
711        )?;
712        tx.execute(
713            "DELETE FROM sessions WHERE started_at_ms < ?1",
714            params![cutoff_ms],
715        )?;
716        tx.commit()?;
717        Ok(PruneStats {
718            sessions_removed: sessions_to_remove as u64,
719            events_removed: events_to_remove as u64,
720        })
721    }
722
723    /// Reclaim file space after large deletes (exclusive lock; can be slow).
724    pub fn vacuum(&self) -> Result<()> {
725        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
726        Ok(())
727    }
728
729    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
730        let mut stmt = self.conn.prepare(
731            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
732                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
733                    prompt_fingerprint
734             FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
735        )?;
736        let rows = stmt.query_map(params![workspace], |row| {
737            Ok((
738                row.get::<_, String>(0)?,
739                row.get::<_, String>(1)?,
740                row.get::<_, Option<String>>(2)?,
741                row.get::<_, String>(3)?,
742                row.get::<_, i64>(4)?,
743                row.get::<_, Option<i64>>(5)?,
744                row.get::<_, String>(6)?,
745                row.get::<_, String>(7)?,
746                row.get::<_, Option<String>>(8)?,
747                row.get::<_, Option<String>>(9)?,
748                row.get::<_, Option<String>>(10)?,
749                row.get::<_, Option<i64>>(11)?,
750                row.get::<_, Option<i64>>(12)?,
751                row.get::<_, String>(13)?,
752                row.get::<_, Option<String>>(14)?,
753            ))
754        })?;
755
756        let mut out = Vec::new();
757        for row in rows {
758            let (
759                id,
760                agent,
761                model,
762                workspace,
763                started,
764                ended,
765                status_str,
766                trace,
767                start_commit,
768                end_commit,
769                branch,
770                dirty_start,
771                dirty_end,
772                source,
773                prompt_fingerprint,
774            ) = row?;
775            out.push(SessionRecord {
776                id,
777                agent,
778                model,
779                workspace,
780                started_at_ms: started as u64,
781                ended_at_ms: ended.map(|v| v as u64),
782                status: status_from_str(&status_str),
783                trace_path: trace,
784                start_commit,
785                end_commit,
786                branch,
787                dirty_start: dirty_start.map(i64_to_bool),
788                dirty_end: dirty_end.map(i64_to_bool),
789                repo_binding_source: empty_to_none(source),
790                prompt_fingerprint,
791            });
792        }
793        Ok(out)
794    }
795
796    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
797        let session_count: i64 = self.conn.query_row(
798            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
799            params![workspace],
800            |r| r.get(0),
801        )?;
802
803        let total_cost: i64 = self.conn.query_row(
804            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
805             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
806            params![workspace],
807            |r| r.get(0),
808        )?;
809
810        let mut stmt = self.conn.prepare(
811            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
812        )?;
813        let by_agent: Vec<(String, u64)> = stmt
814            .query_map(params![workspace], |r| {
815                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
816            })?
817            .filter_map(|r| r.ok())
818            .collect();
819
820        let mut stmt = self.conn.prepare(
821            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
822        )?;
823        let by_model: Vec<(String, u64)> = stmt
824            .query_map(params![workspace], |r| {
825                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
826            })?
827            .filter_map(|r| r.ok())
828            .collect();
829
830        let mut stmt = self.conn.prepare(
831            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
832             WHERE s.workspace = ?1 AND tool IS NOT NULL
833             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
834        )?;
835        let top_tools: Vec<(String, u64)> = stmt
836            .query_map(params![workspace], |r| {
837                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
838            })?
839            .filter_map(|r| r.ok())
840            .collect();
841
842        Ok(SummaryStats {
843            session_count: session_count as u64,
844            total_cost_usd_e6: total_cost,
845            by_agent,
846            by_model,
847            top_tools,
848        })
849    }
850
851    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
852        let mut stmt = self.conn.prepare(
853            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
854                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
855             FROM events WHERE session_id = ?1 ORDER BY seq ASC",
856        )?;
857        let rows = stmt.query_map(params![session_id], |row| {
858            Ok((
859                row.get::<_, String>(0)?,
860                row.get::<_, i64>(1)?,
861                row.get::<_, i64>(2)?,
862                row.get::<_, i64>(3)?,
863                row.get::<_, String>(4)?,
864                row.get::<_, String>(5)?,
865                row.get::<_, Option<String>>(6)?,
866                row.get::<_, Option<String>>(7)?,
867                row.get::<_, Option<i64>>(8)?,
868                row.get::<_, Option<i64>>(9)?,
869                row.get::<_, Option<i64>>(10)?,
870                row.get::<_, Option<i64>>(11)?,
871                row.get::<_, String>(12)?,
872            ))
873        })?;
874
875        let mut events = Vec::new();
876        for row in rows {
877            let (
878                sid,
879                seq,
880                ts_ms,
881                ts_exact,
882                kind_str,
883                source_str,
884                tool,
885                tool_call_id,
886                tokens_in,
887                tokens_out,
888                reasoning_tokens,
889                cost_usd_e6,
890                payload_str,
891            ) = row?;
892            events.push(Event {
893                session_id: sid,
894                seq: seq as u64,
895                ts_ms: ts_ms as u64,
896                ts_exact: ts_exact != 0,
897                kind: kind_from_str(&kind_str),
898                source: source_from_str(&source_str),
899                tool,
900                tool_call_id,
901                tokens_in: tokens_in.map(|v| v as u32),
902                tokens_out: tokens_out.map(|v| v as u32),
903                reasoning_tokens: reasoning_tokens.map(|v| v as u32),
904                cost_usd_e6,
905                payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
906            });
907        }
908        Ok(events)
909    }
910
911    /// Update only status for existing session.
912    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
913        self.conn.execute(
914            "UPDATE sessions SET status = ?1 WHERE id = ?2",
915            params![format!("{:?}", status), id],
916        )?;
917        Ok(())
918    }
919
920    /// Workspace activity dashboard — feeds `cmd_insights`.
921    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
922        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
923        Ok(InsightsStats {
924            total_sessions: count_q(
925                &self.conn,
926                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
927                workspace,
928            )?,
929            running_sessions: count_q(
930                &self.conn,
931                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
932                workspace,
933            )?,
934            total_events: count_q(
935                &self.conn,
936                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
937                workspace,
938            )?,
939            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
940            recent: recent_sessions_3(&self.conn, workspace)?,
941            top_tools: top_tools_5(&self.conn, workspace)?,
942            total_cost_usd_e6,
943            sessions_with_cost,
944        })
945    }
946
947    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
948    pub fn retro_events_in_window(
949        &self,
950        workspace: &str,
951        start_ms: u64,
952        end_ms: u64,
953    ) -> Result<Vec<(SessionRecord, Event)>> {
954        let mut stmt = self.conn.prepare(
955            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
956                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
957                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
958                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
959             FROM events e
960             JOIN sessions s ON s.id = e.session_id
961             WHERE s.workspace = ?1
962               AND (
963                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
964                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
965               )
966             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
967        )?;
968        let rows = stmt.query_map(
969            params![
970                workspace,
971                start_ms as i64,
972                end_ms as i64,
973                SYNTHETIC_TS_CEILING_MS,
974            ],
975            |row| {
976                let payload_str: String = row.get(12)?;
977                let status_str: String = row.get(19)?;
978                Ok((
979                    SessionRecord {
980                        id: row.get(13)?,
981                        agent: row.get(14)?,
982                        model: row.get(15)?,
983                        workspace: row.get(16)?,
984                        started_at_ms: row.get::<_, i64>(17)? as u64,
985                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
986                        status: status_from_str(&status_str),
987                        trace_path: row.get(20)?,
988                        start_commit: row.get(21)?,
989                        end_commit: row.get(22)?,
990                        branch: row.get(23)?,
991                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
992                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
993                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
994                        prompt_fingerprint: None,
995                    },
996                    Event {
997                        session_id: row.get(0)?,
998                        seq: row.get::<_, i64>(1)? as u64,
999                        ts_ms: row.get::<_, i64>(2)? as u64,
1000                        ts_exact: row.get::<_, i64>(3)? != 0,
1001                        kind: kind_from_str(&row.get::<_, String>(4)?),
1002                        source: source_from_str(&row.get::<_, String>(5)?),
1003                        tool: row.get(6)?,
1004                        tool_call_id: row.get(7)?,
1005                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1006                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1007                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1008                        cost_usd_e6: row.get(11)?,
1009                        payload: serde_json::from_str(&payload_str)
1010                            .unwrap_or(serde_json::Value::Null),
1011                    },
1012                ))
1013            },
1014        )?;
1015
1016        let mut out = Vec::new();
1017        for r in rows {
1018            out.push(r?);
1019        }
1020        Ok(out)
1021    }
1022
1023    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1024    pub fn files_touched_in_window(
1025        &self,
1026        workspace: &str,
1027        start_ms: u64,
1028        end_ms: u64,
1029    ) -> Result<Vec<(String, String)>> {
1030        let mut stmt = self.conn.prepare(
1031            "SELECT DISTINCT ft.session_id, ft.path
1032             FROM files_touched ft
1033             JOIN sessions s ON s.id = ft.session_id
1034             WHERE s.workspace = ?1
1035               AND EXISTS (
1036                 SELECT 1 FROM events e
1037                 JOIN sessions ss ON ss.id = e.session_id
1038                 WHERE e.session_id = ft.session_id
1039                   AND (
1040                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1041                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1042                   )
1043               )
1044             ORDER BY ft.session_id, ft.path",
1045        )?;
1046        let out: Vec<(String, String)> = stmt
1047            .query_map(
1048                params![
1049                    workspace,
1050                    start_ms as i64,
1051                    end_ms as i64,
1052                    SYNTHETIC_TS_CEILING_MS,
1053                ],
1054                |r| Ok((r.get(0)?, r.get(1)?)),
1055            )?
1056            .filter_map(|r| r.ok())
1057            .collect();
1058        Ok(out)
1059    }
1060
1061    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1062    /// (any session with an indexed skill row; join events optional — use row existence).
1063    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1064        let mut stmt = self.conn.prepare(
1065            "SELECT DISTINCT su.skill
1066             FROM skills_used su
1067             JOIN sessions s ON s.id = su.session_id
1068             WHERE s.workspace = ?1
1069               AND EXISTS (
1070                 SELECT 1 FROM events e
1071                 JOIN sessions ss ON ss.id = e.session_id
1072                 WHERE e.session_id = su.session_id
1073                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1074               )
1075             ORDER BY su.skill",
1076        )?;
1077        let out: Vec<String> = stmt
1078            .query_map(
1079                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1080                |r| r.get::<_, String>(0),
1081            )?
1082            .filter_map(|r| r.ok())
1083            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1084            .collect();
1085        Ok(out)
1086    }
1087
1088    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1089    pub fn skills_used_in_window(
1090        &self,
1091        workspace: &str,
1092        start_ms: u64,
1093        end_ms: u64,
1094    ) -> Result<Vec<(String, String)>> {
1095        let mut stmt = self.conn.prepare(
1096            "SELECT DISTINCT su.session_id, su.skill
1097             FROM skills_used su
1098             JOIN sessions s ON s.id = su.session_id
1099             WHERE s.workspace = ?1
1100               AND EXISTS (
1101                 SELECT 1 FROM events e
1102                 JOIN sessions ss ON ss.id = e.session_id
1103                 WHERE e.session_id = su.session_id
1104                   AND (
1105                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1106                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1107                   )
1108               )
1109             ORDER BY su.session_id, su.skill",
1110        )?;
1111        let out: Vec<(String, String)> = stmt
1112            .query_map(
1113                params![
1114                    workspace,
1115                    start_ms as i64,
1116                    end_ms as i64,
1117                    SYNTHETIC_TS_CEILING_MS,
1118                ],
1119                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1120            )?
1121            .filter_map(|r| r.ok())
1122            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1123            .collect();
1124        Ok(out)
1125    }
1126
1127    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1128    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1129        let mut stmt = self.conn.prepare(
1130            "SELECT DISTINCT ru.rule
1131             FROM rules_used ru
1132             JOIN sessions s ON s.id = ru.session_id
1133             WHERE s.workspace = ?1
1134               AND EXISTS (
1135                 SELECT 1 FROM events e
1136                 JOIN sessions ss ON ss.id = e.session_id
1137                 WHERE e.session_id = ru.session_id
1138                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1139               )
1140             ORDER BY ru.rule",
1141        )?;
1142        let out: Vec<String> = stmt
1143            .query_map(
1144                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1145                |r| r.get::<_, String>(0),
1146            )?
1147            .filter_map(|r| r.ok())
1148            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1149            .collect();
1150        Ok(out)
1151    }
1152
1153    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1154    pub fn rules_used_in_window(
1155        &self,
1156        workspace: &str,
1157        start_ms: u64,
1158        end_ms: u64,
1159    ) -> Result<Vec<(String, String)>> {
1160        let mut stmt = self.conn.prepare(
1161            "SELECT DISTINCT ru.session_id, ru.rule
1162             FROM rules_used ru
1163             JOIN sessions s ON s.id = ru.session_id
1164             WHERE s.workspace = ?1
1165               AND EXISTS (
1166                 SELECT 1 FROM events e
1167                 JOIN sessions ss ON ss.id = e.session_id
1168                 WHERE e.session_id = ru.session_id
1169                   AND (
1170                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1171                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1172                   )
1173               )
1174             ORDER BY ru.session_id, ru.rule",
1175        )?;
1176        let out: Vec<(String, String)> = stmt
1177            .query_map(
1178                params![
1179                    workspace,
1180                    start_ms as i64,
1181                    end_ms as i64,
1182                    SYNTHETIC_TS_CEILING_MS,
1183                ],
1184                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1185            )?
1186            .filter_map(|r| r.ok())
1187            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1188            .collect();
1189        Ok(out)
1190    }
1191
1192    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1193    pub fn sessions_active_in_window(
1194        &self,
1195        workspace: &str,
1196        start_ms: u64,
1197        end_ms: u64,
1198    ) -> Result<HashSet<String>> {
1199        let mut stmt = self.conn.prepare(
1200            "SELECT DISTINCT s.id
1201             FROM sessions s
1202             WHERE s.workspace = ?1
1203               AND EXISTS (
1204                 SELECT 1 FROM events e
1205                 WHERE e.session_id = s.id
1206                   AND (
1207                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1208                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1209                   )
1210               )",
1211        )?;
1212        let out: HashSet<String> = stmt
1213            .query_map(
1214                params![
1215                    workspace,
1216                    start_ms as i64,
1217                    end_ms as i64,
1218                    SYNTHETIC_TS_CEILING_MS,
1219                ],
1220                |r| r.get(0),
1221            )?
1222            .filter_map(|r| r.ok())
1223            .collect();
1224        Ok(out)
1225    }
1226
1227    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1228    pub fn session_costs_usd_e6_in_window(
1229        &self,
1230        workspace: &str,
1231        start_ms: u64,
1232        end_ms: u64,
1233    ) -> Result<HashMap<String, i64>> {
1234        let mut stmt = self.conn.prepare(
1235            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1236             FROM events e
1237             JOIN sessions s ON s.id = e.session_id
1238             WHERE s.workspace = ?1
1239               AND (
1240                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1241                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1242               )
1243             GROUP BY e.session_id",
1244        )?;
1245        let rows: Vec<(String, i64)> = stmt
1246            .query_map(
1247                params![
1248                    workspace,
1249                    start_ms as i64,
1250                    end_ms as i64,
1251                    SYNTHETIC_TS_CEILING_MS,
1252                ],
1253                |r| Ok((r.get(0)?, r.get(1)?)),
1254            )?
1255            .filter_map(|r| r.ok())
1256            .collect();
1257        Ok(rows.into_iter().collect())
1258    }
1259
1260    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1261    pub fn guidance_report(
1262        &self,
1263        workspace: &str,
1264        window_start_ms: u64,
1265        window_end_ms: u64,
1266        skill_slugs_on_disk: &HashSet<String>,
1267        rule_slugs_on_disk: &HashSet<String>,
1268    ) -> Result<GuidanceReport> {
1269        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1270        let denom = active.len() as u64;
1271        let costs =
1272            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1273
1274        let workspace_avg_cost_per_session_usd = if denom > 0 {
1275            let total_e6: i64 = active
1276                .iter()
1277                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1278                .sum();
1279            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1280        } else {
1281            None
1282        };
1283
1284        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1285        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1286            skill_sessions.entry(skill).or_default().insert(sid);
1287        }
1288        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1289        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1290            rule_sessions.entry(rule).or_default().insert(sid);
1291        }
1292
1293        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1294
1295        let mut push_row =
1296            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1297                let sessions = sids.len() as u64;
1298                let sessions_pct = if denom > 0 {
1299                    sessions as f64 * 100.0 / denom as f64
1300                } else {
1301                    0.0
1302                };
1303                let total_cost_usd_e6: i64 = sids
1304                    .iter()
1305                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1306                    .sum();
1307                let avg_cost_per_session_usd = if sessions > 0 {
1308                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1309                } else {
1310                    None
1311                };
1312                let vs_workspace_avg_cost_per_session_usd =
1313                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1314                        (Some(avg), Some(w)) => Some(avg - w),
1315                        _ => None,
1316                    };
1317                rows.push(GuidancePerfRow {
1318                    kind,
1319                    id,
1320                    sessions,
1321                    sessions_pct,
1322                    total_cost_usd_e6,
1323                    avg_cost_per_session_usd,
1324                    vs_workspace_avg_cost_per_session_usd,
1325                    on_disk,
1326                });
1327            };
1328
1329        let mut seen_skills: HashSet<String> = HashSet::new();
1330        for (id, sids) in &skill_sessions {
1331            seen_skills.insert(id.clone());
1332            push_row(
1333                GuidanceKind::Skill,
1334                id.clone(),
1335                sids,
1336                skill_slugs_on_disk.contains(id),
1337            );
1338        }
1339        for slug in skill_slugs_on_disk {
1340            if seen_skills.contains(slug) {
1341                continue;
1342            }
1343            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1344        }
1345
1346        let mut seen_rules: HashSet<String> = HashSet::new();
1347        for (id, sids) in &rule_sessions {
1348            seen_rules.insert(id.clone());
1349            push_row(
1350                GuidanceKind::Rule,
1351                id.clone(),
1352                sids,
1353                rule_slugs_on_disk.contains(id),
1354            );
1355        }
1356        for slug in rule_slugs_on_disk {
1357            if seen_rules.contains(slug) {
1358                continue;
1359            }
1360            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1361        }
1362
1363        rows.sort_by(|a, b| {
1364            b.sessions
1365                .cmp(&a.sessions)
1366                .then_with(|| a.kind.cmp(&b.kind))
1367                .then_with(|| a.id.cmp(&b.id))
1368        });
1369
1370        Ok(GuidanceReport {
1371            workspace: workspace.to_string(),
1372            window_start_ms,
1373            window_end_ms,
1374            sessions_in_window: denom,
1375            workspace_avg_cost_per_session_usd,
1376            rows,
1377        })
1378    }
1379
1380    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1381        let mut stmt = self.conn.prepare(
1382            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1383                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1384                    prompt_fingerprint
1385             FROM sessions WHERE id = ?1",
1386        )?;
1387        let mut rows = stmt.query_map(params![id], |row| {
1388            Ok((
1389                row.get::<_, String>(0)?,
1390                row.get::<_, String>(1)?,
1391                row.get::<_, Option<String>>(2)?,
1392                row.get::<_, String>(3)?,
1393                row.get::<_, i64>(4)?,
1394                row.get::<_, Option<i64>>(5)?,
1395                row.get::<_, String>(6)?,
1396                row.get::<_, String>(7)?,
1397                row.get::<_, Option<String>>(8)?,
1398                row.get::<_, Option<String>>(9)?,
1399                row.get::<_, Option<String>>(10)?,
1400                row.get::<_, Option<i64>>(11)?,
1401                row.get::<_, Option<i64>>(12)?,
1402                row.get::<_, String>(13)?,
1403                row.get::<_, Option<String>>(14)?,
1404            ))
1405        })?;
1406
1407        if let Some(row) = rows.next() {
1408            let (
1409                id,
1410                agent,
1411                model,
1412                workspace,
1413                started,
1414                ended,
1415                status_str,
1416                trace,
1417                start_commit,
1418                end_commit,
1419                branch,
1420                dirty_start,
1421                dirty_end,
1422                source,
1423                prompt_fingerprint,
1424            ) = row?;
1425            Ok(Some(SessionRecord {
1426                id,
1427                agent,
1428                model,
1429                workspace,
1430                started_at_ms: started as u64,
1431                ended_at_ms: ended.map(|v| v as u64),
1432                status: status_from_str(&status_str),
1433                trace_path: trace,
1434                start_commit,
1435                end_commit,
1436                branch,
1437                dirty_start: dirty_start.map(i64_to_bool),
1438                dirty_end: dirty_end.map(i64_to_bool),
1439                repo_binding_source: empty_to_none(source),
1440                prompt_fingerprint,
1441            }))
1442        } else {
1443            Ok(None)
1444        }
1445    }
1446
1447    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1448        let mut stmt = self.conn.prepare(
1449            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1450                    indexed_at_ms, dirty, graph_path
1451             FROM repo_snapshots WHERE workspace = ?1
1452             ORDER BY indexed_at_ms DESC LIMIT 1",
1453        )?;
1454        let mut rows = stmt.query_map(params![workspace], |row| {
1455            Ok(RepoSnapshotRecord {
1456                id: row.get(0)?,
1457                workspace: row.get(1)?,
1458                head_commit: row.get(2)?,
1459                dirty_fingerprint: row.get(3)?,
1460                analyzer_version: row.get(4)?,
1461                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1462                dirty: row.get::<_, i64>(6)? != 0,
1463                graph_path: row.get(7)?,
1464            })
1465        })?;
1466        Ok(rows.next().transpose()?)
1467    }
1468
1469    pub fn save_repo_snapshot(
1470        &self,
1471        snapshot: &RepoSnapshotRecord,
1472        facts: &[FileFact],
1473        edges: &[RepoEdge],
1474    ) -> Result<()> {
1475        self.conn.execute(
1476            "INSERT INTO repo_snapshots (
1477                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1478                indexed_at_ms, dirty, graph_path
1479             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1480             ON CONFLICT(id) DO UPDATE SET
1481                workspace=excluded.workspace,
1482                head_commit=excluded.head_commit,
1483                dirty_fingerprint=excluded.dirty_fingerprint,
1484                analyzer_version=excluded.analyzer_version,
1485                indexed_at_ms=excluded.indexed_at_ms,
1486                dirty=excluded.dirty,
1487                graph_path=excluded.graph_path",
1488            params![
1489                snapshot.id,
1490                snapshot.workspace,
1491                snapshot.head_commit,
1492                snapshot.dirty_fingerprint,
1493                snapshot.analyzer_version,
1494                snapshot.indexed_at_ms as i64,
1495                bool_to_i64(snapshot.dirty),
1496                snapshot.graph_path,
1497            ],
1498        )?;
1499        self.conn.execute(
1500            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1501            params![snapshot.id],
1502        )?;
1503        self.conn.execute(
1504            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1505            params![snapshot.id],
1506        )?;
1507        for fact in facts {
1508            self.conn.execute(
1509                "INSERT INTO file_facts (
1510                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1511                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1512                    churn_30d, churn_90d, authors_90d, last_changed_ms
1513                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1514                params![
1515                    fact.snapshot_id,
1516                    fact.path,
1517                    fact.language,
1518                    fact.bytes as i64,
1519                    fact.loc as i64,
1520                    fact.sloc as i64,
1521                    fact.complexity_total as i64,
1522                    fact.max_fn_complexity as i64,
1523                    fact.symbol_count as i64,
1524                    fact.import_count as i64,
1525                    fact.fan_in as i64,
1526                    fact.fan_out as i64,
1527                    fact.churn_30d as i64,
1528                    fact.churn_90d as i64,
1529                    fact.authors_90d as i64,
1530                    fact.last_changed_ms.map(|v| v as i64),
1531                ],
1532            )?;
1533        }
1534        for edge in edges {
1535            self.conn.execute(
1536                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1537                 VALUES (?1, ?2, ?3, ?4, ?5)
1538                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1539                 DO UPDATE SET weight = weight + excluded.weight",
1540                params![
1541                    snapshot.id,
1542                    edge.from_path,
1543                    edge.to_path,
1544                    edge.kind,
1545                    edge.weight as i64,
1546                ],
1547            )?;
1548        }
1549        Ok(())
1550    }
1551
1552    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1553        let mut stmt = self.conn.prepare(
1554            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1555                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1556                    churn_30d, churn_90d, authors_90d, last_changed_ms
1557             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1558        )?;
1559        let rows = stmt.query_map(params![snapshot_id], |row| {
1560            Ok(FileFact {
1561                snapshot_id: row.get(0)?,
1562                path: row.get(1)?,
1563                language: row.get(2)?,
1564                bytes: row.get::<_, i64>(3)? as u64,
1565                loc: row.get::<_, i64>(4)? as u32,
1566                sloc: row.get::<_, i64>(5)? as u32,
1567                complexity_total: row.get::<_, i64>(6)? as u32,
1568                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1569                symbol_count: row.get::<_, i64>(8)? as u32,
1570                import_count: row.get::<_, i64>(9)? as u32,
1571                fan_in: row.get::<_, i64>(10)? as u32,
1572                fan_out: row.get::<_, i64>(11)? as u32,
1573                churn_30d: row.get::<_, i64>(12)? as u32,
1574                churn_90d: row.get::<_, i64>(13)? as u32,
1575                authors_90d: row.get::<_, i64>(14)? as u32,
1576                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1577            })
1578        })?;
1579        Ok(rows.filter_map(|row| row.ok()).collect())
1580    }
1581
1582    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1583        let mut stmt = self.conn.prepare(
1584            "SELECT from_id, to_id, kind, weight
1585             FROM repo_edges WHERE snapshot_id = ?1
1586             ORDER BY kind, from_id, to_id",
1587        )?;
1588        let rows = stmt.query_map(params![snapshot_id], |row| {
1589            Ok(RepoEdge {
1590                from_path: row.get(0)?,
1591                to_path: row.get(1)?,
1592                kind: row.get(2)?,
1593                weight: row.get::<_, i64>(3)? as u32,
1594            })
1595        })?;
1596        Ok(rows.filter_map(|row| row.ok()).collect())
1597    }
1598
1599    pub fn tool_spans_in_window(
1600        &self,
1601        workspace: &str,
1602        start_ms: u64,
1603        end_ms: u64,
1604    ) -> Result<Vec<ToolSpanView>> {
1605        let mut stmt = self.conn.prepare(
1606            "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1607                    ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1608             FROM tool_spans ts
1609             JOIN sessions s ON s.id = ts.session_id
1610             WHERE s.workspace = ?1
1611               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1612               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1613             ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1614        )?;
1615        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1616            let paths_json: String = row.get(7)?;
1617            Ok(ToolSpanView {
1618                tool: row
1619                    .get::<_, Option<String>>(0)?
1620                    .unwrap_or_else(|| "unknown".into()),
1621                status: row.get(1)?,
1622                lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1623                tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1624                tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1625                reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1626                cost_usd_e6: row.get(6)?,
1627                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1628            })
1629        })?;
1630        Ok(rows.filter_map(|row| row.ok()).collect())
1631    }
1632
1633    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1634        let mut stmt = self.conn.prepare(
1635            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1636                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1637             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1638        )?;
1639        let rows = stmt.query_map(params![session_id], |row| {
1640            let paths_json: String = row.get(12)?;
1641            Ok(ToolSpanSyncRow {
1642                span_id: row.get(0)?,
1643                session_id: row.get(1)?,
1644                tool: row.get(2)?,
1645                tool_call_id: row.get(3)?,
1646                status: row.get(4)?,
1647                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1648                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1649                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1650                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1651                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1652                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1653                cost_usd_e6: row.get(11)?,
1654                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1655            })
1656        })?;
1657        Ok(rows.filter_map(|row| row.ok()).collect())
1658    }
1659
1660    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1661        self.conn.execute(
1662            "INSERT OR REPLACE INTO session_evals
1663             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1664             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1665            rusqlite::params![
1666                eval.id,
1667                eval.session_id,
1668                eval.judge_model,
1669                eval.rubric_id,
1670                eval.score,
1671                eval.rationale,
1672                eval.flagged as i64,
1673                eval.created_at_ms as i64,
1674            ],
1675        )?;
1676        Ok(())
1677    }
1678
1679    pub fn list_evals_in_window(
1680        &self,
1681        start_ms: u64,
1682        end_ms: u64,
1683    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1684        let mut stmt = self.conn.prepare(
1685            "SELECT id, session_id, judge_model, rubric_id, score,
1686                    rationale, flagged, created_at_ms
1687             FROM session_evals
1688             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1689             ORDER BY created_at_ms ASC",
1690        )?;
1691        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1692            Ok(crate::eval::types::EvalRow {
1693                id: r.get(0)?,
1694                session_id: r.get(1)?,
1695                judge_model: r.get(2)?,
1696                rubric_id: r.get(3)?,
1697                score: r.get(4)?,
1698                rationale: r.get(5)?,
1699                flagged: r.get::<_, i64>(6)? != 0,
1700                created_at_ms: r.get::<_, i64>(7)? as u64,
1701            })
1702        })?;
1703        rows.collect()
1704    }
1705
1706    pub fn list_evals_for_session(
1707        &self,
1708        session_id: &str,
1709    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1710        let mut stmt = self.conn.prepare(
1711            "SELECT id, session_id, judge_model, rubric_id, score,
1712                    rationale, flagged, created_at_ms
1713             FROM session_evals
1714             WHERE session_id = ?1
1715             ORDER BY created_at_ms DESC",
1716        )?;
1717        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1718            Ok(crate::eval::types::EvalRow {
1719                id: r.get(0)?,
1720                session_id: r.get(1)?,
1721                judge_model: r.get(2)?,
1722                rubric_id: r.get(3)?,
1723                score: r.get(4)?,
1724                rationale: r.get(5)?,
1725                flagged: r.get::<_, i64>(6)? != 0,
1726                created_at_ms: r.get::<_, i64>(7)? as u64,
1727            })
1728        })?;
1729        rows.collect()
1730    }
1731
1732    pub fn list_sessions_for_eval(
1733        &self,
1734        since_ms: u64,
1735        min_cost_usd: f64,
1736    ) -> Result<Vec<crate::core::event::SessionRecord>> {
1737        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
1738        let mut stmt = self.conn.prepare(
1739            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1740                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
1741                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint
1742             FROM sessions s
1743             WHERE s.started_at_ms >= ?1
1744               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
1745               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
1746             ORDER BY s.started_at_ms DESC",
1747        )?;
1748        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
1749            Ok((
1750                r.get::<_, String>(0)?,
1751                r.get::<_, String>(1)?,
1752                r.get::<_, Option<String>>(2)?,
1753                r.get::<_, String>(3)?,
1754                r.get::<_, i64>(4)?,
1755                r.get::<_, Option<i64>>(5)?,
1756                r.get::<_, String>(6)?,
1757                r.get::<_, String>(7)?,
1758                r.get::<_, Option<String>>(8)?,
1759                r.get::<_, Option<String>>(9)?,
1760                r.get::<_, Option<String>>(10)?,
1761                r.get::<_, Option<i64>>(11)?,
1762                r.get::<_, Option<i64>>(12)?,
1763                r.get::<_, Option<String>>(13)?,
1764                r.get::<_, Option<String>>(14)?,
1765            ))
1766        })?;
1767        let mut out = Vec::new();
1768        for row in rows {
1769            let (
1770                id,
1771                agent,
1772                model,
1773                workspace,
1774                started,
1775                ended,
1776                status_str,
1777                trace,
1778                start_commit,
1779                end_commit,
1780                branch,
1781                dirty_start,
1782                dirty_end,
1783                source,
1784                prompt_fingerprint,
1785            ) = row?;
1786            out.push(crate::core::event::SessionRecord {
1787                id,
1788                agent,
1789                model,
1790                workspace,
1791                started_at_ms: started as u64,
1792                ended_at_ms: ended.map(|v| v as u64),
1793                status: status_from_str(&status_str),
1794                trace_path: trace,
1795                start_commit,
1796                end_commit,
1797                branch,
1798                dirty_start: dirty_start.map(i64_to_bool),
1799                dirty_end: dirty_end.map(i64_to_bool),
1800                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
1801                prompt_fingerprint,
1802            });
1803        }
1804        Ok(out)
1805    }
1806
1807    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
1808        self.conn.execute(
1809            "INSERT OR IGNORE INTO prompt_snapshots
1810             (fingerprint, captured_at_ms, files_json, total_bytes)
1811             VALUES (?1, ?2, ?3, ?4)",
1812            params![
1813                snap.fingerprint,
1814                snap.captured_at_ms as i64,
1815                snap.files_json,
1816                snap.total_bytes as i64
1817            ],
1818        )?;
1819        Ok(())
1820    }
1821
1822    pub fn get_prompt_snapshot(
1823        &self,
1824        fingerprint: &str,
1825    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
1826        self.conn
1827            .query_row(
1828                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1829                 FROM prompt_snapshots WHERE fingerprint = ?1",
1830                params![fingerprint],
1831                |r| {
1832                    Ok(crate::prompt::PromptSnapshot {
1833                        fingerprint: r.get(0)?,
1834                        captured_at_ms: r.get::<_, i64>(1)? as u64,
1835                        files_json: r.get(2)?,
1836                        total_bytes: r.get::<_, i64>(3)? as u64,
1837                    })
1838                },
1839            )
1840            .optional()
1841            .map_err(Into::into)
1842    }
1843
1844    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
1845        let mut stmt = self.conn.prepare(
1846            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1847             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
1848        )?;
1849        let rows = stmt.query_map([], |r| {
1850            Ok(crate::prompt::PromptSnapshot {
1851                fingerprint: r.get(0)?,
1852                captured_at_ms: r.get::<_, i64>(1)? as u64,
1853                files_json: r.get(2)?,
1854                total_bytes: r.get::<_, i64>(3)? as u64,
1855            })
1856        })?;
1857        Ok(rows.filter_map(|r| r.ok()).collect())
1858    }
1859
1860    /// Sessions with a non-null prompt_fingerprint in the given window.
1861    pub fn sessions_with_prompt_fingerprint(
1862        &self,
1863        workspace: &str,
1864        start_ms: u64,
1865        end_ms: u64,
1866    ) -> Result<Vec<(String, String)>> {
1867        let mut stmt = self.conn.prepare(
1868            "SELECT id, prompt_fingerprint FROM sessions
1869             WHERE workspace = ?1
1870               AND started_at_ms >= ?2 AND started_at_ms < ?3
1871               AND prompt_fingerprint IS NOT NULL",
1872        )?;
1873        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
1874            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1875        })?;
1876        Ok(rows.filter_map(|r| r.ok()).collect())
1877    }
1878}
1879
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1886
1887fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1888    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1889}
1890
1891fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1892    let cost: i64 = conn.query_row(
1893        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1894        params![workspace], |r| r.get(0),
1895    )?;
1896    let with_cost: i64 = conn.query_row(
1897        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1898        params![workspace], |r| r.get(0),
1899    )?;
1900    Ok((cost, with_cost as u64))
1901}
1902
/// Three-letter English weekday name for a day index counted from the Unix
/// epoch (for example `ms / 86_400_000`).
///
/// Day 0 (1970-01-01) was a Thursday, hence the `+ 4` offset into the
/// Sunday-first table. `day_idx` is reduced modulo 7 before the offset is
/// added, so every `u64` input is accepted without arithmetic overflow.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    NAMES[((day_idx % 7 + 4) % 7) as usize]
}
1906
1907fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
1908    let week_ago = now.saturating_sub(7 * 86_400_000);
1909    let mut stmt = conn
1910        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
1911    let days: Vec<u64> = stmt
1912        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
1913        .filter_map(|r| r.ok())
1914        .map(|v| v as u64 / 86_400_000)
1915        .collect();
1916    let today = now / 86_400_000;
1917    Ok((0u64..7)
1918        .map(|i| {
1919            let d = today.saturating_sub(6 - i);
1920            (
1921                day_label(d).to_string(),
1922                days.iter().filter(|&&x| x == d).count() as u64,
1923            )
1924        })
1925        .collect())
1926}
1927
1928fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
1929    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
1930               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
1931               s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
1932               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
1933               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
1934    let mut stmt = conn.prepare(sql)?;
1935    let out: Vec<(SessionRecord, u64)> = stmt
1936        .query_map(params![workspace], |r| {
1937            let st: String = r.get(6)?;
1938            Ok((
1939                SessionRecord {
1940                    id: r.get(0)?,
1941                    agent: r.get(1)?,
1942                    model: r.get(2)?,
1943                    workspace: r.get(3)?,
1944                    started_at_ms: r.get::<_, i64>(4)? as u64,
1945                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1946                    status: status_from_str(&st),
1947                    trace_path: r.get(7)?,
1948                    start_commit: r.get(8)?,
1949                    end_commit: r.get(9)?,
1950                    branch: r.get(10)?,
1951                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
1952                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
1953                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
1954                    prompt_fingerprint: None,
1955                },
1956                r.get::<_, i64>(14)? as u64,
1957            ))
1958        })?
1959        .filter_map(|r| r.ok())
1960        .collect();
1961    Ok(out)
1962}
1963
1964fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
1965    let mut stmt = conn.prepare(
1966        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
1967         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
1968    )?;
1969    let out: Vec<(String, u64)> = stmt
1970        .query_map(params![workspace], |r| {
1971            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1972        })?
1973        .filter_map(|r| r.ok())
1974        .collect();
1975    Ok(out)
1976}
1977
1978fn status_from_str(s: &str) -> SessionStatus {
1979    match s {
1980        "Running" => SessionStatus::Running,
1981        "Waiting" => SessionStatus::Waiting,
1982        "Idle" => SessionStatus::Idle,
1983        _ => SessionStatus::Done,
1984    }
1985}
1986
1987fn kind_from_str(s: &str) -> EventKind {
1988    match s {
1989        "ToolCall" => EventKind::ToolCall,
1990        "ToolResult" => EventKind::ToolResult,
1991        "Message" => EventKind::Message,
1992        "Error" => EventKind::Error,
1993        "Cost" => EventKind::Cost,
1994        _ => EventKind::Hook,
1995    }
1996}
1997
1998fn source_from_str(s: &str) -> EventSource {
1999    match s {
2000        "Tail" => EventSource::Tail,
2001        "Hook" => EventSource::Hook,
2002        _ => EventSource::Proxy,
2003    }
2004}
2005
2006fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2007    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2008    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2009    ensure_column(conn, "sessions", "branch", "TEXT")?;
2010    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2011    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2012    ensure_column(
2013        conn,
2014        "sessions",
2015        "repo_binding_source",
2016        "TEXT NOT NULL DEFAULT ''",
2017    )?;
2018    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2019    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2020    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2021    ensure_column(
2022        conn,
2023        "sync_outbox",
2024        "kind",
2025        "TEXT NOT NULL DEFAULT 'events'",
2026    )?;
2027    ensure_column(
2028        conn,
2029        "experiments",
2030        "state",
2031        "TEXT NOT NULL DEFAULT 'Draft'",
2032    )?;
2033    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2034    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2035    Ok(())
2036}
2037
2038fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2039    if has_column(conn, table, column)? {
2040        return Ok(());
2041    }
2042    conn.execute(
2043        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2044        [],
2045    )?;
2046    Ok(())
2047}
2048
2049fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2050    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2051    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2052    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2053}
2054
/// Encode a boolean as a SQLite integer flag: 1 for `true`, 0 for `false`.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2058
/// Decode a SQLite integer flag: zero is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
2062
/// Treat an empty string as "absent": `""` becomes `None`, anything else
/// (including whitespace-only strings) is returned unchanged in `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2066
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Open a `Store` backed by `kaizen.db` in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned too, so the directory outlives the
    /// store for the whole test.
    fn temp_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        (dir, store)
    }

    /// A finished `cursor` session in workspace `/ws`, started at 1000 ms,
    /// with no git or prompt metadata.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.into(),
            agent: "cursor".into(),
            workspace: "/ws".into(),
            trace_path: "/trace".into(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            model: None,
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
        }
    }

    /// A tail-sourced `read_file` tool call for `session_id` with an empty
    /// payload, timestamped `1000 + 100 * seq` ms.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.into(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".into()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: json!({}),
        }
    }

    #[test]
    fn open_and_wal_mode() {
        let (_dir, store) = temp_store();
        let journal: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal, "wal");
    }

    #[test]
    fn upsert_and_get_session() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("s1")).unwrap();

        let fetched = store
            .get_session("s1")
            .unwrap()
            .expect("session s1 was just inserted");
        assert_eq!(fetched.id, "s1");
        assert_eq!(fetched.status, SessionStatus::Done);
    }

    #[test]
    fn append_and_list_events_round_trip() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("s2")).unwrap();
        for seq in 0..2 {
            store.append_event(&make_event("s2", seq)).unwrap();
        }

        let listed = store.list_sessions("/ws").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "s2");
    }

    #[test]
    fn summary_stats_empty() {
        let (_dir, store) = temp_store();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    #[test]
    fn summary_stats_counts_sessions() {
        let (_dir, store) = temp_store();
        for id in ["a", "b"] {
            store.upsert_session(&make_session(id)).unwrap();
        }

        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    #[test]
    fn list_events_for_session_round_trip() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("s4")).unwrap();
        for seq in 0..2 {
            store.append_event(&make_event("s4", seq)).unwrap();
        }

        let events = store.list_events_for_session("s4").unwrap();
        let seqs: Vec<u64> = events.iter().map(|e| e.seq).collect();
        assert_eq!(seqs, vec![0, 1]);
    }

    #[test]
    fn append_event_dedup() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("s5")).unwrap();
        let event = make_event("s5", 0);
        store.append_event(&event).unwrap();
        // Re-appending the identical event must be ignored, not rejected.
        store.append_event(&event).unwrap();

        assert_eq!(store.list_events_for_session("s5").unwrap().len(), 1);
    }

    #[test]
    fn upsert_idempotent() {
        let (_dir, store) = temp_store();
        let mut session = make_session("s3");
        store.upsert_session(&session).unwrap();
        session.status = SessionStatus::Running;
        store.upsert_session(&session).unwrap();

        let fetched = store.get_session("s3").unwrap().unwrap();
        assert_eq!(fetched.status, SessionStatus::Running);
    }

    #[test]
    fn append_event_indexes_path_from_payload() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let event = Event {
            payload: json!({"input": {"path": "src/lib.rs"}}),
            ..make_event("sx", 0)
        };
        store.append_event(&event).unwrap();

        let touched = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(touched, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    #[test]
    fn update_session_status_changes_status() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();

        let fetched = store.get_session("s6").unwrap().unwrap();
        assert_eq!(fetched.status, SessionStatus::Running);
    }

    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let (_dir, store) = temp_store();
        let old = SessionRecord {
            started_at_ms: 1_000,
            ..make_session("old")
        };
        let recent = SessionRecord {
            started_at_ms: 9_000_000_000_000,
            ..make_session("new")
        };
        store.upsert_session(&old).unwrap();
        store.upsert_session(&recent).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let pruned = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            pruned,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());

        let remaining = store.list_sessions("/ws").unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "new");
    }

    #[test]
    fn append_event_indexes_rules_from_payload() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("sr")).unwrap();
        let event = Event {
            payload: json!({"path": ".cursor/rules/my-rule.mdc"}),
            ..make_event("sr", 0)
        };
        store.append_event(&event).unwrap();

        let rules = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(rules, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let (_dir, store) = temp_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let event = Event {
            payload: json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"}),
            cost_usd_e6: Some(500_000),
            ..make_event("sx", 0)
        };
        store.append_event(&event).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let report = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(report.sessions_in_window, 1);

        let row_for = |id: &str, kind: GuidanceKind| {
            report
                .rows
                .iter()
                .find(|r| r.id == id && r.kind == kind)
                .expect("guidance row present")
        };

        let tdd = row_for("tdd", GuidanceKind::Skill);
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);

        let style = row_for("style", GuidanceKind::Rule);
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let (_dir, store) = temp_store();
        let old = SessionRecord {
            started_at_ms: 1_000,
            ..make_session("old_r")
        };
        store.upsert_session(&old).unwrap();
        let event = Event {
            payload: json!({"path": ".cursor/rules/x.mdc"}),
            ..make_event("old_r", 0)
        };
        store.append_event(&event).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let leftover: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(leftover, 0);
    }
}