Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Threshold separating synthetic, transcript-only `ts_ms` values (produced by seq-based
/// fallbacks) from real timestamps. Rows whose `ts_ms` is below it fall back to
/// `sessions.started_at_ms` for time-window matching.
///
/// 10^12 ms is roughly September 2001, so current wall-clock timestamps sit well above it.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000 * 1_000_000_000;
21
/// Ordered schema statements applied by `Store::open` every time a database is opened.
///
/// Each entry is passed to `execute_batch`, so a single entry may contain several
/// `;`-separated statements (see `session_evals` / `session_feedback`). Every statement is
/// idempotent (`IF NOT EXISTS` / `INSERT OR IGNORE`), so replaying the list against an
/// existing database changes nothing. Keep the order stable: index entries refer to tables
/// created by earlier entries.
///
/// NOTE(review): columns written elsewhere in this file but missing from the `CREATE TABLE`s
/// below (e.g. `sessions.start_commit`, `events.ts_exact`, `sync_outbox.kind`) are presumably
/// added by `ensure_schema_columns`; verify there before relying on these definitions alone.
///
/// NOTE(review): the `ON DELETE CASCADE` clauses below are only enforced when
/// `PRAGMA foreign_keys=ON`, which `Store::open` does not set.
const MIGRATIONS: &[&str] = &[
    // Session headers: one row per agent session.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    // Per-session event log; (session_id, seq) is made unique further down.
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    // Derived per-session facts; deduplicated by the unique indexes added below.
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Rows queued for the sync uploader; `sent` flips to 1 once delivered.
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    // Experiments and their per-session variant assignments.
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the `ON CONFLICT(session_id, seq)` upsert used when appending events.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Key/value sync bookkeeping (last success, last error, failure count, throttle stamps).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // Tool call spans per session, plus the paths each span touched.
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // Repo state (commits, branch, dirty flags) bound to each session.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repository analysis snapshots with their per-file facts and dependency edges.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    // Cursor rules referenced per session, deduplicated by the unique index that follows.
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single state row; ignored when it already exists.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Per-session evaluation scores (judge model + rubric); multi-statement entry.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    // Prompt file snapshots keyed by fingerprint (see `sessions.prompt_fingerprint`).
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    // Session score/label feedback; multi-statement entry.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
];
232
233/// Per-workspace activity dashboard stats.
234#[derive(Clone)]
235pub struct InsightsStats {
236    pub total_sessions: u64,
237    pub running_sessions: u64,
238    pub total_events: u64,
239    /// (day label e.g. "Mon", count) last 7 days oldest first
240    pub sessions_by_day: Vec<(String, u64)>,
241    /// Recent sessions DESC by started_at, max 3; paired with event count
242    pub recent: Vec<(SessionRecord, u64)>,
243    /// Top tools by event count, max 5
244    pub top_tools: Vec<(String, u64)>,
245    pub total_cost_usd_e6: i64,
246    pub sessions_with_cost: u64,
247}
248
/// Sync daemon / outbox health, as reported by `kaizen sync status`.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers can log, copy and compare snapshots;
/// every field is a plain std type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Outbox rows not yet marked sent.
    pub pending_outbox: u64,
    /// Timestamp (ms) of the last successful sync, if one was recorded.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent failure; cleared on success.
    pub last_error: Option<String>,
    /// Failures since the last success.
    pub consecutive_failures: u32,
}
256
257/// Aggregate stats across sessions + events for a workspace.
258#[derive(serde::Serialize)]
259pub struct SummaryStats {
260    pub session_count: u64,
261    pub total_cost_usd_e6: i64,
262    pub by_agent: Vec<(String, u64)>,
263    pub by_model: Vec<(String, u64)>,
264    pub top_tools: Vec<(String, u64)>,
265}
266
267/// Skill vs Cursor rule for [`GuidancePerfRow`].
268#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
269#[serde(rename_all = "lowercase")]
270pub enum GuidanceKind {
271    Skill,
272    Rule,
273}
274
275/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
276#[derive(Clone, Debug, serde::Serialize)]
277pub struct GuidancePerfRow {
278    pub kind: GuidanceKind,
279    pub id: String,
280    pub sessions: u64,
281    pub sessions_pct: f64,
282    pub total_cost_usd_e6: i64,
283    pub avg_cost_per_session_usd: Option<f64>,
284    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
285    pub on_disk: bool,
286}
287
288/// Aggregated skill/rule adoption and cost proxy for a time window.
289#[derive(Clone, Debug, serde::Serialize)]
290pub struct GuidanceReport {
291    pub workspace: String,
292    pub window_start_ms: u64,
293    pub window_end_ms: u64,
294    pub sessions_in_window: u64,
295    pub workspace_avg_cost_per_session_usd: Option<f64>,
296    pub rows: Vec<GuidancePerfRow>,
297}
298
/// How much [`Store::prune_sessions_started_before`] deleted.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Session rows removed.
    pub sessions_removed: u64,
    /// Event rows removed along with those sessions.
    pub events_removed: u64,
}
305
/// `sync_state` key storing when agents were last rescanned (ms), used to throttle rescans.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key storing when the last automatic prune ran (ms).
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
309
/// One `tool_spans` row together with its touched paths, presumably in the shape handed to
/// the sync layer (confirm with its producer).
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so rows can be logged, copied and compared; every
/// field is a plain std type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// `tool_spans.span_id`.
    pub span_id: String,
    /// Owning session id.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Tool call id, when known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Span start, in ms.
    pub started_at_ms: Option<u64>,
    /// Span end, in ms.
    pub ended_at_ms: Option<u64>,
    /// Span lead time, in ms.
    pub lead_time_ms: Option<u64>,
    /// Input tokens attributed to the span.
    pub tokens_in: Option<u32>,
    /// Output tokens attributed to the span.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens attributed to the span.
    pub reasoning_tokens: Option<u32>,
    /// Cost attributed to the span (presumably micro-USD, per the `_e6` suffix).
    pub cost_usd_e6: Option<i64>,
    /// Paths touched by the span.
    pub paths: Vec<String>,
}
325
326pub struct Store {
327    conn: Connection,
328}
329
330impl Store {
331    pub(crate) fn conn(&self) -> &Connection {
332        &self.conn
333    }
334
335    pub fn open(path: &Path) -> Result<Self> {
336        if let Some(parent) = path.parent() {
337            std::fs::create_dir_all(parent)?;
338        }
339        let conn =
340            Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
341        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
342        for sql in MIGRATIONS {
343            conn.execute_batch(sql)?;
344        }
345        ensure_schema_columns(&conn)?;
346        Ok(Self { conn })
347    }
348
349    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
350        self.conn.execute(
351            "INSERT INTO sessions (
352                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
353                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
354                prompt_fingerprint
355             )
356             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
357             ON CONFLICT(id) DO UPDATE SET
358               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
359               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
360               status=excluded.status, trace_path=excluded.trace_path,
361               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
362               branch=excluded.branch, dirty_start=excluded.dirty_start,
363               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
364               prompt_fingerprint=excluded.prompt_fingerprint",
365            params![
366                s.id,
367                s.agent,
368                s.model,
369                s.workspace,
370                s.started_at_ms as i64,
371                s.ended_at_ms.map(|v| v as i64),
372                format!("{:?}", s.status),
373                s.trace_path,
374                s.start_commit,
375                s.end_commit,
376                s.branch,
377                s.dirty_start.map(bool_to_i64),
378                s.dirty_end.map(bool_to_i64),
379                s.repo_binding_source.clone().unwrap_or_default(),
380                s.prompt_fingerprint.as_deref(),
381            ],
382        )?;
383        self.conn.execute(
384            "INSERT INTO session_repo_binding (
385                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
386             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
387             ON CONFLICT(session_id) DO UPDATE SET
388                start_commit=excluded.start_commit,
389                end_commit=excluded.end_commit,
390                branch=excluded.branch,
391                dirty_start=excluded.dirty_start,
392                dirty_end=excluded.dirty_end,
393                repo_binding_source=excluded.repo_binding_source",
394            params![
395                s.id,
396                s.start_commit,
397                s.end_commit,
398                s.branch,
399                s.dirty_start.map(bool_to_i64),
400                s.dirty_end.map(bool_to_i64),
401                s.repo_binding_source.clone().unwrap_or_default(),
402            ],
403        )?;
404        Ok(())
405    }
406
407    /// Insert a minimal session row if none exists. Used by hook ingestion when
408    /// the first observed event is not `SessionStart` (hooks installed mid-session).
409    pub fn ensure_session_stub(
410        &self,
411        id: &str,
412        agent: &str,
413        workspace: &str,
414        started_at_ms: u64,
415    ) -> Result<()> {
416        self.conn.execute(
417            "INSERT OR IGNORE INTO sessions (
418                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
419                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
420             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
421            params![id, agent, workspace, started_at_ms as i64],
422        )?;
423        Ok(())
424    }
425
426    /// Next `seq` for a new event in this session (0 when there are no events yet).
427    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
428        let n: i64 = self.conn.query_row(
429            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
430            [session_id],
431            |r| r.get(0),
432        )?;
433        Ok(n as u64)
434    }
435
436    pub fn append_event(&self, e: &Event) -> Result<()> {
437        self.append_event_with_sync(e, None)
438    }
439
440    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
441    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
442        let payload = serde_json::to_string(&e.payload)?;
443        self.conn.execute(
444            "INSERT INTO events (
445                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
446                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
447             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
448             ON CONFLICT(session_id, seq) DO UPDATE SET
449                ts_ms = excluded.ts_ms,
450                ts_exact = excluded.ts_exact,
451                kind = excluded.kind,
452                source = excluded.source,
453                tool = excluded.tool,
454                tool_call_id = excluded.tool_call_id,
455                tokens_in = excluded.tokens_in,
456                tokens_out = excluded.tokens_out,
457                reasoning_tokens = excluded.reasoning_tokens,
458                cost_usd_e6 = excluded.cost_usd_e6,
459                payload = excluded.payload",
460            params![
461                e.session_id,
462                e.seq as i64,
463                e.ts_ms as i64,
464                bool_to_i64(e.ts_exact),
465                format!("{:?}", e.kind),
466                format!("{:?}", e.source),
467                e.tool,
468                e.tool_call_id,
469                e.tokens_in.map(|v| v as i64),
470                e.tokens_out.map(|v| v as i64),
471                e.reasoning_tokens.map(|v| v as i64),
472                e.cost_usd_e6,
473                payload,
474            ],
475        )?;
476        if self.conn.changes() == 0 {
477            return Ok(());
478        }
479        index_event_derived(&self.conn, e)?;
480        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
481        let Some(ctx) = ctx else {
482            return Ok(());
483        };
484        let sync = &ctx.sync;
485        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
486            return Ok(());
487        }
488        let Some(salt) = try_team_salt(sync) else {
489            tracing::warn!(
490                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
491            );
492            return Ok(());
493        };
494        if sync.sample_rate < 1.0 {
495            let u: f64 = rand::random();
496            if u > sync.sample_rate {
497                return Ok(());
498            }
499        }
500        let Some(session) = self.get_session(&e.session_id)? else {
501            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
502            return Ok(());
503        };
504        let mut outbound = outbound_event_from_row(e, &session, &salt);
505        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
506        let row = serde_json::to_string(&outbound)?;
507        self.conn.execute(
508            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
509            params![e.session_id, row],
510        )?;
511        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
512        Ok(())
513    }
514
515    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
516        let mut stmt = self.conn.prepare(
517            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
518        )?;
519        let rows = stmt.query_map(params![limit as i64], |row| {
520            Ok((
521                row.get::<_, i64>(0)?,
522                row.get::<_, String>(1)?,
523                row.get::<_, String>(2)?,
524            ))
525        })?;
526        let mut out = Vec::new();
527        for r in rows {
528            out.push(r?);
529        }
530        Ok(out)
531    }
532
533    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
534        for id in ids {
535            self.conn
536                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
537        }
538        Ok(())
539    }
540
541    pub fn replace_outbox_rows(
542        &self,
543        owner_id: &str,
544        kind: &str,
545        payloads: &[String],
546    ) -> Result<()> {
547        self.conn.execute(
548            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
549            params![owner_id, kind],
550        )?;
551        for payload in payloads {
552            self.conn.execute(
553                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
554                params![owner_id, kind, payload],
555            )?;
556        }
557        Ok(())
558    }
559
560    pub fn outbox_pending_count(&self) -> Result<u64> {
561        let c: i64 =
562            self.conn
563                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
564                    r.get(0)
565                })?;
566        Ok(c as u64)
567    }
568
569    pub fn set_sync_state_ok(&self) -> Result<()> {
570        let now = now_ms().to_string();
571        self.conn.execute(
572            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
573             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
574            params![now],
575        )?;
576        self.conn.execute(
577            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
578             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
579            [],
580        )?;
581        self.conn
582            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
583        Ok(())
584    }
585
586    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
587        let prev: i64 = self
588            .conn
589            .query_row(
590                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
591                [],
592                |r| {
593                    let s: String = r.get(0)?;
594                    Ok(s.parse::<i64>().unwrap_or(0))
595                },
596            )
597            .optional()?
598            .unwrap_or(0);
599        let next = prev.saturating_add(1);
600        self.conn.execute(
601            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
602             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
603            params![msg],
604        )?;
605        self.conn.execute(
606            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
607             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
608            params![next.to_string()],
609        )?;
610        Ok(())
611    }
612
613    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
614        let pending_outbox = self.outbox_pending_count()?;
615        let last_success_ms = self
616            .conn
617            .query_row(
618                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
619                [],
620                |r| r.get::<_, String>(0),
621            )
622            .optional()?
623            .and_then(|s| s.parse().ok());
624        let last_error = self
625            .conn
626            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
627                r.get::<_, String>(0)
628            })
629            .optional()?;
630        let consecutive_failures = self
631            .conn
632            .query_row(
633                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
634                [],
635                |r| r.get::<_, String>(0),
636            )
637            .optional()?
638            .and_then(|s| s.parse().ok())
639            .unwrap_or(0);
640        Ok(SyncStatusSnapshot {
641            pending_outbox,
642            last_success_ms,
643            last_error,
644            consecutive_failures,
645        })
646    }
647
648    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
649        let row: Option<String> = self
650            .conn
651            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
652                r.get::<_, String>(0)
653            })
654            .optional()?;
655        Ok(row.and_then(|s| s.parse().ok()))
656    }
657
658    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
659        self.conn.execute(
660            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
661             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
662            params![key, v.to_string()],
663        )?;
664        Ok(())
665    }
666
667    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
668    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
669        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
670        let sessions_to_remove: i64 = tx.query_row(
671            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
672            params![cutoff_ms],
673            |r| r.get(0),
674        )?;
675        let events_to_remove: i64 = tx.query_row(
676            "SELECT COUNT(*) FROM events WHERE session_id IN \
677             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
678            params![cutoff_ms],
679            |r| r.get(0),
680        )?;
681
682        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
683        tx.execute(
684            &format!(
685                "DELETE FROM tool_span_paths WHERE span_id IN \
686                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
687            ),
688            params![cutoff_ms],
689        )?;
690        tx.execute(
691            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
692            params![cutoff_ms],
693        )?;
694        tx.execute(
695            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
696            params![cutoff_ms],
697        )?;
698        tx.execute(
699            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
700            params![cutoff_ms],
701        )?;
702        tx.execute(
703            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
704            params![cutoff_ms],
705        )?;
706        tx.execute(
707            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
708            params![cutoff_ms],
709        )?;
710        tx.execute(
711            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
712            params![cutoff_ms],
713        )?;
714        tx.execute(
715            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
716            params![cutoff_ms],
717        )?;
718        tx.execute(
719            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
720            params![cutoff_ms],
721        )?;
722        tx.execute(
723            "DELETE FROM sessions WHERE started_at_ms < ?1",
724            params![cutoff_ms],
725        )?;
726        tx.commit()?;
727        Ok(PruneStats {
728            sessions_removed: sessions_to_remove as u64,
729            events_removed: events_to_remove as u64,
730        })
731    }
732
733    /// Reclaim file space after large deletes (exclusive lock; can be slow).
734    pub fn vacuum(&self) -> Result<()> {
735        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
736        Ok(())
737    }
738
739    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
740        let mut stmt = self.conn.prepare(
741            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
742                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
743                    prompt_fingerprint
744             FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
745        )?;
746        let rows = stmt.query_map(params![workspace], |row| {
747            Ok((
748                row.get::<_, String>(0)?,
749                row.get::<_, String>(1)?,
750                row.get::<_, Option<String>>(2)?,
751                row.get::<_, String>(3)?,
752                row.get::<_, i64>(4)?,
753                row.get::<_, Option<i64>>(5)?,
754                row.get::<_, String>(6)?,
755                row.get::<_, String>(7)?,
756                row.get::<_, Option<String>>(8)?,
757                row.get::<_, Option<String>>(9)?,
758                row.get::<_, Option<String>>(10)?,
759                row.get::<_, Option<i64>>(11)?,
760                row.get::<_, Option<i64>>(12)?,
761                row.get::<_, String>(13)?,
762                row.get::<_, Option<String>>(14)?,
763            ))
764        })?;
765
766        let mut out = Vec::new();
767        for row in rows {
768            let (
769                id,
770                agent,
771                model,
772                workspace,
773                started,
774                ended,
775                status_str,
776                trace,
777                start_commit,
778                end_commit,
779                branch,
780                dirty_start,
781                dirty_end,
782                source,
783                prompt_fingerprint,
784            ) = row?;
785            out.push(SessionRecord {
786                id,
787                agent,
788                model,
789                workspace,
790                started_at_ms: started as u64,
791                ended_at_ms: ended.map(|v| v as u64),
792                status: status_from_str(&status_str),
793                trace_path: trace,
794                start_commit,
795                end_commit,
796                branch,
797                dirty_start: dirty_start.map(i64_to_bool),
798                dirty_end: dirty_end.map(i64_to_bool),
799                repo_binding_source: empty_to_none(source),
800                prompt_fingerprint,
801            });
802        }
803        Ok(out)
804    }
805
806    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
807        let session_count: i64 = self.conn.query_row(
808            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
809            params![workspace],
810            |r| r.get(0),
811        )?;
812
813        let total_cost: i64 = self.conn.query_row(
814            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
815             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
816            params![workspace],
817            |r| r.get(0),
818        )?;
819
820        let mut stmt = self.conn.prepare(
821            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
822        )?;
823        let by_agent: Vec<(String, u64)> = stmt
824            .query_map(params![workspace], |r| {
825                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
826            })?
827            .filter_map(|r| r.ok())
828            .collect();
829
830        let mut stmt = self.conn.prepare(
831            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
832        )?;
833        let by_model: Vec<(String, u64)> = stmt
834            .query_map(params![workspace], |r| {
835                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
836            })?
837            .filter_map(|r| r.ok())
838            .collect();
839
840        let mut stmt = self.conn.prepare(
841            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
842             WHERE s.workspace = ?1 AND tool IS NOT NULL
843             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
844        )?;
845        let top_tools: Vec<(String, u64)> = stmt
846            .query_map(params![workspace], |r| {
847                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
848            })?
849            .filter_map(|r| r.ok())
850            .collect();
851
852        Ok(SummaryStats {
853            session_count: session_count as u64,
854            total_cost_usd_e6: total_cost,
855            by_agent,
856            by_model,
857            top_tools,
858        })
859    }
860
861    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
862        let mut stmt = self.conn.prepare(
863            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
864                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
865             FROM events WHERE session_id = ?1 ORDER BY seq ASC",
866        )?;
867        let rows = stmt.query_map(params![session_id], |row| {
868            Ok((
869                row.get::<_, String>(0)?,
870                row.get::<_, i64>(1)?,
871                row.get::<_, i64>(2)?,
872                row.get::<_, i64>(3)?,
873                row.get::<_, String>(4)?,
874                row.get::<_, String>(5)?,
875                row.get::<_, Option<String>>(6)?,
876                row.get::<_, Option<String>>(7)?,
877                row.get::<_, Option<i64>>(8)?,
878                row.get::<_, Option<i64>>(9)?,
879                row.get::<_, Option<i64>>(10)?,
880                row.get::<_, Option<i64>>(11)?,
881                row.get::<_, String>(12)?,
882            ))
883        })?;
884
885        let mut events = Vec::new();
886        for row in rows {
887            let (
888                sid,
889                seq,
890                ts_ms,
891                ts_exact,
892                kind_str,
893                source_str,
894                tool,
895                tool_call_id,
896                tokens_in,
897                tokens_out,
898                reasoning_tokens,
899                cost_usd_e6,
900                payload_str,
901            ) = row?;
902            events.push(Event {
903                session_id: sid,
904                seq: seq as u64,
905                ts_ms: ts_ms as u64,
906                ts_exact: ts_exact != 0,
907                kind: kind_from_str(&kind_str),
908                source: source_from_str(&source_str),
909                tool,
910                tool_call_id,
911                tokens_in: tokens_in.map(|v| v as u32),
912                tokens_out: tokens_out.map(|v| v as u32),
913                reasoning_tokens: reasoning_tokens.map(|v| v as u32),
914                cost_usd_e6,
915                payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
916            });
917        }
918        Ok(events)
919    }
920
921    /// Update only status for existing session.
922    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
923        self.conn.execute(
924            "UPDATE sessions SET status = ?1 WHERE id = ?2",
925            params![format!("{:?}", status), id],
926        )?;
927        Ok(())
928    }
929
930    /// Workspace activity dashboard — feeds `cmd_insights`.
931    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
932        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
933        Ok(InsightsStats {
934            total_sessions: count_q(
935                &self.conn,
936                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
937                workspace,
938            )?,
939            running_sessions: count_q(
940                &self.conn,
941                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
942                workspace,
943            )?,
944            total_events: count_q(
945                &self.conn,
946                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
947                workspace,
948            )?,
949            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
950            recent: recent_sessions_3(&self.conn, workspace)?,
951            top_tools: top_tools_5(&self.conn, workspace)?,
952            total_cost_usd_e6,
953            sessions_with_cost,
954        })
955    }
956
957    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
958    pub fn retro_events_in_window(
959        &self,
960        workspace: &str,
961        start_ms: u64,
962        end_ms: u64,
963    ) -> Result<Vec<(SessionRecord, Event)>> {
964        let mut stmt = self.conn.prepare(
965            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
966                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
967                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
968                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
969             FROM events e
970             JOIN sessions s ON s.id = e.session_id
971             WHERE s.workspace = ?1
972               AND (
973                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
974                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
975               )
976             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
977        )?;
978        let rows = stmt.query_map(
979            params![
980                workspace,
981                start_ms as i64,
982                end_ms as i64,
983                SYNTHETIC_TS_CEILING_MS,
984            ],
985            |row| {
986                let payload_str: String = row.get(12)?;
987                let status_str: String = row.get(19)?;
988                Ok((
989                    SessionRecord {
990                        id: row.get(13)?,
991                        agent: row.get(14)?,
992                        model: row.get(15)?,
993                        workspace: row.get(16)?,
994                        started_at_ms: row.get::<_, i64>(17)? as u64,
995                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
996                        status: status_from_str(&status_str),
997                        trace_path: row.get(20)?,
998                        start_commit: row.get(21)?,
999                        end_commit: row.get(22)?,
1000                        branch: row.get(23)?,
1001                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1002                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1003                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1004                        prompt_fingerprint: None,
1005                    },
1006                    Event {
1007                        session_id: row.get(0)?,
1008                        seq: row.get::<_, i64>(1)? as u64,
1009                        ts_ms: row.get::<_, i64>(2)? as u64,
1010                        ts_exact: row.get::<_, i64>(3)? != 0,
1011                        kind: kind_from_str(&row.get::<_, String>(4)?),
1012                        source: source_from_str(&row.get::<_, String>(5)?),
1013                        tool: row.get(6)?,
1014                        tool_call_id: row.get(7)?,
1015                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1016                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1017                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1018                        cost_usd_e6: row.get(11)?,
1019                        payload: serde_json::from_str(&payload_str)
1020                            .unwrap_or(serde_json::Value::Null),
1021                    },
1022                ))
1023            },
1024        )?;
1025
1026        let mut out = Vec::new();
1027        for r in rows {
1028            out.push(r?);
1029        }
1030        Ok(out)
1031    }
1032
1033    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1034    pub fn files_touched_in_window(
1035        &self,
1036        workspace: &str,
1037        start_ms: u64,
1038        end_ms: u64,
1039    ) -> Result<Vec<(String, String)>> {
1040        let mut stmt = self.conn.prepare(
1041            "SELECT DISTINCT ft.session_id, ft.path
1042             FROM files_touched ft
1043             JOIN sessions s ON s.id = ft.session_id
1044             WHERE s.workspace = ?1
1045               AND EXISTS (
1046                 SELECT 1 FROM events e
1047                 JOIN sessions ss ON ss.id = e.session_id
1048                 WHERE e.session_id = ft.session_id
1049                   AND (
1050                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1051                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1052                   )
1053               )
1054             ORDER BY ft.session_id, ft.path",
1055        )?;
1056        let out: Vec<(String, String)> = stmt
1057            .query_map(
1058                params![
1059                    workspace,
1060                    start_ms as i64,
1061                    end_ms as i64,
1062                    SYNTHETIC_TS_CEILING_MS,
1063                ],
1064                |r| Ok((r.get(0)?, r.get(1)?)),
1065            )?
1066            .filter_map(|r| r.ok())
1067            .collect();
1068        Ok(out)
1069    }
1070
1071    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1072    /// (any session with an indexed skill row; join events optional — use row existence).
1073    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1074        let mut stmt = self.conn.prepare(
1075            "SELECT DISTINCT su.skill
1076             FROM skills_used su
1077             JOIN sessions s ON s.id = su.session_id
1078             WHERE s.workspace = ?1
1079               AND EXISTS (
1080                 SELECT 1 FROM events e
1081                 JOIN sessions ss ON ss.id = e.session_id
1082                 WHERE e.session_id = su.session_id
1083                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1084               )
1085             ORDER BY su.skill",
1086        )?;
1087        let out: Vec<String> = stmt
1088            .query_map(
1089                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1090                |r| r.get::<_, String>(0),
1091            )?
1092            .filter_map(|r| r.ok())
1093            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1094            .collect();
1095        Ok(out)
1096    }
1097
1098    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1099    pub fn skills_used_in_window(
1100        &self,
1101        workspace: &str,
1102        start_ms: u64,
1103        end_ms: u64,
1104    ) -> Result<Vec<(String, String)>> {
1105        let mut stmt = self.conn.prepare(
1106            "SELECT DISTINCT su.session_id, su.skill
1107             FROM skills_used su
1108             JOIN sessions s ON s.id = su.session_id
1109             WHERE s.workspace = ?1
1110               AND EXISTS (
1111                 SELECT 1 FROM events e
1112                 JOIN sessions ss ON ss.id = e.session_id
1113                 WHERE e.session_id = su.session_id
1114                   AND (
1115                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1116                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1117                   )
1118               )
1119             ORDER BY su.session_id, su.skill",
1120        )?;
1121        let out: Vec<(String, String)> = stmt
1122            .query_map(
1123                params![
1124                    workspace,
1125                    start_ms as i64,
1126                    end_ms as i64,
1127                    SYNTHETIC_TS_CEILING_MS,
1128                ],
1129                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1130            )?
1131            .filter_map(|r| r.ok())
1132            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1133            .collect();
1134        Ok(out)
1135    }
1136
1137    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1138    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1139        let mut stmt = self.conn.prepare(
1140            "SELECT DISTINCT ru.rule
1141             FROM rules_used ru
1142             JOIN sessions s ON s.id = ru.session_id
1143             WHERE s.workspace = ?1
1144               AND EXISTS (
1145                 SELECT 1 FROM events e
1146                 JOIN sessions ss ON ss.id = e.session_id
1147                 WHERE e.session_id = ru.session_id
1148                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1149               )
1150             ORDER BY ru.rule",
1151        )?;
1152        let out: Vec<String> = stmt
1153            .query_map(
1154                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1155                |r| r.get::<_, String>(0),
1156            )?
1157            .filter_map(|r| r.ok())
1158            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1159            .collect();
1160        Ok(out)
1161    }
1162
1163    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1164    pub fn rules_used_in_window(
1165        &self,
1166        workspace: &str,
1167        start_ms: u64,
1168        end_ms: u64,
1169    ) -> Result<Vec<(String, String)>> {
1170        let mut stmt = self.conn.prepare(
1171            "SELECT DISTINCT ru.session_id, ru.rule
1172             FROM rules_used ru
1173             JOIN sessions s ON s.id = ru.session_id
1174             WHERE s.workspace = ?1
1175               AND EXISTS (
1176                 SELECT 1 FROM events e
1177                 JOIN sessions ss ON ss.id = e.session_id
1178                 WHERE e.session_id = ru.session_id
1179                   AND (
1180                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1181                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1182                   )
1183               )
1184             ORDER BY ru.session_id, ru.rule",
1185        )?;
1186        let out: Vec<(String, String)> = stmt
1187            .query_map(
1188                params![
1189                    workspace,
1190                    start_ms as i64,
1191                    end_ms as i64,
1192                    SYNTHETIC_TS_CEILING_MS,
1193                ],
1194                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1195            )?
1196            .filter_map(|r| r.ok())
1197            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1198            .collect();
1199        Ok(out)
1200    }
1201
1202    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1203    pub fn sessions_active_in_window(
1204        &self,
1205        workspace: &str,
1206        start_ms: u64,
1207        end_ms: u64,
1208    ) -> Result<HashSet<String>> {
1209        let mut stmt = self.conn.prepare(
1210            "SELECT DISTINCT s.id
1211             FROM sessions s
1212             WHERE s.workspace = ?1
1213               AND EXISTS (
1214                 SELECT 1 FROM events e
1215                 WHERE e.session_id = s.id
1216                   AND (
1217                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1218                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1219                   )
1220               )",
1221        )?;
1222        let out: HashSet<String> = stmt
1223            .query_map(
1224                params![
1225                    workspace,
1226                    start_ms as i64,
1227                    end_ms as i64,
1228                    SYNTHETIC_TS_CEILING_MS,
1229                ],
1230                |r| r.get(0),
1231            )?
1232            .filter_map(|r| r.ok())
1233            .collect();
1234        Ok(out)
1235    }
1236
1237    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1238    pub fn session_costs_usd_e6_in_window(
1239        &self,
1240        workspace: &str,
1241        start_ms: u64,
1242        end_ms: u64,
1243    ) -> Result<HashMap<String, i64>> {
1244        let mut stmt = self.conn.prepare(
1245            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1246             FROM events e
1247             JOIN sessions s ON s.id = e.session_id
1248             WHERE s.workspace = ?1
1249               AND (
1250                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1251                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1252               )
1253             GROUP BY e.session_id",
1254        )?;
1255        let rows: Vec<(String, i64)> = stmt
1256            .query_map(
1257                params![
1258                    workspace,
1259                    start_ms as i64,
1260                    end_ms as i64,
1261                    SYNTHETIC_TS_CEILING_MS,
1262                ],
1263                |r| Ok((r.get(0)?, r.get(1)?)),
1264            )?
1265            .filter_map(|r| r.ok())
1266            .collect();
1267        Ok(rows.into_iter().collect())
1268    }
1269
1270    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1271    pub fn guidance_report(
1272        &self,
1273        workspace: &str,
1274        window_start_ms: u64,
1275        window_end_ms: u64,
1276        skill_slugs_on_disk: &HashSet<String>,
1277        rule_slugs_on_disk: &HashSet<String>,
1278    ) -> Result<GuidanceReport> {
1279        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1280        let denom = active.len() as u64;
1281        let costs =
1282            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1283
1284        let workspace_avg_cost_per_session_usd = if denom > 0 {
1285            let total_e6: i64 = active
1286                .iter()
1287                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1288                .sum();
1289            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1290        } else {
1291            None
1292        };
1293
1294        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1295        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1296            skill_sessions.entry(skill).or_default().insert(sid);
1297        }
1298        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1299        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1300            rule_sessions.entry(rule).or_default().insert(sid);
1301        }
1302
1303        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1304
1305        let mut push_row =
1306            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1307                let sessions = sids.len() as u64;
1308                let sessions_pct = if denom > 0 {
1309                    sessions as f64 * 100.0 / denom as f64
1310                } else {
1311                    0.0
1312                };
1313                let total_cost_usd_e6: i64 = sids
1314                    .iter()
1315                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1316                    .sum();
1317                let avg_cost_per_session_usd = if sessions > 0 {
1318                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1319                } else {
1320                    None
1321                };
1322                let vs_workspace_avg_cost_per_session_usd =
1323                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1324                        (Some(avg), Some(w)) => Some(avg - w),
1325                        _ => None,
1326                    };
1327                rows.push(GuidancePerfRow {
1328                    kind,
1329                    id,
1330                    sessions,
1331                    sessions_pct,
1332                    total_cost_usd_e6,
1333                    avg_cost_per_session_usd,
1334                    vs_workspace_avg_cost_per_session_usd,
1335                    on_disk,
1336                });
1337            };
1338
1339        let mut seen_skills: HashSet<String> = HashSet::new();
1340        for (id, sids) in &skill_sessions {
1341            seen_skills.insert(id.clone());
1342            push_row(
1343                GuidanceKind::Skill,
1344                id.clone(),
1345                sids,
1346                skill_slugs_on_disk.contains(id),
1347            );
1348        }
1349        for slug in skill_slugs_on_disk {
1350            if seen_skills.contains(slug) {
1351                continue;
1352            }
1353            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1354        }
1355
1356        let mut seen_rules: HashSet<String> = HashSet::new();
1357        for (id, sids) in &rule_sessions {
1358            seen_rules.insert(id.clone());
1359            push_row(
1360                GuidanceKind::Rule,
1361                id.clone(),
1362                sids,
1363                rule_slugs_on_disk.contains(id),
1364            );
1365        }
1366        for slug in rule_slugs_on_disk {
1367            if seen_rules.contains(slug) {
1368                continue;
1369            }
1370            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1371        }
1372
1373        rows.sort_by(|a, b| {
1374            b.sessions
1375                .cmp(&a.sessions)
1376                .then_with(|| a.kind.cmp(&b.kind))
1377                .then_with(|| a.id.cmp(&b.id))
1378        });
1379
1380        Ok(GuidanceReport {
1381            workspace: workspace.to_string(),
1382            window_start_ms,
1383            window_end_ms,
1384            sessions_in_window: denom,
1385            workspace_avg_cost_per_session_usd,
1386            rows,
1387        })
1388    }
1389
1390    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1391        let mut stmt = self.conn.prepare(
1392            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1393                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1394                    prompt_fingerprint
1395             FROM sessions WHERE id = ?1",
1396        )?;
1397        let mut rows = stmt.query_map(params![id], |row| {
1398            Ok((
1399                row.get::<_, String>(0)?,
1400                row.get::<_, String>(1)?,
1401                row.get::<_, Option<String>>(2)?,
1402                row.get::<_, String>(3)?,
1403                row.get::<_, i64>(4)?,
1404                row.get::<_, Option<i64>>(5)?,
1405                row.get::<_, String>(6)?,
1406                row.get::<_, String>(7)?,
1407                row.get::<_, Option<String>>(8)?,
1408                row.get::<_, Option<String>>(9)?,
1409                row.get::<_, Option<String>>(10)?,
1410                row.get::<_, Option<i64>>(11)?,
1411                row.get::<_, Option<i64>>(12)?,
1412                row.get::<_, String>(13)?,
1413                row.get::<_, Option<String>>(14)?,
1414            ))
1415        })?;
1416
1417        if let Some(row) = rows.next() {
1418            let (
1419                id,
1420                agent,
1421                model,
1422                workspace,
1423                started,
1424                ended,
1425                status_str,
1426                trace,
1427                start_commit,
1428                end_commit,
1429                branch,
1430                dirty_start,
1431                dirty_end,
1432                source,
1433                prompt_fingerprint,
1434            ) = row?;
1435            Ok(Some(SessionRecord {
1436                id,
1437                agent,
1438                model,
1439                workspace,
1440                started_at_ms: started as u64,
1441                ended_at_ms: ended.map(|v| v as u64),
1442                status: status_from_str(&status_str),
1443                trace_path: trace,
1444                start_commit,
1445                end_commit,
1446                branch,
1447                dirty_start: dirty_start.map(i64_to_bool),
1448                dirty_end: dirty_end.map(i64_to_bool),
1449                repo_binding_source: empty_to_none(source),
1450                prompt_fingerprint,
1451            }))
1452        } else {
1453            Ok(None)
1454        }
1455    }
1456
1457    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1458        let mut stmt = self.conn.prepare(
1459            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1460                    indexed_at_ms, dirty, graph_path
1461             FROM repo_snapshots WHERE workspace = ?1
1462             ORDER BY indexed_at_ms DESC LIMIT 1",
1463        )?;
1464        let mut rows = stmt.query_map(params![workspace], |row| {
1465            Ok(RepoSnapshotRecord {
1466                id: row.get(0)?,
1467                workspace: row.get(1)?,
1468                head_commit: row.get(2)?,
1469                dirty_fingerprint: row.get(3)?,
1470                analyzer_version: row.get(4)?,
1471                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1472                dirty: row.get::<_, i64>(6)? != 0,
1473                graph_path: row.get(7)?,
1474            })
1475        })?;
1476        Ok(rows.next().transpose()?)
1477    }
1478
1479    pub fn save_repo_snapshot(
1480        &self,
1481        snapshot: &RepoSnapshotRecord,
1482        facts: &[FileFact],
1483        edges: &[RepoEdge],
1484    ) -> Result<()> {
1485        self.conn.execute(
1486            "INSERT INTO repo_snapshots (
1487                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1488                indexed_at_ms, dirty, graph_path
1489             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1490             ON CONFLICT(id) DO UPDATE SET
1491                workspace=excluded.workspace,
1492                head_commit=excluded.head_commit,
1493                dirty_fingerprint=excluded.dirty_fingerprint,
1494                analyzer_version=excluded.analyzer_version,
1495                indexed_at_ms=excluded.indexed_at_ms,
1496                dirty=excluded.dirty,
1497                graph_path=excluded.graph_path",
1498            params![
1499                snapshot.id,
1500                snapshot.workspace,
1501                snapshot.head_commit,
1502                snapshot.dirty_fingerprint,
1503                snapshot.analyzer_version,
1504                snapshot.indexed_at_ms as i64,
1505                bool_to_i64(snapshot.dirty),
1506                snapshot.graph_path,
1507            ],
1508        )?;
1509        self.conn.execute(
1510            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1511            params![snapshot.id],
1512        )?;
1513        self.conn.execute(
1514            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1515            params![snapshot.id],
1516        )?;
1517        for fact in facts {
1518            self.conn.execute(
1519                "INSERT INTO file_facts (
1520                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1521                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1522                    churn_30d, churn_90d, authors_90d, last_changed_ms
1523                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1524                params![
1525                    fact.snapshot_id,
1526                    fact.path,
1527                    fact.language,
1528                    fact.bytes as i64,
1529                    fact.loc as i64,
1530                    fact.sloc as i64,
1531                    fact.complexity_total as i64,
1532                    fact.max_fn_complexity as i64,
1533                    fact.symbol_count as i64,
1534                    fact.import_count as i64,
1535                    fact.fan_in as i64,
1536                    fact.fan_out as i64,
1537                    fact.churn_30d as i64,
1538                    fact.churn_90d as i64,
1539                    fact.authors_90d as i64,
1540                    fact.last_changed_ms.map(|v| v as i64),
1541                ],
1542            )?;
1543        }
1544        for edge in edges {
1545            self.conn.execute(
1546                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1547                 VALUES (?1, ?2, ?3, ?4, ?5)
1548                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1549                 DO UPDATE SET weight = weight + excluded.weight",
1550                params![
1551                    snapshot.id,
1552                    edge.from_path,
1553                    edge.to_path,
1554                    edge.kind,
1555                    edge.weight as i64,
1556                ],
1557            )?;
1558        }
1559        Ok(())
1560    }
1561
1562    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1563        let mut stmt = self.conn.prepare(
1564            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1565                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1566                    churn_30d, churn_90d, authors_90d, last_changed_ms
1567             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1568        )?;
1569        let rows = stmt.query_map(params![snapshot_id], |row| {
1570            Ok(FileFact {
1571                snapshot_id: row.get(0)?,
1572                path: row.get(1)?,
1573                language: row.get(2)?,
1574                bytes: row.get::<_, i64>(3)? as u64,
1575                loc: row.get::<_, i64>(4)? as u32,
1576                sloc: row.get::<_, i64>(5)? as u32,
1577                complexity_total: row.get::<_, i64>(6)? as u32,
1578                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1579                symbol_count: row.get::<_, i64>(8)? as u32,
1580                import_count: row.get::<_, i64>(9)? as u32,
1581                fan_in: row.get::<_, i64>(10)? as u32,
1582                fan_out: row.get::<_, i64>(11)? as u32,
1583                churn_30d: row.get::<_, i64>(12)? as u32,
1584                churn_90d: row.get::<_, i64>(13)? as u32,
1585                authors_90d: row.get::<_, i64>(14)? as u32,
1586                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1587            })
1588        })?;
1589        Ok(rows.filter_map(|row| row.ok()).collect())
1590    }
1591
1592    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1593        let mut stmt = self.conn.prepare(
1594            "SELECT from_id, to_id, kind, weight
1595             FROM repo_edges WHERE snapshot_id = ?1
1596             ORDER BY kind, from_id, to_id",
1597        )?;
1598        let rows = stmt.query_map(params![snapshot_id], |row| {
1599            Ok(RepoEdge {
1600                from_path: row.get(0)?,
1601                to_path: row.get(1)?,
1602                kind: row.get(2)?,
1603                weight: row.get::<_, i64>(3)? as u32,
1604            })
1605        })?;
1606        Ok(rows.filter_map(|row| row.ok()).collect())
1607    }
1608
1609    pub fn tool_spans_in_window(
1610        &self,
1611        workspace: &str,
1612        start_ms: u64,
1613        end_ms: u64,
1614    ) -> Result<Vec<ToolSpanView>> {
1615        let mut stmt = self.conn.prepare(
1616            "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1617                    ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1618             FROM tool_spans ts
1619             JOIN sessions s ON s.id = ts.session_id
1620             WHERE s.workspace = ?1
1621               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1622               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1623             ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1624        )?;
1625        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1626            let paths_json: String = row.get(7)?;
1627            Ok(ToolSpanView {
1628                tool: row
1629                    .get::<_, Option<String>>(0)?
1630                    .unwrap_or_else(|| "unknown".into()),
1631                status: row.get(1)?,
1632                lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1633                tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1634                tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1635                reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1636                cost_usd_e6: row.get(6)?,
1637                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1638            })
1639        })?;
1640        Ok(rows.filter_map(|row| row.ok()).collect())
1641    }
1642
1643    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1644        let mut stmt = self.conn.prepare(
1645            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1646                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1647             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1648        )?;
1649        let rows = stmt.query_map(params![session_id], |row| {
1650            let paths_json: String = row.get(12)?;
1651            Ok(ToolSpanSyncRow {
1652                span_id: row.get(0)?,
1653                session_id: row.get(1)?,
1654                tool: row.get(2)?,
1655                tool_call_id: row.get(3)?,
1656                status: row.get(4)?,
1657                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1658                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1659                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1660                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1661                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1662                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1663                cost_usd_e6: row.get(11)?,
1664                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1665            })
1666        })?;
1667        Ok(rows.filter_map(|row| row.ok()).collect())
1668    }
1669
1670    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1671        self.conn.execute(
1672            "INSERT OR REPLACE INTO session_evals
1673             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1674             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1675            rusqlite::params![
1676                eval.id,
1677                eval.session_id,
1678                eval.judge_model,
1679                eval.rubric_id,
1680                eval.score,
1681                eval.rationale,
1682                eval.flagged as i64,
1683                eval.created_at_ms as i64,
1684            ],
1685        )?;
1686        Ok(())
1687    }
1688
1689    pub fn list_evals_in_window(
1690        &self,
1691        start_ms: u64,
1692        end_ms: u64,
1693    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1694        let mut stmt = self.conn.prepare(
1695            "SELECT id, session_id, judge_model, rubric_id, score,
1696                    rationale, flagged, created_at_ms
1697             FROM session_evals
1698             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1699             ORDER BY created_at_ms ASC",
1700        )?;
1701        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1702            Ok(crate::eval::types::EvalRow {
1703                id: r.get(0)?,
1704                session_id: r.get(1)?,
1705                judge_model: r.get(2)?,
1706                rubric_id: r.get(3)?,
1707                score: r.get(4)?,
1708                rationale: r.get(5)?,
1709                flagged: r.get::<_, i64>(6)? != 0,
1710                created_at_ms: r.get::<_, i64>(7)? as u64,
1711            })
1712        })?;
1713        rows.collect()
1714    }
1715
1716    pub fn list_evals_for_session(
1717        &self,
1718        session_id: &str,
1719    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1720        let mut stmt = self.conn.prepare(
1721            "SELECT id, session_id, judge_model, rubric_id, score,
1722                    rationale, flagged, created_at_ms
1723             FROM session_evals
1724             WHERE session_id = ?1
1725             ORDER BY created_at_ms DESC",
1726        )?;
1727        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1728            Ok(crate::eval::types::EvalRow {
1729                id: r.get(0)?,
1730                session_id: r.get(1)?,
1731                judge_model: r.get(2)?,
1732                rubric_id: r.get(3)?,
1733                score: r.get(4)?,
1734                rationale: r.get(5)?,
1735                flagged: r.get::<_, i64>(6)? != 0,
1736                created_at_ms: r.get::<_, i64>(7)? as u64,
1737            })
1738        })?;
1739        rows.collect()
1740    }
1741
1742    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
1743        use crate::feedback::types::FeedbackLabel;
1744        self.conn.execute(
1745            "INSERT OR REPLACE INTO session_feedback
1746             (id, session_id, score, label, note, created_at_ms)
1747             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1748            rusqlite::params![
1749                r.id,
1750                r.session_id,
1751                r.score.as_ref().map(|s| s.0 as i64),
1752                r.label.as_ref().map(FeedbackLabel::to_db_str),
1753                r.note,
1754                r.created_at_ms as i64,
1755            ],
1756        )?;
1757        let payload = serde_json::to_string(r).unwrap_or_default();
1758        self.conn.execute(
1759            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
1760             VALUES (?1, 'session_feedback', ?2, 0)",
1761            rusqlite::params![r.session_id, payload],
1762        )?;
1763        Ok(())
1764    }
1765
1766    pub fn list_feedback_in_window(
1767        &self,
1768        start_ms: u64,
1769        end_ms: u64,
1770    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
1771        let mut stmt = self.conn.prepare(
1772            "SELECT id, session_id, score, label, note, created_at_ms
1773             FROM session_feedback
1774             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1775             ORDER BY created_at_ms ASC",
1776        )?;
1777        let rows = stmt.query_map(
1778            rusqlite::params![start_ms as i64, end_ms as i64],
1779            feedback_row,
1780        )?;
1781        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1782    }
1783
1784    pub fn feedback_for_sessions(
1785        &self,
1786        ids: &[String],
1787    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
1788        if ids.is_empty() {
1789            return Ok(std::collections::HashMap::new());
1790        }
1791        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1792        let sql = format!(
1793            "SELECT id, session_id, score, label, note, created_at_ms
1794             FROM session_feedback WHERE session_id IN ({placeholders})
1795             ORDER BY created_at_ms DESC"
1796        );
1797        let mut stmt = self.conn.prepare(&sql)?;
1798        let params: Vec<&dyn rusqlite::ToSql> =
1799            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1800        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
1801        let mut map = std::collections::HashMap::new();
1802        for row in rows {
1803            let r = row?;
1804            map.entry(r.session_id.clone()).or_insert(r);
1805        }
1806        Ok(map)
1807    }
1808
1809    pub fn list_sessions_for_eval(
1810        &self,
1811        since_ms: u64,
1812        min_cost_usd: f64,
1813    ) -> Result<Vec<crate::core::event::SessionRecord>> {
1814        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
1815        let mut stmt = self.conn.prepare(
1816            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1817                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
1818                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint
1819             FROM sessions s
1820             WHERE s.started_at_ms >= ?1
1821               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
1822               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
1823             ORDER BY s.started_at_ms DESC",
1824        )?;
1825        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
1826            Ok((
1827                r.get::<_, String>(0)?,
1828                r.get::<_, String>(1)?,
1829                r.get::<_, Option<String>>(2)?,
1830                r.get::<_, String>(3)?,
1831                r.get::<_, i64>(4)?,
1832                r.get::<_, Option<i64>>(5)?,
1833                r.get::<_, String>(6)?,
1834                r.get::<_, String>(7)?,
1835                r.get::<_, Option<String>>(8)?,
1836                r.get::<_, Option<String>>(9)?,
1837                r.get::<_, Option<String>>(10)?,
1838                r.get::<_, Option<i64>>(11)?,
1839                r.get::<_, Option<i64>>(12)?,
1840                r.get::<_, Option<String>>(13)?,
1841                r.get::<_, Option<String>>(14)?,
1842            ))
1843        })?;
1844        let mut out = Vec::new();
1845        for row in rows {
1846            let (
1847                id,
1848                agent,
1849                model,
1850                workspace,
1851                started,
1852                ended,
1853                status_str,
1854                trace,
1855                start_commit,
1856                end_commit,
1857                branch,
1858                dirty_start,
1859                dirty_end,
1860                source,
1861                prompt_fingerprint,
1862            ) = row?;
1863            out.push(crate::core::event::SessionRecord {
1864                id,
1865                agent,
1866                model,
1867                workspace,
1868                started_at_ms: started as u64,
1869                ended_at_ms: ended.map(|v| v as u64),
1870                status: status_from_str(&status_str),
1871                trace_path: trace,
1872                start_commit,
1873                end_commit,
1874                branch,
1875                dirty_start: dirty_start.map(i64_to_bool),
1876                dirty_end: dirty_end.map(i64_to_bool),
1877                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
1878                prompt_fingerprint,
1879            });
1880        }
1881        Ok(out)
1882    }
1883
1884    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
1885        self.conn.execute(
1886            "INSERT OR IGNORE INTO prompt_snapshots
1887             (fingerprint, captured_at_ms, files_json, total_bytes)
1888             VALUES (?1, ?2, ?3, ?4)",
1889            params![
1890                snap.fingerprint,
1891                snap.captured_at_ms as i64,
1892                snap.files_json,
1893                snap.total_bytes as i64
1894            ],
1895        )?;
1896        Ok(())
1897    }
1898
1899    pub fn get_prompt_snapshot(
1900        &self,
1901        fingerprint: &str,
1902    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
1903        self.conn
1904            .query_row(
1905                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1906                 FROM prompt_snapshots WHERE fingerprint = ?1",
1907                params![fingerprint],
1908                |r| {
1909                    Ok(crate::prompt::PromptSnapshot {
1910                        fingerprint: r.get(0)?,
1911                        captured_at_ms: r.get::<_, i64>(1)? as u64,
1912                        files_json: r.get(2)?,
1913                        total_bytes: r.get::<_, i64>(3)? as u64,
1914                    })
1915                },
1916            )
1917            .optional()
1918            .map_err(Into::into)
1919    }
1920
1921    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
1922        let mut stmt = self.conn.prepare(
1923            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1924             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
1925        )?;
1926        let rows = stmt.query_map([], |r| {
1927            Ok(crate::prompt::PromptSnapshot {
1928                fingerprint: r.get(0)?,
1929                captured_at_ms: r.get::<_, i64>(1)? as u64,
1930                files_json: r.get(2)?,
1931                total_bytes: r.get::<_, i64>(3)? as u64,
1932            })
1933        })?;
1934        Ok(rows.filter_map(|r| r.ok()).collect())
1935    }
1936
1937    /// Sessions with a non-null prompt_fingerprint in the given window.
1938    pub fn sessions_with_prompt_fingerprint(
1939        &self,
1940        workspace: &str,
1941        start_ms: u64,
1942        end_ms: u64,
1943    ) -> Result<Vec<(String, String)>> {
1944        let mut stmt = self.conn.prepare(
1945            "SELECT id, prompt_fingerprint FROM sessions
1946             WHERE workspace = ?1
1947               AND started_at_ms >= ?2 AND started_at_ms < ?3
1948               AND prompt_fingerprint IS NOT NULL",
1949        )?;
1950        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
1951            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1952        })?;
1953        Ok(rows.filter_map(|r| r.ok()).collect())
1954    }
1955}
1956
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
1963
1964fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1965    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1966}
1967
1968fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1969    let cost: i64 = conn.query_row(
1970        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1971        params![workspace], |r| r.get(0),
1972    )?;
1973    let with_cost: i64 = conn.query_row(
1974        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1975        params![workspace], |r| r.get(0),
1976    )?;
1977    Ok((cost, with_cost as u64))
1978}
1979
1980fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
1981    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
1982    let score = r
1983        .get::<_, Option<i64>>(2)?
1984        .and_then(|v| FeedbackScore::new(v as u8));
1985    let label = r
1986        .get::<_, Option<String>>(3)?
1987        .and_then(|s| FeedbackLabel::from_str_opt(&s));
1988    Ok(FeedbackRecord {
1989        id: r.get(0)?,
1990        session_id: r.get(1)?,
1991        score,
1992        label,
1993        note: r.get(4)?,
1994        created_at_ms: r.get::<_, i64>(5)? as u64,
1995    })
1996}
1997
/// Three-letter weekday name for the day `day_idx` days after 1970-01-01.
///
/// Day 0 (1970-01-01) was a Thursday, so the index is shifted by 4 into a
/// Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const WEEKDAYS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    let slot = (day_idx + 4) % 7;
    WEEKDAYS[slot as usize]
}
2001
2002fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2003    let week_ago = now.saturating_sub(7 * 86_400_000);
2004    let mut stmt = conn
2005        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2006    let days: Vec<u64> = stmt
2007        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2008        .filter_map(|r| r.ok())
2009        .map(|v| v as u64 / 86_400_000)
2010        .collect();
2011    let today = now / 86_400_000;
2012    Ok((0u64..7)
2013        .map(|i| {
2014            let d = today.saturating_sub(6 - i);
2015            (
2016                day_label(d).to_string(),
2017                days.iter().filter(|&&x| x == d).count() as u64,
2018            )
2019        })
2020        .collect())
2021}
2022
2023fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2024    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2025               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2026               s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
2027               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2028               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2029    let mut stmt = conn.prepare(sql)?;
2030    let out: Vec<(SessionRecord, u64)> = stmt
2031        .query_map(params![workspace], |r| {
2032            let st: String = r.get(6)?;
2033            Ok((
2034                SessionRecord {
2035                    id: r.get(0)?,
2036                    agent: r.get(1)?,
2037                    model: r.get(2)?,
2038                    workspace: r.get(3)?,
2039                    started_at_ms: r.get::<_, i64>(4)? as u64,
2040                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2041                    status: status_from_str(&st),
2042                    trace_path: r.get(7)?,
2043                    start_commit: r.get(8)?,
2044                    end_commit: r.get(9)?,
2045                    branch: r.get(10)?,
2046                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2047                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2048                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2049                    prompt_fingerprint: None,
2050                },
2051                r.get::<_, i64>(14)? as u64,
2052            ))
2053        })?
2054        .filter_map(|r| r.ok())
2055        .collect();
2056    Ok(out)
2057}
2058
2059fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2060    let mut stmt = conn.prepare(
2061        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2062         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2063    )?;
2064    let out: Vec<(String, u64)> = stmt
2065        .query_map(params![workspace], |r| {
2066            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2067        })?
2068        .filter_map(|r| r.ok())
2069        .collect();
2070    Ok(out)
2071}
2072
2073fn status_from_str(s: &str) -> SessionStatus {
2074    match s {
2075        "Running" => SessionStatus::Running,
2076        "Waiting" => SessionStatus::Waiting,
2077        "Idle" => SessionStatus::Idle,
2078        _ => SessionStatus::Done,
2079    }
2080}
2081
2082fn kind_from_str(s: &str) -> EventKind {
2083    match s {
2084        "ToolCall" => EventKind::ToolCall,
2085        "ToolResult" => EventKind::ToolResult,
2086        "Message" => EventKind::Message,
2087        "Error" => EventKind::Error,
2088        "Cost" => EventKind::Cost,
2089        _ => EventKind::Hook,
2090    }
2091}
2092
2093fn source_from_str(s: &str) -> EventSource {
2094    match s {
2095        "Tail" => EventSource::Tail,
2096        "Hook" => EventSource::Hook,
2097        _ => EventSource::Proxy,
2098    }
2099}
2100
2101fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2102    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2103    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2104    ensure_column(conn, "sessions", "branch", "TEXT")?;
2105    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2106    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2107    ensure_column(
2108        conn,
2109        "sessions",
2110        "repo_binding_source",
2111        "TEXT NOT NULL DEFAULT ''",
2112    )?;
2113    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2114    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2115    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2116    ensure_column(
2117        conn,
2118        "sync_outbox",
2119        "kind",
2120        "TEXT NOT NULL DEFAULT 'events'",
2121    )?;
2122    ensure_column(
2123        conn,
2124        "experiments",
2125        "state",
2126        "TEXT NOT NULL DEFAULT 'Draft'",
2127    )?;
2128    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2129    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2130    Ok(())
2131}
2132
2133fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2134    if has_column(conn, table, column)? {
2135        return Ok(());
2136    }
2137    conn.execute(
2138        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2139        [],
2140    )?;
2141    Ok(())
2142}
2143
2144fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2145    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2146    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2147    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2148}
2149
/// Encodes a boolean as an integer flag: `true` becomes 1, `false` becomes 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2153
/// Decodes an integer flag: zero is `false`, any other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
2157
/// Maps an empty string to `None` and passes any non-empty string through
/// unchanged (whitespace-only strings count as non-empty).
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2161
2162#[cfg(test)]
2163mod tests {
2164    use super::*;
2165    use serde_json::json;
2166    use std::collections::HashSet;
2167    use tempfile::TempDir;
2168
2169    fn make_session(id: &str) -> SessionRecord {
2170        SessionRecord {
2171            id: id.to_string(),
2172            agent: "cursor".to_string(),
2173            model: None,
2174            workspace: "/ws".to_string(),
2175            started_at_ms: 1000,
2176            ended_at_ms: None,
2177            status: SessionStatus::Done,
2178            trace_path: "/trace".to_string(),
2179            start_commit: None,
2180            end_commit: None,
2181            branch: None,
2182            dirty_start: None,
2183            dirty_end: None,
2184            repo_binding_source: None,
2185            prompt_fingerprint: None,
2186        }
2187    }
2188
2189    fn make_event(session_id: &str, seq: u64) -> Event {
2190        Event {
2191            session_id: session_id.to_string(),
2192            seq,
2193            ts_ms: 1000 + seq * 100,
2194            ts_exact: false,
2195            kind: EventKind::ToolCall,
2196            source: EventSource::Tail,
2197            tool: Some("read_file".to_string()),
2198            tool_call_id: Some(format!("call_{seq}")),
2199            tokens_in: None,
2200            tokens_out: None,
2201            reasoning_tokens: None,
2202            cost_usd_e6: None,
2203            payload: json!({}),
2204        }
2205    }
2206
2207    #[test]
2208    fn open_and_wal_mode() {
2209        let dir = TempDir::new().unwrap();
2210        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2211        let mode: String = store
2212            .conn
2213            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
2214            .unwrap();
2215        assert_eq!(mode, "wal");
2216    }
2217
2218    #[test]
2219    fn upsert_and_get_session() {
2220        let dir = TempDir::new().unwrap();
2221        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2222        let s = make_session("s1");
2223        store.upsert_session(&s).unwrap();
2224
2225        let got = store.get_session("s1").unwrap().unwrap();
2226        assert_eq!(got.id, "s1");
2227        assert_eq!(got.status, SessionStatus::Done);
2228    }
2229
2230    #[test]
2231    fn append_and_list_events_round_trip() {
2232        let dir = TempDir::new().unwrap();
2233        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2234        let s = make_session("s2");
2235        store.upsert_session(&s).unwrap();
2236        store.append_event(&make_event("s2", 0)).unwrap();
2237        store.append_event(&make_event("s2", 1)).unwrap();
2238
2239        let sessions = store.list_sessions("/ws").unwrap();
2240        assert_eq!(sessions.len(), 1);
2241        assert_eq!(sessions[0].id, "s2");
2242    }
2243
2244    #[test]
2245    fn summary_stats_empty() {
2246        let dir = TempDir::new().unwrap();
2247        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2248        let stats = store.summary_stats("/ws").unwrap();
2249        assert_eq!(stats.session_count, 0);
2250        assert_eq!(stats.total_cost_usd_e6, 0);
2251    }
2252
2253    #[test]
2254    fn summary_stats_counts_sessions() {
2255        let dir = TempDir::new().unwrap();
2256        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2257        store.upsert_session(&make_session("a")).unwrap();
2258        store.upsert_session(&make_session("b")).unwrap();
2259        let stats = store.summary_stats("/ws").unwrap();
2260        assert_eq!(stats.session_count, 2);
2261        assert_eq!(stats.by_agent.len(), 1);
2262        assert_eq!(stats.by_agent[0].0, "cursor");
2263        assert_eq!(stats.by_agent[0].1, 2);
2264    }
2265
2266    #[test]
2267    fn list_events_for_session_round_trip() {
2268        let dir = TempDir::new().unwrap();
2269        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2270        store.upsert_session(&make_session("s4")).unwrap();
2271        store.append_event(&make_event("s4", 0)).unwrap();
2272        store.append_event(&make_event("s4", 1)).unwrap();
2273        let events = store.list_events_for_session("s4").unwrap();
2274        assert_eq!(events.len(), 2);
2275        assert_eq!(events[0].seq, 0);
2276        assert_eq!(events[1].seq, 1);
2277    }
2278
2279    #[test]
2280    fn append_event_dedup() {
2281        let dir = TempDir::new().unwrap();
2282        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2283        store.upsert_session(&make_session("s5")).unwrap();
2284        store.append_event(&make_event("s5", 0)).unwrap();
2285        // Duplicate — should be silently ignored
2286        store.append_event(&make_event("s5", 0)).unwrap();
2287        let events = store.list_events_for_session("s5").unwrap();
2288        assert_eq!(events.len(), 1);
2289    }
2290
2291    #[test]
2292    fn upsert_idempotent() {
2293        let dir = TempDir::new().unwrap();
2294        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2295        let mut s = make_session("s3");
2296        store.upsert_session(&s).unwrap();
2297        s.status = SessionStatus::Running;
2298        store.upsert_session(&s).unwrap();
2299
2300        let got = store.get_session("s3").unwrap().unwrap();
2301        assert_eq!(got.status, SessionStatus::Running);
2302    }
2303
2304    #[test]
2305    fn append_event_indexes_path_from_payload() {
2306        let dir = TempDir::new().unwrap();
2307        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2308        store.upsert_session(&make_session("sx")).unwrap();
2309        let mut ev = make_event("sx", 0);
2310        ev.payload = json!({"input": {"path": "src/lib.rs"}});
2311        store.append_event(&ev).unwrap();
2312        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
2313        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
2314    }
2315
2316    #[test]
2317    fn update_session_status_changes_status() {
2318        let dir = TempDir::new().unwrap();
2319        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2320        store.upsert_session(&make_session("s6")).unwrap();
2321        store
2322            .update_session_status("s6", SessionStatus::Running)
2323            .unwrap();
2324        let got = store.get_session("s6").unwrap().unwrap();
2325        assert_eq!(got.status, SessionStatus::Running);
2326    }
2327
2328    #[test]
2329    fn prune_sessions_removes_old_rows_and_keeps_recent() {
2330        let dir = TempDir::new().unwrap();
2331        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2332        let mut old = make_session("old");
2333        old.started_at_ms = 1_000;
2334        let mut new = make_session("new");
2335        new.started_at_ms = 9_000_000_000_000;
2336        store.upsert_session(&old).unwrap();
2337        store.upsert_session(&new).unwrap();
2338        store.append_event(&make_event("old", 0)).unwrap();
2339
2340        let stats = store.prune_sessions_started_before(5_000).unwrap();
2341        assert_eq!(
2342            stats,
2343            PruneStats {
2344                sessions_removed: 1,
2345                events_removed: 1,
2346            }
2347        );
2348        assert!(store.get_session("old").unwrap().is_none());
2349        assert!(store.get_session("new").unwrap().is_some());
2350        let sessions = store.list_sessions("/ws").unwrap();
2351        assert_eq!(sessions.len(), 1);
2352        assert_eq!(sessions[0].id, "new");
2353    }
2354
2355    #[test]
2356    fn append_event_indexes_rules_from_payload() {
2357        let dir = TempDir::new().unwrap();
2358        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2359        store.upsert_session(&make_session("sr")).unwrap();
2360        let mut ev = make_event("sr", 0);
2361        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
2362        store.append_event(&ev).unwrap();
2363        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
2364        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
2365    }
2366
2367    #[test]
2368    fn guidance_report_counts_skill_and_rule_sessions() {
2369        let dir = TempDir::new().unwrap();
2370        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2371        store.upsert_session(&make_session("sx")).unwrap();
2372        let mut ev = make_event("sx", 0);
2373        ev.payload =
2374            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
2375        ev.cost_usd_e6 = Some(500_000);
2376        store.append_event(&ev).unwrap();
2377
2378        let mut skill_slugs = HashSet::new();
2379        skill_slugs.insert("tdd".into());
2380        let mut rule_slugs = HashSet::new();
2381        rule_slugs.insert("style".into());
2382
2383        let rep = store
2384            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
2385            .unwrap();
2386        assert_eq!(rep.sessions_in_window, 1);
2387        let tdd = rep
2388            .rows
2389            .iter()
2390            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
2391            .unwrap();
2392        assert_eq!(tdd.sessions, 1);
2393        assert!(tdd.on_disk);
2394        let style = rep
2395            .rows
2396            .iter()
2397            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
2398            .unwrap();
2399        assert_eq!(style.sessions, 1);
2400        assert!(style.on_disk);
2401    }
2402
2403    #[test]
2404    fn prune_sessions_removes_rules_used_rows() {
2405        let dir = TempDir::new().unwrap();
2406        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2407        let mut old = make_session("old_r");
2408        old.started_at_ms = 1_000;
2409        store.upsert_session(&old).unwrap();
2410        let mut ev = make_event("old_r", 0);
2411        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
2412        store.append_event(&ev).unwrap();
2413
2414        store.prune_sessions_started_before(5_000).unwrap();
2415        let n: i64 = store
2416            .conn
2417            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
2418            .unwrap();
2419        assert_eq!(n, 0);
2420    }
2421}