Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Ceiling on `ts_ms` values that are still considered synthetic, transcript-derived
/// timing (seq-based fallbacks) rather than real wall-clock time.
///
/// `10^12` ms is 2001-09-09T01:46:40Z, so genuine epoch timestamps from after that date
/// lie above it. Rows below this ceiling fall back to `sessions.started_at_ms` when they
/// are matched against time windows.
const SYNTHETIC_TS_CEILING_MS: i64 = 10_i64.pow(12);
21
/// Schema DDL that [`Store::open`] executes, in order, every time a database is opened.
///
/// Every statement here is idempotent (`CREATE … IF NOT EXISTS` or `INSERT OR IGNORE`).
/// That is what lets `Store::open` replay the whole list on each open rather than tracking
/// which statements already ran. New schema objects should be appended as further
/// idempotent statements.
///
/// NOTE(review): several columns written elsewhere in this file are not created here,
/// e.g. `events.ts_exact` / `events.tool_call_id` / `events.reasoning_tokens`,
/// `sync_outbox.kind`, and the repo-binding columns on `sessions`. Presumably
/// `ensure_schema_columns` adds them after these statements run — confirm.
const MIGRATIONS: &[&str] = &[
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    // Per-session event log; (session_id, seq) uniqueness comes from events_session_seq_idx below.
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Free-form key/value state (sync health, throttling timestamps).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the singleton state row (the CHECK above only allows id = 1); OR IGNORE keeps replays harmless.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
];
204
205/// Per-workspace activity dashboard stats.
206#[derive(Clone)]
207pub struct InsightsStats {
208    pub total_sessions: u64,
209    pub running_sessions: u64,
210    pub total_events: u64,
211    /// (day label e.g. "Mon", count) last 7 days oldest first
212    pub sessions_by_day: Vec<(String, u64)>,
213    /// Recent sessions DESC by started_at, max 3; paired with event count
214    pub recent: Vec<(SessionRecord, u64)>,
215    /// Top tools by event count, max 5
216    pub top_tools: Vec<(String, u64)>,
217    pub total_cost_usd_e6: i64,
218    pub sessions_with_cost: u64,
219}
220
/// Sync daemon / outbox health as reported by `kaizen sync status`.
///
/// Derives the common traits (like [`PruneStats`] does) so callers can log, copy and
/// compare snapshots; every field is a plain std type.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Outbox rows not yet marked as sent.
    pub pending_outbox: u64,
    /// Milliseconds timestamp of the last successful sync, if one was ever recorded.
    pub last_success_ms: Option<u64>,
    /// Message from the most recent failed sync; cleared again after a success.
    pub last_error: Option<String>,
    /// Failed sync attempts since the last success.
    pub consecutive_failures: u32,
}
228
229/// Aggregate stats across sessions + events for a workspace.
230#[derive(serde::Serialize)]
231pub struct SummaryStats {
232    pub session_count: u64,
233    pub total_cost_usd_e6: i64,
234    pub by_agent: Vec<(String, u64)>,
235    pub by_model: Vec<(String, u64)>,
236    pub top_tools: Vec<(String, u64)>,
237}
238
239/// Skill vs Cursor rule for [`GuidancePerfRow`].
240#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
241#[serde(rename_all = "lowercase")]
242pub enum GuidanceKind {
243    Skill,
244    Rule,
245}
246
247/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
248#[derive(Clone, Debug, serde::Serialize)]
249pub struct GuidancePerfRow {
250    pub kind: GuidanceKind,
251    pub id: String,
252    pub sessions: u64,
253    pub sessions_pct: f64,
254    pub total_cost_usd_e6: i64,
255    pub avg_cost_per_session_usd: Option<f64>,
256    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
257    pub on_disk: bool,
258}
259
260/// Aggregated skill/rule adoption and cost proxy for a time window.
261#[derive(Clone, Debug, serde::Serialize)]
262pub struct GuidanceReport {
263    pub workspace: String,
264    pub window_start_ms: u64,
265    pub window_end_ms: u64,
266    pub sessions_in_window: u64,
267    pub workspace_avg_cost_per_session_usd: Option<f64>,
268    pub rows: Vec<GuidancePerfRow>,
269}
270
/// Row counts reported by [`Store::prune_sessions_started_before`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PruneStats {
    /// Number of session rows deleted.
    pub sessions_removed: u64,
    /// Number of event rows deleted together with those sessions.
    pub events_removed: u64,
}
277
/// `sync_state` key recording when agents were last rescanned (used to throttle rescans).
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";

/// `sync_state` key recording when the last automatic prune ran.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
281
/// Plain-data copy of a tool span (its fields mirror the `tool_spans` columns) used when
/// preparing spans for sync.
///
/// Derives the common traits so rows can be logged, cloned, compared in tests and built
/// with `..Default::default()`; every field is a std type.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// Span identifier.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Agent-side tool call id, if known.
    pub tool_call_id: Option<String>,
    /// Span status label.
    pub status: String,
    /// Span start, in milliseconds.
    pub started_at_ms: Option<u64>,
    /// Span end, in milliseconds.
    pub ended_at_ms: Option<u64>,
    /// Lead time of the span, in milliseconds.
    pub lead_time_ms: Option<u64>,
    /// Input tokens attributed to the span.
    pub tokens_in: Option<u32>,
    /// Output tokens attributed to the span.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens attributed to the span.
    pub reasoning_tokens: Option<u32>,
    /// Cost in millionths of a USD (per the `_e6` suffix).
    pub cost_usd_e6: Option<i64>,
    /// File paths associated with the span — presumably from `tool_span_paths` / `paths_json`.
    pub paths: Vec<String>,
}
297
298pub struct Store {
299    conn: Connection,
300}
301
302impl Store {
303    pub(crate) fn conn(&self) -> &Connection {
304        &self.conn
305    }
306
307    pub fn open(path: &Path) -> Result<Self> {
308        if let Some(parent) = path.parent() {
309            std::fs::create_dir_all(parent)?;
310        }
311        let conn =
312            Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
313        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
314        for sql in MIGRATIONS {
315            conn.execute_batch(sql)?;
316        }
317        ensure_schema_columns(&conn)?;
318        Ok(Self { conn })
319    }
320
321    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
322        self.conn.execute(
323            "INSERT INTO sessions (
324                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
325                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
326             )
327             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
328             ON CONFLICT(id) DO UPDATE SET
329               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
330               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
331               status=excluded.status, trace_path=excluded.trace_path,
332               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
333               branch=excluded.branch, dirty_start=excluded.dirty_start,
334               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source",
335            params![
336                s.id,
337                s.agent,
338                s.model,
339                s.workspace,
340                s.started_at_ms as i64,
341                s.ended_at_ms.map(|v| v as i64),
342                format!("{:?}", s.status),
343                s.trace_path,
344                s.start_commit,
345                s.end_commit,
346                s.branch,
347                s.dirty_start.map(bool_to_i64),
348                s.dirty_end.map(bool_to_i64),
349                s.repo_binding_source.clone().unwrap_or_default(),
350            ],
351        )?;
352        self.conn.execute(
353            "INSERT INTO session_repo_binding (
354                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
355             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
356             ON CONFLICT(session_id) DO UPDATE SET
357                start_commit=excluded.start_commit,
358                end_commit=excluded.end_commit,
359                branch=excluded.branch,
360                dirty_start=excluded.dirty_start,
361                dirty_end=excluded.dirty_end,
362                repo_binding_source=excluded.repo_binding_source",
363            params![
364                s.id,
365                s.start_commit,
366                s.end_commit,
367                s.branch,
368                s.dirty_start.map(bool_to_i64),
369                s.dirty_end.map(bool_to_i64),
370                s.repo_binding_source.clone().unwrap_or_default(),
371            ],
372        )?;
373        Ok(())
374    }
375
376    /// Insert a minimal session row if none exists. Used by hook ingestion when
377    /// the first observed event is not `SessionStart` (hooks installed mid-session).
378    pub fn ensure_session_stub(
379        &self,
380        id: &str,
381        agent: &str,
382        workspace: &str,
383        started_at_ms: u64,
384    ) -> Result<()> {
385        self.conn.execute(
386            "INSERT OR IGNORE INTO sessions (
387                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
388                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
389             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
390            params![id, agent, workspace, started_at_ms as i64],
391        )?;
392        Ok(())
393    }
394
395    /// Next `seq` for a new event in this session (0 when there are no events yet).
396    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
397        let n: i64 = self.conn.query_row(
398            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
399            [session_id],
400            |r| r.get(0),
401        )?;
402        Ok(n as u64)
403    }
404
405    pub fn append_event(&self, e: &Event) -> Result<()> {
406        self.append_event_with_sync(e, None)
407    }
408
409    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
410    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
411        let payload = serde_json::to_string(&e.payload)?;
412        self.conn.execute(
413            "INSERT INTO events (
414                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
415                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
416             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
417             ON CONFLICT(session_id, seq) DO UPDATE SET
418                ts_ms = excluded.ts_ms,
419                ts_exact = excluded.ts_exact,
420                kind = excluded.kind,
421                source = excluded.source,
422                tool = excluded.tool,
423                tool_call_id = excluded.tool_call_id,
424                tokens_in = excluded.tokens_in,
425                tokens_out = excluded.tokens_out,
426                reasoning_tokens = excluded.reasoning_tokens,
427                cost_usd_e6 = excluded.cost_usd_e6,
428                payload = excluded.payload",
429            params![
430                e.session_id,
431                e.seq as i64,
432                e.ts_ms as i64,
433                bool_to_i64(e.ts_exact),
434                format!("{:?}", e.kind),
435                format!("{:?}", e.source),
436                e.tool,
437                e.tool_call_id,
438                e.tokens_in.map(|v| v as i64),
439                e.tokens_out.map(|v| v as i64),
440                e.reasoning_tokens.map(|v| v as i64),
441                e.cost_usd_e6,
442                payload,
443            ],
444        )?;
445        if self.conn.changes() == 0 {
446            return Ok(());
447        }
448        index_event_derived(&self.conn, e)?;
449        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
450        let Some(ctx) = ctx else {
451            return Ok(());
452        };
453        let sync = &ctx.sync;
454        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
455            return Ok(());
456        }
457        let Some(salt) = try_team_salt(sync) else {
458            tracing::warn!(
459                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
460            );
461            return Ok(());
462        };
463        if sync.sample_rate < 1.0 {
464            let u: f64 = rand::random();
465            if u > sync.sample_rate {
466                return Ok(());
467            }
468        }
469        let Some(session) = self.get_session(&e.session_id)? else {
470            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
471            return Ok(());
472        };
473        let mut outbound = outbound_event_from_row(e, &session, &salt);
474        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
475        let row = serde_json::to_string(&outbound)?;
476        self.conn.execute(
477            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
478            params![e.session_id, row],
479        )?;
480        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
481        Ok(())
482    }
483
484    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
485        let mut stmt = self.conn.prepare(
486            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
487        )?;
488        let rows = stmt.query_map(params![limit as i64], |row| {
489            Ok((
490                row.get::<_, i64>(0)?,
491                row.get::<_, String>(1)?,
492                row.get::<_, String>(2)?,
493            ))
494        })?;
495        let mut out = Vec::new();
496        for r in rows {
497            out.push(r?);
498        }
499        Ok(out)
500    }
501
502    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
503        for id in ids {
504            self.conn
505                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
506        }
507        Ok(())
508    }
509
510    pub fn replace_outbox_rows(
511        &self,
512        owner_id: &str,
513        kind: &str,
514        payloads: &[String],
515    ) -> Result<()> {
516        self.conn.execute(
517            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
518            params![owner_id, kind],
519        )?;
520        for payload in payloads {
521            self.conn.execute(
522                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
523                params![owner_id, kind, payload],
524            )?;
525        }
526        Ok(())
527    }
528
529    pub fn outbox_pending_count(&self) -> Result<u64> {
530        let c: i64 =
531            self.conn
532                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
533                    r.get(0)
534                })?;
535        Ok(c as u64)
536    }
537
538    pub fn set_sync_state_ok(&self) -> Result<()> {
539        let now = now_ms().to_string();
540        self.conn.execute(
541            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
542             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
543            params![now],
544        )?;
545        self.conn.execute(
546            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
547             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
548            [],
549        )?;
550        self.conn
551            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
552        Ok(())
553    }
554
555    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
556        let prev: i64 = self
557            .conn
558            .query_row(
559                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
560                [],
561                |r| {
562                    let s: String = r.get(0)?;
563                    Ok(s.parse::<i64>().unwrap_or(0))
564                },
565            )
566            .optional()?
567            .unwrap_or(0);
568        let next = prev.saturating_add(1);
569        self.conn.execute(
570            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
571             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
572            params![msg],
573        )?;
574        self.conn.execute(
575            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
576             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
577            params![next.to_string()],
578        )?;
579        Ok(())
580    }
581
582    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
583        let pending_outbox = self.outbox_pending_count()?;
584        let last_success_ms = self
585            .conn
586            .query_row(
587                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
588                [],
589                |r| r.get::<_, String>(0),
590            )
591            .optional()?
592            .and_then(|s| s.parse().ok());
593        let last_error = self
594            .conn
595            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
596                r.get::<_, String>(0)
597            })
598            .optional()?;
599        let consecutive_failures = self
600            .conn
601            .query_row(
602                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
603                [],
604                |r| r.get::<_, String>(0),
605            )
606            .optional()?
607            .and_then(|s| s.parse().ok())
608            .unwrap_or(0);
609        Ok(SyncStatusSnapshot {
610            pending_outbox,
611            last_success_ms,
612            last_error,
613            consecutive_failures,
614        })
615    }
616
617    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
618        let row: Option<String> = self
619            .conn
620            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
621                r.get::<_, String>(0)
622            })
623            .optional()?;
624        Ok(row.and_then(|s| s.parse().ok()))
625    }
626
627    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
628        self.conn.execute(
629            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
630             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
631            params![key, v.to_string()],
632        )?;
633        Ok(())
634    }
635
636    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
637    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
638        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
639        let sessions_to_remove: i64 = tx.query_row(
640            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
641            params![cutoff_ms],
642            |r| r.get(0),
643        )?;
644        let events_to_remove: i64 = tx.query_row(
645            "SELECT COUNT(*) FROM events WHERE session_id IN \
646             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
647            params![cutoff_ms],
648            |r| r.get(0),
649        )?;
650
651        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
652        tx.execute(
653            &format!(
654                "DELETE FROM tool_span_paths WHERE span_id IN \
655                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
656            ),
657            params![cutoff_ms],
658        )?;
659        tx.execute(
660            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
661            params![cutoff_ms],
662        )?;
663        tx.execute(
664            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
665            params![cutoff_ms],
666        )?;
667        tx.execute(
668            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
669            params![cutoff_ms],
670        )?;
671        tx.execute(
672            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
673            params![cutoff_ms],
674        )?;
675        tx.execute(
676            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
677            params![cutoff_ms],
678        )?;
679        tx.execute(
680            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
681            params![cutoff_ms],
682        )?;
683        tx.execute(
684            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
685            params![cutoff_ms],
686        )?;
687        tx.execute(
688            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
689            params![cutoff_ms],
690        )?;
691        tx.execute(
692            "DELETE FROM sessions WHERE started_at_ms < ?1",
693            params![cutoff_ms],
694        )?;
695        tx.commit()?;
696        Ok(PruneStats {
697            sessions_removed: sessions_to_remove as u64,
698            events_removed: events_to_remove as u64,
699        })
700    }
701
702    /// Reclaim file space after large deletes (exclusive lock; can be slow).
703    pub fn vacuum(&self) -> Result<()> {
704        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
705        Ok(())
706    }
707
708    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
709        let mut stmt = self.conn.prepare(
710            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
711                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
712             FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
713        )?;
714        let rows = stmt.query_map(params![workspace], |row| {
715            Ok((
716                row.get::<_, String>(0)?,
717                row.get::<_, String>(1)?,
718                row.get::<_, Option<String>>(2)?,
719                row.get::<_, String>(3)?,
720                row.get::<_, i64>(4)?,
721                row.get::<_, Option<i64>>(5)?,
722                row.get::<_, String>(6)?,
723                row.get::<_, String>(7)?,
724                row.get::<_, Option<String>>(8)?,
725                row.get::<_, Option<String>>(9)?,
726                row.get::<_, Option<String>>(10)?,
727                row.get::<_, Option<i64>>(11)?,
728                row.get::<_, Option<i64>>(12)?,
729                row.get::<_, String>(13)?,
730            ))
731        })?;
732
733        let mut out = Vec::new();
734        for row in rows {
735            let (
736                id,
737                agent,
738                model,
739                workspace,
740                started,
741                ended,
742                status_str,
743                trace,
744                start_commit,
745                end_commit,
746                branch,
747                dirty_start,
748                dirty_end,
749                source,
750            ) = row?;
751            out.push(SessionRecord {
752                id,
753                agent,
754                model,
755                workspace,
756                started_at_ms: started as u64,
757                ended_at_ms: ended.map(|v| v as u64),
758                status: status_from_str(&status_str),
759                trace_path: trace,
760                start_commit,
761                end_commit,
762                branch,
763                dirty_start: dirty_start.map(i64_to_bool),
764                dirty_end: dirty_end.map(i64_to_bool),
765                repo_binding_source: empty_to_none(source),
766            });
767        }
768        Ok(out)
769    }
770
771    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
772        let session_count: i64 = self.conn.query_row(
773            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
774            params![workspace],
775            |r| r.get(0),
776        )?;
777
778        let total_cost: i64 = self.conn.query_row(
779            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
780             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
781            params![workspace],
782            |r| r.get(0),
783        )?;
784
785        let mut stmt = self.conn.prepare(
786            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
787        )?;
788        let by_agent: Vec<(String, u64)> = stmt
789            .query_map(params![workspace], |r| {
790                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
791            })?
792            .filter_map(|r| r.ok())
793            .collect();
794
795        let mut stmt = self.conn.prepare(
796            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
797        )?;
798        let by_model: Vec<(String, u64)> = stmt
799            .query_map(params![workspace], |r| {
800                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
801            })?
802            .filter_map(|r| r.ok())
803            .collect();
804
805        let mut stmt = self.conn.prepare(
806            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
807             WHERE s.workspace = ?1 AND tool IS NOT NULL
808             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
809        )?;
810        let top_tools: Vec<(String, u64)> = stmt
811            .query_map(params![workspace], |r| {
812                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
813            })?
814            .filter_map(|r| r.ok())
815            .collect();
816
817        Ok(SummaryStats {
818            session_count: session_count as u64,
819            total_cost_usd_e6: total_cost,
820            by_agent,
821            by_model,
822            top_tools,
823        })
824    }
825
826    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
827        let mut stmt = self.conn.prepare(
828            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
829                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
830             FROM events WHERE session_id = ?1 ORDER BY seq ASC",
831        )?;
832        let rows = stmt.query_map(params![session_id], |row| {
833            Ok((
834                row.get::<_, String>(0)?,
835                row.get::<_, i64>(1)?,
836                row.get::<_, i64>(2)?,
837                row.get::<_, i64>(3)?,
838                row.get::<_, String>(4)?,
839                row.get::<_, String>(5)?,
840                row.get::<_, Option<String>>(6)?,
841                row.get::<_, Option<String>>(7)?,
842                row.get::<_, Option<i64>>(8)?,
843                row.get::<_, Option<i64>>(9)?,
844                row.get::<_, Option<i64>>(10)?,
845                row.get::<_, Option<i64>>(11)?,
846                row.get::<_, String>(12)?,
847            ))
848        })?;
849
850        let mut events = Vec::new();
851        for row in rows {
852            let (
853                sid,
854                seq,
855                ts_ms,
856                ts_exact,
857                kind_str,
858                source_str,
859                tool,
860                tool_call_id,
861                tokens_in,
862                tokens_out,
863                reasoning_tokens,
864                cost_usd_e6,
865                payload_str,
866            ) = row?;
867            events.push(Event {
868                session_id: sid,
869                seq: seq as u64,
870                ts_ms: ts_ms as u64,
871                ts_exact: ts_exact != 0,
872                kind: kind_from_str(&kind_str),
873                source: source_from_str(&source_str),
874                tool,
875                tool_call_id,
876                tokens_in: tokens_in.map(|v| v as u32),
877                tokens_out: tokens_out.map(|v| v as u32),
878                reasoning_tokens: reasoning_tokens.map(|v| v as u32),
879                cost_usd_e6,
880                payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
881            });
882        }
883        Ok(events)
884    }
885
886    /// Update only status for existing session.
887    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
888        self.conn.execute(
889            "UPDATE sessions SET status = ?1 WHERE id = ?2",
890            params![format!("{:?}", status), id],
891        )?;
892        Ok(())
893    }
894
895    /// Workspace activity dashboard — feeds `cmd_insights`.
896    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
897        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
898        Ok(InsightsStats {
899            total_sessions: count_q(
900                &self.conn,
901                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
902                workspace,
903            )?,
904            running_sessions: count_q(
905                &self.conn,
906                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
907                workspace,
908            )?,
909            total_events: count_q(
910                &self.conn,
911                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
912                workspace,
913            )?,
914            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
915            recent: recent_sessions_3(&self.conn, workspace)?,
916            top_tools: top_tools_5(&self.conn, workspace)?,
917            total_cost_usd_e6,
918            sessions_with_cost,
919        })
920    }
921
922    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
923    pub fn retro_events_in_window(
924        &self,
925        workspace: &str,
926        start_ms: u64,
927        end_ms: u64,
928    ) -> Result<Vec<(SessionRecord, Event)>> {
929        let mut stmt = self.conn.prepare(
930            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
931                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
932                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
933                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
934             FROM events e
935             JOIN sessions s ON s.id = e.session_id
936             WHERE s.workspace = ?1
937               AND (
938                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
939                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
940               )
941             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
942        )?;
943        let rows = stmt.query_map(
944            params![
945                workspace,
946                start_ms as i64,
947                end_ms as i64,
948                SYNTHETIC_TS_CEILING_MS,
949            ],
950            |row| {
951                let payload_str: String = row.get(12)?;
952                let status_str: String = row.get(19)?;
953                Ok((
954                    SessionRecord {
955                        id: row.get(13)?,
956                        agent: row.get(14)?,
957                        model: row.get(15)?,
958                        workspace: row.get(16)?,
959                        started_at_ms: row.get::<_, i64>(17)? as u64,
960                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
961                        status: status_from_str(&status_str),
962                        trace_path: row.get(20)?,
963                        start_commit: row.get(21)?,
964                        end_commit: row.get(22)?,
965                        branch: row.get(23)?,
966                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
967                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
968                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
969                    },
970                    Event {
971                        session_id: row.get(0)?,
972                        seq: row.get::<_, i64>(1)? as u64,
973                        ts_ms: row.get::<_, i64>(2)? as u64,
974                        ts_exact: row.get::<_, i64>(3)? != 0,
975                        kind: kind_from_str(&row.get::<_, String>(4)?),
976                        source: source_from_str(&row.get::<_, String>(5)?),
977                        tool: row.get(6)?,
978                        tool_call_id: row.get(7)?,
979                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
980                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
981                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
982                        cost_usd_e6: row.get(11)?,
983                        payload: serde_json::from_str(&payload_str)
984                            .unwrap_or(serde_json::Value::Null),
985                    },
986                ))
987            },
988        )?;
989
990        let mut out = Vec::new();
991        for r in rows {
992            out.push(r?);
993        }
994        Ok(out)
995    }
996
997    /// Distinct `(session_id, path)` for sessions with activity in the time window.
998    pub fn files_touched_in_window(
999        &self,
1000        workspace: &str,
1001        start_ms: u64,
1002        end_ms: u64,
1003    ) -> Result<Vec<(String, String)>> {
1004        let mut stmt = self.conn.prepare(
1005            "SELECT DISTINCT ft.session_id, ft.path
1006             FROM files_touched ft
1007             JOIN sessions s ON s.id = ft.session_id
1008             WHERE s.workspace = ?1
1009               AND EXISTS (
1010                 SELECT 1 FROM events e
1011                 JOIN sessions ss ON ss.id = e.session_id
1012                 WHERE e.session_id = ft.session_id
1013                   AND (
1014                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1015                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1016                   )
1017               )
1018             ORDER BY ft.session_id, ft.path",
1019        )?;
1020        let out: Vec<(String, String)> = stmt
1021            .query_map(
1022                params![
1023                    workspace,
1024                    start_ms as i64,
1025                    end_ms as i64,
1026                    SYNTHETIC_TS_CEILING_MS,
1027                ],
1028                |r| Ok((r.get(0)?, r.get(1)?)),
1029            )?
1030            .filter_map(|r| r.ok())
1031            .collect();
1032        Ok(out)
1033    }
1034
1035    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1036    /// (any session with an indexed skill row; join events optional — use row existence).
1037    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1038        let mut stmt = self.conn.prepare(
1039            "SELECT DISTINCT su.skill
1040             FROM skills_used su
1041             JOIN sessions s ON s.id = su.session_id
1042             WHERE s.workspace = ?1
1043               AND EXISTS (
1044                 SELECT 1 FROM events e
1045                 JOIN sessions ss ON ss.id = e.session_id
1046                 WHERE e.session_id = su.session_id
1047                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1048               )
1049             ORDER BY su.skill",
1050        )?;
1051        let out: Vec<String> = stmt
1052            .query_map(
1053                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1054                |r| r.get::<_, String>(0),
1055            )?
1056            .filter_map(|r| r.ok())
1057            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1058            .collect();
1059        Ok(out)
1060    }
1061
1062    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1063    pub fn skills_used_in_window(
1064        &self,
1065        workspace: &str,
1066        start_ms: u64,
1067        end_ms: u64,
1068    ) -> Result<Vec<(String, String)>> {
1069        let mut stmt = self.conn.prepare(
1070            "SELECT DISTINCT su.session_id, su.skill
1071             FROM skills_used su
1072             JOIN sessions s ON s.id = su.session_id
1073             WHERE s.workspace = ?1
1074               AND EXISTS (
1075                 SELECT 1 FROM events e
1076                 JOIN sessions ss ON ss.id = e.session_id
1077                 WHERE e.session_id = su.session_id
1078                   AND (
1079                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1080                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1081                   )
1082               )
1083             ORDER BY su.session_id, su.skill",
1084        )?;
1085        let out: Vec<(String, String)> = stmt
1086            .query_map(
1087                params![
1088                    workspace,
1089                    start_ms as i64,
1090                    end_ms as i64,
1091                    SYNTHETIC_TS_CEILING_MS,
1092                ],
1093                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1094            )?
1095            .filter_map(|r| r.ok())
1096            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1097            .collect();
1098        Ok(out)
1099    }
1100
1101    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1102    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1103        let mut stmt = self.conn.prepare(
1104            "SELECT DISTINCT ru.rule
1105             FROM rules_used ru
1106             JOIN sessions s ON s.id = ru.session_id
1107             WHERE s.workspace = ?1
1108               AND EXISTS (
1109                 SELECT 1 FROM events e
1110                 JOIN sessions ss ON ss.id = e.session_id
1111                 WHERE e.session_id = ru.session_id
1112                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1113               )
1114             ORDER BY ru.rule",
1115        )?;
1116        let out: Vec<String> = stmt
1117            .query_map(
1118                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1119                |r| r.get::<_, String>(0),
1120            )?
1121            .filter_map(|r| r.ok())
1122            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1123            .collect();
1124        Ok(out)
1125    }
1126
1127    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1128    pub fn rules_used_in_window(
1129        &self,
1130        workspace: &str,
1131        start_ms: u64,
1132        end_ms: u64,
1133    ) -> Result<Vec<(String, String)>> {
1134        let mut stmt = self.conn.prepare(
1135            "SELECT DISTINCT ru.session_id, ru.rule
1136             FROM rules_used ru
1137             JOIN sessions s ON s.id = ru.session_id
1138             WHERE s.workspace = ?1
1139               AND EXISTS (
1140                 SELECT 1 FROM events e
1141                 JOIN sessions ss ON ss.id = e.session_id
1142                 WHERE e.session_id = ru.session_id
1143                   AND (
1144                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1145                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1146                   )
1147               )
1148             ORDER BY ru.session_id, ru.rule",
1149        )?;
1150        let out: Vec<(String, String)> = stmt
1151            .query_map(
1152                params![
1153                    workspace,
1154                    start_ms as i64,
1155                    end_ms as i64,
1156                    SYNTHETIC_TS_CEILING_MS,
1157                ],
1158                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1159            )?
1160            .filter_map(|r| r.ok())
1161            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1162            .collect();
1163        Ok(out)
1164    }
1165
1166    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1167    pub fn sessions_active_in_window(
1168        &self,
1169        workspace: &str,
1170        start_ms: u64,
1171        end_ms: u64,
1172    ) -> Result<HashSet<String>> {
1173        let mut stmt = self.conn.prepare(
1174            "SELECT DISTINCT s.id
1175             FROM sessions s
1176             WHERE s.workspace = ?1
1177               AND EXISTS (
1178                 SELECT 1 FROM events e
1179                 WHERE e.session_id = s.id
1180                   AND (
1181                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1182                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1183                   )
1184               )",
1185        )?;
1186        let out: HashSet<String> = stmt
1187            .query_map(
1188                params![
1189                    workspace,
1190                    start_ms as i64,
1191                    end_ms as i64,
1192                    SYNTHETIC_TS_CEILING_MS,
1193                ],
1194                |r| r.get(0),
1195            )?
1196            .filter_map(|r| r.ok())
1197            .collect();
1198        Ok(out)
1199    }
1200
1201    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1202    pub fn session_costs_usd_e6_in_window(
1203        &self,
1204        workspace: &str,
1205        start_ms: u64,
1206        end_ms: u64,
1207    ) -> Result<HashMap<String, i64>> {
1208        let mut stmt = self.conn.prepare(
1209            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1210             FROM events e
1211             JOIN sessions s ON s.id = e.session_id
1212             WHERE s.workspace = ?1
1213               AND (
1214                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1215                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1216               )
1217             GROUP BY e.session_id",
1218        )?;
1219        let rows: Vec<(String, i64)> = stmt
1220            .query_map(
1221                params![
1222                    workspace,
1223                    start_ms as i64,
1224                    end_ms as i64,
1225                    SYNTHETIC_TS_CEILING_MS,
1226                ],
1227                |r| Ok((r.get(0)?, r.get(1)?)),
1228            )?
1229            .filter_map(|r| r.ok())
1230            .collect();
1231        Ok(rows.into_iter().collect())
1232    }
1233
1234    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1235    pub fn guidance_report(
1236        &self,
1237        workspace: &str,
1238        window_start_ms: u64,
1239        window_end_ms: u64,
1240        skill_slugs_on_disk: &HashSet<String>,
1241        rule_slugs_on_disk: &HashSet<String>,
1242    ) -> Result<GuidanceReport> {
1243        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1244        let denom = active.len() as u64;
1245        let costs =
1246            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1247
1248        let workspace_avg_cost_per_session_usd = if denom > 0 {
1249            let total_e6: i64 = active
1250                .iter()
1251                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1252                .sum();
1253            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1254        } else {
1255            None
1256        };
1257
1258        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1259        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1260            skill_sessions.entry(skill).or_default().insert(sid);
1261        }
1262        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1263        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1264            rule_sessions.entry(rule).or_default().insert(sid);
1265        }
1266
1267        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1268
1269        let mut push_row =
1270            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1271                let sessions = sids.len() as u64;
1272                let sessions_pct = if denom > 0 {
1273                    sessions as f64 * 100.0 / denom as f64
1274                } else {
1275                    0.0
1276                };
1277                let total_cost_usd_e6: i64 = sids
1278                    .iter()
1279                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1280                    .sum();
1281                let avg_cost_per_session_usd = if sessions > 0 {
1282                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1283                } else {
1284                    None
1285                };
1286                let vs_workspace_avg_cost_per_session_usd =
1287                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1288                        (Some(avg), Some(w)) => Some(avg - w),
1289                        _ => None,
1290                    };
1291                rows.push(GuidancePerfRow {
1292                    kind,
1293                    id,
1294                    sessions,
1295                    sessions_pct,
1296                    total_cost_usd_e6,
1297                    avg_cost_per_session_usd,
1298                    vs_workspace_avg_cost_per_session_usd,
1299                    on_disk,
1300                });
1301            };
1302
1303        let mut seen_skills: HashSet<String> = HashSet::new();
1304        for (id, sids) in &skill_sessions {
1305            seen_skills.insert(id.clone());
1306            push_row(
1307                GuidanceKind::Skill,
1308                id.clone(),
1309                sids,
1310                skill_slugs_on_disk.contains(id),
1311            );
1312        }
1313        for slug in skill_slugs_on_disk {
1314            if seen_skills.contains(slug) {
1315                continue;
1316            }
1317            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1318        }
1319
1320        let mut seen_rules: HashSet<String> = HashSet::new();
1321        for (id, sids) in &rule_sessions {
1322            seen_rules.insert(id.clone());
1323            push_row(
1324                GuidanceKind::Rule,
1325                id.clone(),
1326                sids,
1327                rule_slugs_on_disk.contains(id),
1328            );
1329        }
1330        for slug in rule_slugs_on_disk {
1331            if seen_rules.contains(slug) {
1332                continue;
1333            }
1334            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1335        }
1336
1337        rows.sort_by(|a, b| {
1338            b.sessions
1339                .cmp(&a.sessions)
1340                .then_with(|| a.kind.cmp(&b.kind))
1341                .then_with(|| a.id.cmp(&b.id))
1342        });
1343
1344        Ok(GuidanceReport {
1345            workspace: workspace.to_string(),
1346            window_start_ms,
1347            window_end_ms,
1348            sessions_in_window: denom,
1349            workspace_avg_cost_per_session_usd,
1350            rows,
1351        })
1352    }
1353
1354    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1355        let mut stmt = self.conn.prepare(
1356            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1357                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
1358             FROM sessions WHERE id = ?1",
1359        )?;
1360        let mut rows = stmt.query_map(params![id], |row| {
1361            Ok((
1362                row.get::<_, String>(0)?,
1363                row.get::<_, String>(1)?,
1364                row.get::<_, Option<String>>(2)?,
1365                row.get::<_, String>(3)?,
1366                row.get::<_, i64>(4)?,
1367                row.get::<_, Option<i64>>(5)?,
1368                row.get::<_, String>(6)?,
1369                row.get::<_, String>(7)?,
1370                row.get::<_, Option<String>>(8)?,
1371                row.get::<_, Option<String>>(9)?,
1372                row.get::<_, Option<String>>(10)?,
1373                row.get::<_, Option<i64>>(11)?,
1374                row.get::<_, Option<i64>>(12)?,
1375                row.get::<_, String>(13)?,
1376            ))
1377        })?;
1378
1379        if let Some(row) = rows.next() {
1380            let (
1381                id,
1382                agent,
1383                model,
1384                workspace,
1385                started,
1386                ended,
1387                status_str,
1388                trace,
1389                start_commit,
1390                end_commit,
1391                branch,
1392                dirty_start,
1393                dirty_end,
1394                source,
1395            ) = row?;
1396            Ok(Some(SessionRecord {
1397                id,
1398                agent,
1399                model,
1400                workspace,
1401                started_at_ms: started as u64,
1402                ended_at_ms: ended.map(|v| v as u64),
1403                status: status_from_str(&status_str),
1404                trace_path: trace,
1405                start_commit,
1406                end_commit,
1407                branch,
1408                dirty_start: dirty_start.map(i64_to_bool),
1409                dirty_end: dirty_end.map(i64_to_bool),
1410                repo_binding_source: empty_to_none(source),
1411            }))
1412        } else {
1413            Ok(None)
1414        }
1415    }
1416
1417    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1418        let mut stmt = self.conn.prepare(
1419            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1420                    indexed_at_ms, dirty, graph_path
1421             FROM repo_snapshots WHERE workspace = ?1
1422             ORDER BY indexed_at_ms DESC LIMIT 1",
1423        )?;
1424        let mut rows = stmt.query_map(params![workspace], |row| {
1425            Ok(RepoSnapshotRecord {
1426                id: row.get(0)?,
1427                workspace: row.get(1)?,
1428                head_commit: row.get(2)?,
1429                dirty_fingerprint: row.get(3)?,
1430                analyzer_version: row.get(4)?,
1431                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1432                dirty: row.get::<_, i64>(6)? != 0,
1433                graph_path: row.get(7)?,
1434            })
1435        })?;
1436        Ok(rows.next().transpose()?)
1437    }
1438
1439    pub fn save_repo_snapshot(
1440        &self,
1441        snapshot: &RepoSnapshotRecord,
1442        facts: &[FileFact],
1443        edges: &[RepoEdge],
1444    ) -> Result<()> {
1445        self.conn.execute(
1446            "INSERT INTO repo_snapshots (
1447                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1448                indexed_at_ms, dirty, graph_path
1449             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1450             ON CONFLICT(id) DO UPDATE SET
1451                workspace=excluded.workspace,
1452                head_commit=excluded.head_commit,
1453                dirty_fingerprint=excluded.dirty_fingerprint,
1454                analyzer_version=excluded.analyzer_version,
1455                indexed_at_ms=excluded.indexed_at_ms,
1456                dirty=excluded.dirty,
1457                graph_path=excluded.graph_path",
1458            params![
1459                snapshot.id,
1460                snapshot.workspace,
1461                snapshot.head_commit,
1462                snapshot.dirty_fingerprint,
1463                snapshot.analyzer_version,
1464                snapshot.indexed_at_ms as i64,
1465                bool_to_i64(snapshot.dirty),
1466                snapshot.graph_path,
1467            ],
1468        )?;
1469        self.conn.execute(
1470            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1471            params![snapshot.id],
1472        )?;
1473        self.conn.execute(
1474            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1475            params![snapshot.id],
1476        )?;
1477        for fact in facts {
1478            self.conn.execute(
1479                "INSERT INTO file_facts (
1480                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1481                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1482                    churn_30d, churn_90d, authors_90d, last_changed_ms
1483                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1484                params![
1485                    fact.snapshot_id,
1486                    fact.path,
1487                    fact.language,
1488                    fact.bytes as i64,
1489                    fact.loc as i64,
1490                    fact.sloc as i64,
1491                    fact.complexity_total as i64,
1492                    fact.max_fn_complexity as i64,
1493                    fact.symbol_count as i64,
1494                    fact.import_count as i64,
1495                    fact.fan_in as i64,
1496                    fact.fan_out as i64,
1497                    fact.churn_30d as i64,
1498                    fact.churn_90d as i64,
1499                    fact.authors_90d as i64,
1500                    fact.last_changed_ms.map(|v| v as i64),
1501                ],
1502            )?;
1503        }
1504        for edge in edges {
1505            self.conn.execute(
1506                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1507                 VALUES (?1, ?2, ?3, ?4, ?5)
1508                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1509                 DO UPDATE SET weight = weight + excluded.weight",
1510                params![
1511                    snapshot.id,
1512                    edge.from_path,
1513                    edge.to_path,
1514                    edge.kind,
1515                    edge.weight as i64,
1516                ],
1517            )?;
1518        }
1519        Ok(())
1520    }
1521
1522    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1523        let mut stmt = self.conn.prepare(
1524            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1525                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1526                    churn_30d, churn_90d, authors_90d, last_changed_ms
1527             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1528        )?;
1529        let rows = stmt.query_map(params![snapshot_id], |row| {
1530            Ok(FileFact {
1531                snapshot_id: row.get(0)?,
1532                path: row.get(1)?,
1533                language: row.get(2)?,
1534                bytes: row.get::<_, i64>(3)? as u64,
1535                loc: row.get::<_, i64>(4)? as u32,
1536                sloc: row.get::<_, i64>(5)? as u32,
1537                complexity_total: row.get::<_, i64>(6)? as u32,
1538                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1539                symbol_count: row.get::<_, i64>(8)? as u32,
1540                import_count: row.get::<_, i64>(9)? as u32,
1541                fan_in: row.get::<_, i64>(10)? as u32,
1542                fan_out: row.get::<_, i64>(11)? as u32,
1543                churn_30d: row.get::<_, i64>(12)? as u32,
1544                churn_90d: row.get::<_, i64>(13)? as u32,
1545                authors_90d: row.get::<_, i64>(14)? as u32,
1546                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1547            })
1548        })?;
1549        Ok(rows.filter_map(|row| row.ok()).collect())
1550    }
1551
1552    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1553        let mut stmt = self.conn.prepare(
1554            "SELECT from_id, to_id, kind, weight
1555             FROM repo_edges WHERE snapshot_id = ?1
1556             ORDER BY kind, from_id, to_id",
1557        )?;
1558        let rows = stmt.query_map(params![snapshot_id], |row| {
1559            Ok(RepoEdge {
1560                from_path: row.get(0)?,
1561                to_path: row.get(1)?,
1562                kind: row.get(2)?,
1563                weight: row.get::<_, i64>(3)? as u32,
1564            })
1565        })?;
1566        Ok(rows.filter_map(|row| row.ok()).collect())
1567    }
1568
1569    pub fn tool_spans_in_window(
1570        &self,
1571        workspace: &str,
1572        start_ms: u64,
1573        end_ms: u64,
1574    ) -> Result<Vec<ToolSpanView>> {
1575        let mut stmt = self.conn.prepare(
1576            "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1577                    ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1578             FROM tool_spans ts
1579             JOIN sessions s ON s.id = ts.session_id
1580             WHERE s.workspace = ?1
1581               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1582               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1583             ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1584        )?;
1585        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1586            let paths_json: String = row.get(7)?;
1587            Ok(ToolSpanView {
1588                tool: row
1589                    .get::<_, Option<String>>(0)?
1590                    .unwrap_or_else(|| "unknown".into()),
1591                status: row.get(1)?,
1592                lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1593                tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1594                tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1595                reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1596                cost_usd_e6: row.get(6)?,
1597                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1598            })
1599        })?;
1600        Ok(rows.filter_map(|row| row.ok()).collect())
1601    }
1602
1603    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1604        let mut stmt = self.conn.prepare(
1605            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1606                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1607             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1608        )?;
1609        let rows = stmt.query_map(params![session_id], |row| {
1610            let paths_json: String = row.get(12)?;
1611            Ok(ToolSpanSyncRow {
1612                span_id: row.get(0)?,
1613                session_id: row.get(1)?,
1614                tool: row.get(2)?,
1615                tool_call_id: row.get(3)?,
1616                status: row.get(4)?,
1617                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1618                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1619                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1620                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1621                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1622                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1623                cost_usd_e6: row.get(11)?,
1624                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1625            })
1626        })?;
1627        Ok(rows.filter_map(|row| row.ok()).collect())
1628    }
1629}
1630
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 instead of an error.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1637
1638fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1639    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1640}
1641
1642fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1643    let cost: i64 = conn.query_row(
1644        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1645        params![workspace], |r| r.get(0),
1646    )?;
1647    let with_cost: i64 = conn.query_row(
1648        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1649        params![workspace], |r| r.get(0),
1650    )?;
1651    Ok((cost, with_cost as u64))
1652}
1653
/// Returns the English three-letter weekday name for a UTC day number
/// (whole days since the Unix epoch).
///
/// Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into a Sunday-first
/// table. `day_idx` is reduced modulo 7 *before* the offset is added, so the
/// arithmetic cannot overflow for any `u64` input. The previous `day_idx + 4`
/// panicked in debug builds for the last few `u64` values.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    NAMES[((day_idx % 7 + 4) % 7) as usize]
}
1657
1658fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
1659    let week_ago = now.saturating_sub(7 * 86_400_000);
1660    let mut stmt = conn
1661        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
1662    let days: Vec<u64> = stmt
1663        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
1664        .filter_map(|r| r.ok())
1665        .map(|v| v as u64 / 86_400_000)
1666        .collect();
1667    let today = now / 86_400_000;
1668    Ok((0u64..7)
1669        .map(|i| {
1670            let d = today.saturating_sub(6 - i);
1671            (
1672                day_label(d).to_string(),
1673                days.iter().filter(|&&x| x == d).count() as u64,
1674            )
1675        })
1676        .collect())
1677}
1678
1679fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
1680    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
1681               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
1682               s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
1683               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
1684               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
1685    let mut stmt = conn.prepare(sql)?;
1686    let out: Vec<(SessionRecord, u64)> = stmt
1687        .query_map(params![workspace], |r| {
1688            let st: String = r.get(6)?;
1689            Ok((
1690                SessionRecord {
1691                    id: r.get(0)?,
1692                    agent: r.get(1)?,
1693                    model: r.get(2)?,
1694                    workspace: r.get(3)?,
1695                    started_at_ms: r.get::<_, i64>(4)? as u64,
1696                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1697                    status: status_from_str(&st),
1698                    trace_path: r.get(7)?,
1699                    start_commit: r.get(8)?,
1700                    end_commit: r.get(9)?,
1701                    branch: r.get(10)?,
1702                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
1703                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
1704                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
1705                },
1706                r.get::<_, i64>(14)? as u64,
1707            ))
1708        })?
1709        .filter_map(|r| r.ok())
1710        .collect();
1711    Ok(out)
1712}
1713
1714fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
1715    let mut stmt = conn.prepare(
1716        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
1717         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
1718    )?;
1719    let out: Vec<(String, u64)> = stmt
1720        .query_map(params![workspace], |r| {
1721            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1722        })?
1723        .filter_map(|r| r.ok())
1724        .collect();
1725    Ok(out)
1726}
1727
1728fn status_from_str(s: &str) -> SessionStatus {
1729    match s {
1730        "Running" => SessionStatus::Running,
1731        "Waiting" => SessionStatus::Waiting,
1732        "Idle" => SessionStatus::Idle,
1733        _ => SessionStatus::Done,
1734    }
1735}
1736
1737fn kind_from_str(s: &str) -> EventKind {
1738    match s {
1739        "ToolCall" => EventKind::ToolCall,
1740        "ToolResult" => EventKind::ToolResult,
1741        "Message" => EventKind::Message,
1742        "Error" => EventKind::Error,
1743        "Cost" => EventKind::Cost,
1744        _ => EventKind::Hook,
1745    }
1746}
1747
1748fn source_from_str(s: &str) -> EventSource {
1749    match s {
1750        "Tail" => EventSource::Tail,
1751        "Hook" => EventSource::Hook,
1752        _ => EventSource::Proxy,
1753    }
1754}
1755
1756fn ensure_schema_columns(conn: &Connection) -> Result<()> {
1757    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
1758    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
1759    ensure_column(conn, "sessions", "branch", "TEXT")?;
1760    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
1761    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
1762    ensure_column(
1763        conn,
1764        "sessions",
1765        "repo_binding_source",
1766        "TEXT NOT NULL DEFAULT ''",
1767    )?;
1768    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
1769    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
1770    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
1771    ensure_column(
1772        conn,
1773        "sync_outbox",
1774        "kind",
1775        "TEXT NOT NULL DEFAULT 'events'",
1776    )?;
1777    ensure_column(
1778        conn,
1779        "experiments",
1780        "state",
1781        "TEXT NOT NULL DEFAULT 'Draft'",
1782    )?;
1783    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
1784    Ok(())
1785}
1786
1787fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
1788    if has_column(conn, table, column)? {
1789        return Ok(());
1790    }
1791    conn.execute(
1792        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
1793        [],
1794    )?;
1795    Ok(())
1796}
1797
1798fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
1799    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1800    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1801    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
1802}
1803
/// Encodes a `bool` as a SQLite integer: `true` -> 1, `false` -> 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
1807
/// Decodes a SQLite integer flag: 0 is `false`, any other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
1811
/// Treats an empty string as "absent": `""` becomes `None`, anything else
/// becomes `Some(s)`.
fn empty_to_none(s: String) -> Option<String> {
    (!s.is_empty()).then_some(s)
}
1815
// Unit tests for the SQLite store. Each test opens a fresh database file inside
// its own temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Builds a minimal `Done` session for agent `cursor` in workspace `/ws`,
    /// started at 1000 ms, with no model, end time, or git metadata.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
        }
    }

    /// Builds a `Tail`-sourced `read_file` tool-call event for `session_id` with
    /// `ts_ms = 1000 + seq * 100`, `tool_call_id = call_{seq}`, no token or cost
    /// data, and an empty JSON payload.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: json!({}),
        }
    }

    /// Opening a fresh database leaves the journal in WAL mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    /// A session stored with `upsert_session` can be read back by id.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    /// After appending events to a session, `list_sessions` still returns exactly
    /// that one session for the workspace.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    /// A workspace with no data reports zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    /// Two sessions from the same agent are counted and grouped under that agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    /// Events appended to a session come back from `list_events_for_session` in
    /// `seq` order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// Appending an identical event a second time succeeds but stores nothing new.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Duplicate — should be silently ignored
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    /// A second upsert with the same id replaces the stored status instead of failing.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// A path under `input.path` in an event payload shows up in
    /// `files_touched_in_window` for that session.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` changes the status returned by `get_session`.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// Pruning at 5000 ms removes the session started at 1000 ms together with
    /// its event, and keeps the session started far in the future.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    /// A `.cursor/rules/<name>.mdc` path in a payload is recorded as rule `<name>`.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// Skill and rule paths mentioned in free text are attributed to the session,
    /// and reported as on disk when their slugs are in the supplied sets.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    /// Pruning a session also deletes the `rules_used` rows indexed from its events.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}