Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Ceiling (10^12 ms, i.e. 2001-09-09T01:46:40Z) for `ts_ms` values that are treated as
/// transcript-only synthetic timing (seq-based fallbacks) rather than wall-clock time.
/// Rows whose `ts_ms` falls below it use `sessions.started_at_ms` for time-window matching.
const SYNTHETIC_TS_CEILING_MS: i64 = 10_i64.pow(12);
21
/// Ordered schema statements replayed by [`Store::open`] every time a store is opened.
///
/// `open` runs each entry through `execute_batch`, so a single entry may contain several
/// `;`-separated statements (see the `session_evals` entry at the end). There is no
/// applied-version bookkeeping: the whole list is re-executed on each open, which is why every
/// statement is written to be re-runnable (`IF NOT EXISTS` / `INSERT OR IGNORE`). Append new
/// entries at the end rather than editing or reordering existing ones.
///
/// Several columns that queries in this file read or write are NOT created here — e.g.
/// `events.ts_exact`, `events.tool_call_id`, `events.reasoning_tokens`, `sync_outbox.kind`,
/// and the repo-binding columns on `sessions`. NOTE(review): presumably `ensure_schema_columns`
/// (called by `open` after this list) adds them — confirm.
const MIGRATIONS: &[&str] = &[
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Conflict target for the `ON CONFLICT(session_id, seq)` upsert used when appending events.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Free-form key/value state (sync health, throttling timestamps); values stored as text.
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    // `CHECK (id = 1)` limits the state table to one row; the following INSERT OR IGNORE
    // seeds that row on first open and is a no-op afterwards.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement entry (table + two indexes); relies on `execute_batch`.
    // NOTE: `ON DELETE CASCADE` is only enforced when `PRAGMA foreign_keys=ON`, and `Store::open`
    // does not enable it, so deleting a `sessions` row does not remove its evals by itself.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
];
216
/// Per-workspace activity dashboard stats.
///
/// NOTE(review): the builder is not in this chunk; meanings of undocumented fields below are
/// inferred from their names — confirm against the code that fills this struct.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions in the workspace.
    pub total_sessions: u64,
    /// Sessions still running (presumably `SessionStatus::Running` — confirm).
    pub running_sessions: u64,
    /// Number of events across those sessions.
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost in `_e6` units (presumably micro-USD, matching `events.cost_usd_e6`).
    pub total_cost_usd_e6: i64,
    /// Sessions that carry any cost data (inferred from the name — confirm).
    pub sessions_with_cost: u64,
}
232
/// Sync daemon / outbox status for `kaizen sync status`.
///
/// Derives `Debug`/`Clone`/`PartialEq` so the snapshot can be logged, cached and compared in
/// tests like the other plain-data types in this module.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Outbox rows not yet marked as sent.
    pub pending_outbox: u64,
    /// Wall-clock ms of the last successful sync, if one has been recorded.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync failure; cleared when a sync succeeds.
    pub last_error: Option<String>,
    /// Number of failures since the last success.
    pub consecutive_failures: u32,
}
240
241/// Aggregate stats across sessions + events for a workspace.
242#[derive(serde::Serialize)]
243pub struct SummaryStats {
244    pub session_count: u64,
245    pub total_cost_usd_e6: i64,
246    pub by_agent: Vec<(String, u64)>,
247    pub by_model: Vec<(String, u64)>,
248    pub top_tools: Vec<(String, u64)>,
249}
250
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serialized in lowercase (`"skill"` / `"rule"`). The derived `Ord` follows declaration
/// order, so `Skill < Rule`; reordering the variants changes sort order.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// An agent skill (presumably the ids recorded in `skills_used` — confirm).
    Skill,
    /// A Cursor rule (presumably the ids recorded in `rules_used` — confirm).
    Rule,
}
258
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
///
/// Field order is also the serialized (serde) field order. NOTE(review): the builder is not in
/// this chunk; the field notes below are inferred from names and should be confirmed.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether `id` names a skill or a rule.
    pub kind: GuidanceKind,
    /// Skill or rule identifier.
    pub id: String,
    /// Sessions (presumably within the report window) that referenced this item.
    pub sessions: u64,
    /// `sessions` relative to all sessions in the window (percent vs. fraction — confirm).
    pub sessions_pct: f64,
    /// Summed cost of those sessions in `_e6` units (presumably micro-USD).
    pub total_cost_usd_e6: i64,
    /// Average cost per referencing session in USD; `None` presumably when there is no cost data.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison of the average above with the workspace-wide average (difference vs. ratio
    /// — confirm).
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the skill/rule definition was found on disk (inferred from the name).
    pub on_disk: bool,
}
271
/// Aggregated skill/rule adoption and cost proxy for a time window.
///
/// Field order is also the serialized (serde) field order.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Start of the reporting window (epoch ms, presumably; bound inclusivity not visible here).
    pub window_start_ms: u64,
    /// End of the reporting window (epoch ms, presumably).
    pub window_end_ms: u64,
    /// Number of sessions that fall inside the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD; `None` presumably without cost data.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One row per observed skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
282
/// Row counts reported by [`Store::prune_sessions_started_before`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PruneStats {
    /// How many `sessions` rows were deleted.
    pub sessions_removed: u64,
    /// How many `events` rows belonging to those sessions were deleted.
    pub events_removed: u64,
}
289
/// `sync_state` keys for agent rescan throttling and auto-prune.
///
/// Both are `sync_state.k` values; their `v` is presumably a ms timestamp read/written through
/// `Store::sync_state_get_u64` / `Store::sync_state_set_u64` — confirm at the call sites.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
293
/// One tool span — the `tool_spans` columns plus its paths — in a shape suitable for sync.
///
/// NOTE(review): presumably built from `tool_spans` + `tool_span_paths` by the outbound
/// tool-span sync path; confirm against its callers. Derives `Debug`/`Clone`/`Eq` so rows can
/// be logged, duplicated and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// Span primary key (`tool_spans.span_id`).
    pub span_id: String,
    /// Owning session id.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Provider tool-call id, if known.
    pub tool_call_id: Option<String>,
    /// Span status text as stored in `tool_spans.status`.
    pub status: String,
    /// Span start time (ms), if known.
    pub started_at_ms: Option<u64>,
    /// Span end time (ms), if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time (ms) as stored in `tool_spans.lead_time_ms`.
    pub lead_time_ms: Option<u64>,
    /// Input tokens attributed to the span.
    pub tokens_in: Option<u32>,
    /// Output tokens attributed to the span.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens attributed to the span.
    pub reasoning_tokens: Option<u32>,
    /// Cost in `_e6` units (presumably micro-USD).
    pub cost_usd_e6: Option<i64>,
    /// Paths touched by the span.
    pub paths: Vec<String>,
}
309
/// Synchronous SQLite-backed store that owns a single [`Connection`].
///
/// Created with [`Store::open`], which enables WAL, sets a busy timeout and applies the
/// schema migrations. All methods issue statements on this one connection.
pub struct Store {
    conn: Connection,
}
313
314impl Store {
    /// Borrow the underlying SQLite connection for crate-internal code that needs direct
    /// access (callers are outside this chunk).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
318
319    pub fn open(path: &Path) -> Result<Self> {
320        if let Some(parent) = path.parent() {
321            std::fs::create_dir_all(parent)?;
322        }
323        let conn =
324            Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
325        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
326        for sql in MIGRATIONS {
327            conn.execute_batch(sql)?;
328        }
329        ensure_schema_columns(&conn)?;
330        Ok(Self { conn })
331    }
332
333    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
334        self.conn.execute(
335            "INSERT INTO sessions (
336                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
337                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
338             )
339             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
340             ON CONFLICT(id) DO UPDATE SET
341               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
342               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
343               status=excluded.status, trace_path=excluded.trace_path,
344               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
345               branch=excluded.branch, dirty_start=excluded.dirty_start,
346               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source",
347            params![
348                s.id,
349                s.agent,
350                s.model,
351                s.workspace,
352                s.started_at_ms as i64,
353                s.ended_at_ms.map(|v| v as i64),
354                format!("{:?}", s.status),
355                s.trace_path,
356                s.start_commit,
357                s.end_commit,
358                s.branch,
359                s.dirty_start.map(bool_to_i64),
360                s.dirty_end.map(bool_to_i64),
361                s.repo_binding_source.clone().unwrap_or_default(),
362            ],
363        )?;
364        self.conn.execute(
365            "INSERT INTO session_repo_binding (
366                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
367             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
368             ON CONFLICT(session_id) DO UPDATE SET
369                start_commit=excluded.start_commit,
370                end_commit=excluded.end_commit,
371                branch=excluded.branch,
372                dirty_start=excluded.dirty_start,
373                dirty_end=excluded.dirty_end,
374                repo_binding_source=excluded.repo_binding_source",
375            params![
376                s.id,
377                s.start_commit,
378                s.end_commit,
379                s.branch,
380                s.dirty_start.map(bool_to_i64),
381                s.dirty_end.map(bool_to_i64),
382                s.repo_binding_source.clone().unwrap_or_default(),
383            ],
384        )?;
385        Ok(())
386    }
387
388    /// Insert a minimal session row if none exists. Used by hook ingestion when
389    /// the first observed event is not `SessionStart` (hooks installed mid-session).
390    pub fn ensure_session_stub(
391        &self,
392        id: &str,
393        agent: &str,
394        workspace: &str,
395        started_at_ms: u64,
396    ) -> Result<()> {
397        self.conn.execute(
398            "INSERT OR IGNORE INTO sessions (
399                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
400                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
401             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
402            params![id, agent, workspace, started_at_ms as i64],
403        )?;
404        Ok(())
405    }
406
407    /// Next `seq` for a new event in this session (0 when there are no events yet).
408    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
409        let n: i64 = self.conn.query_row(
410            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
411            [session_id],
412            |r| r.get(0),
413        )?;
414        Ok(n as u64)
415    }
416
    /// Append (upsert by `(session_id, seq)`) an event without enqueueing anything for sync.
    ///
    /// Shorthand for [`Store::append_event_with_sync`] with `ctx = None`.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
420
421    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
422    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
423        let payload = serde_json::to_string(&e.payload)?;
424        self.conn.execute(
425            "INSERT INTO events (
426                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
427                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
428             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
429             ON CONFLICT(session_id, seq) DO UPDATE SET
430                ts_ms = excluded.ts_ms,
431                ts_exact = excluded.ts_exact,
432                kind = excluded.kind,
433                source = excluded.source,
434                tool = excluded.tool,
435                tool_call_id = excluded.tool_call_id,
436                tokens_in = excluded.tokens_in,
437                tokens_out = excluded.tokens_out,
438                reasoning_tokens = excluded.reasoning_tokens,
439                cost_usd_e6 = excluded.cost_usd_e6,
440                payload = excluded.payload",
441            params![
442                e.session_id,
443                e.seq as i64,
444                e.ts_ms as i64,
445                bool_to_i64(e.ts_exact),
446                format!("{:?}", e.kind),
447                format!("{:?}", e.source),
448                e.tool,
449                e.tool_call_id,
450                e.tokens_in.map(|v| v as i64),
451                e.tokens_out.map(|v| v as i64),
452                e.reasoning_tokens.map(|v| v as i64),
453                e.cost_usd_e6,
454                payload,
455            ],
456        )?;
457        if self.conn.changes() == 0 {
458            return Ok(());
459        }
460        index_event_derived(&self.conn, e)?;
461        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
462        let Some(ctx) = ctx else {
463            return Ok(());
464        };
465        let sync = &ctx.sync;
466        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
467            return Ok(());
468        }
469        let Some(salt) = try_team_salt(sync) else {
470            tracing::warn!(
471                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
472            );
473            return Ok(());
474        };
475        if sync.sample_rate < 1.0 {
476            let u: f64 = rand::random();
477            if u > sync.sample_rate {
478                return Ok(());
479            }
480        }
481        let Some(session) = self.get_session(&e.session_id)? else {
482            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
483            return Ok(());
484        };
485        let mut outbound = outbound_event_from_row(e, &session, &salt);
486        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
487        let row = serde_json::to_string(&outbound)?;
488        self.conn.execute(
489            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
490            params![e.session_id, row],
491        )?;
492        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
493        Ok(())
494    }
495
496    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
497        let mut stmt = self.conn.prepare(
498            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
499        )?;
500        let rows = stmt.query_map(params![limit as i64], |row| {
501            Ok((
502                row.get::<_, i64>(0)?,
503                row.get::<_, String>(1)?,
504                row.get::<_, String>(2)?,
505            ))
506        })?;
507        let mut out = Vec::new();
508        for r in rows {
509            out.push(r?);
510        }
511        Ok(out)
512    }
513
514    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
515        for id in ids {
516            self.conn
517                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
518        }
519        Ok(())
520    }
521
522    pub fn replace_outbox_rows(
523        &self,
524        owner_id: &str,
525        kind: &str,
526        payloads: &[String],
527    ) -> Result<()> {
528        self.conn.execute(
529            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
530            params![owner_id, kind],
531        )?;
532        for payload in payloads {
533            self.conn.execute(
534                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
535                params![owner_id, kind, payload],
536            )?;
537        }
538        Ok(())
539    }
540
541    pub fn outbox_pending_count(&self) -> Result<u64> {
542        let c: i64 =
543            self.conn
544                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
545                    r.get(0)
546                })?;
547        Ok(c as u64)
548    }
549
550    pub fn set_sync_state_ok(&self) -> Result<()> {
551        let now = now_ms().to_string();
552        self.conn.execute(
553            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
554             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
555            params![now],
556        )?;
557        self.conn.execute(
558            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
559             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
560            [],
561        )?;
562        self.conn
563            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
564        Ok(())
565    }
566
567    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
568        let prev: i64 = self
569            .conn
570            .query_row(
571                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
572                [],
573                |r| {
574                    let s: String = r.get(0)?;
575                    Ok(s.parse::<i64>().unwrap_or(0))
576                },
577            )
578            .optional()?
579            .unwrap_or(0);
580        let next = prev.saturating_add(1);
581        self.conn.execute(
582            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
583             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
584            params![msg],
585        )?;
586        self.conn.execute(
587            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
588             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
589            params![next.to_string()],
590        )?;
591        Ok(())
592    }
593
594    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
595        let pending_outbox = self.outbox_pending_count()?;
596        let last_success_ms = self
597            .conn
598            .query_row(
599                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
600                [],
601                |r| r.get::<_, String>(0),
602            )
603            .optional()?
604            .and_then(|s| s.parse().ok());
605        let last_error = self
606            .conn
607            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
608                r.get::<_, String>(0)
609            })
610            .optional()?;
611        let consecutive_failures = self
612            .conn
613            .query_row(
614                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
615                [],
616                |r| r.get::<_, String>(0),
617            )
618            .optional()?
619            .and_then(|s| s.parse().ok())
620            .unwrap_or(0);
621        Ok(SyncStatusSnapshot {
622            pending_outbox,
623            last_success_ms,
624            last_error,
625            consecutive_failures,
626        })
627    }
628
629    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
630        let row: Option<String> = self
631            .conn
632            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
633                r.get::<_, String>(0)
634            })
635            .optional()?;
636        Ok(row.and_then(|s| s.parse().ok()))
637    }
638
639    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
640        self.conn.execute(
641            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
642             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
643            params![key, v.to_string()],
644        )?;
645        Ok(())
646    }
647
648    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
649    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
650        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
651        let sessions_to_remove: i64 = tx.query_row(
652            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
653            params![cutoff_ms],
654            |r| r.get(0),
655        )?;
656        let events_to_remove: i64 = tx.query_row(
657            "SELECT COUNT(*) FROM events WHERE session_id IN \
658             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
659            params![cutoff_ms],
660            |r| r.get(0),
661        )?;
662
663        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
664        tx.execute(
665            &format!(
666                "DELETE FROM tool_span_paths WHERE span_id IN \
667                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
668            ),
669            params![cutoff_ms],
670        )?;
671        tx.execute(
672            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
673            params![cutoff_ms],
674        )?;
675        tx.execute(
676            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
677            params![cutoff_ms],
678        )?;
679        tx.execute(
680            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
681            params![cutoff_ms],
682        )?;
683        tx.execute(
684            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
685            params![cutoff_ms],
686        )?;
687        tx.execute(
688            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
689            params![cutoff_ms],
690        )?;
691        tx.execute(
692            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
693            params![cutoff_ms],
694        )?;
695        tx.execute(
696            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
697            params![cutoff_ms],
698        )?;
699        tx.execute(
700            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
701            params![cutoff_ms],
702        )?;
703        tx.execute(
704            "DELETE FROM sessions WHERE started_at_ms < ?1",
705            params![cutoff_ms],
706        )?;
707        tx.commit()?;
708        Ok(PruneStats {
709            sessions_removed: sessions_to_remove as u64,
710            events_removed: events_to_remove as u64,
711        })
712    }
713
714    /// Reclaim file space after large deletes (exclusive lock; can be slow).
715    pub fn vacuum(&self) -> Result<()> {
716        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
717        Ok(())
718    }
719
720    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
721        let mut stmt = self.conn.prepare(
722            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
723                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
724             FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
725        )?;
726        let rows = stmt.query_map(params![workspace], |row| {
727            Ok((
728                row.get::<_, String>(0)?,
729                row.get::<_, String>(1)?,
730                row.get::<_, Option<String>>(2)?,
731                row.get::<_, String>(3)?,
732                row.get::<_, i64>(4)?,
733                row.get::<_, Option<i64>>(5)?,
734                row.get::<_, String>(6)?,
735                row.get::<_, String>(7)?,
736                row.get::<_, Option<String>>(8)?,
737                row.get::<_, Option<String>>(9)?,
738                row.get::<_, Option<String>>(10)?,
739                row.get::<_, Option<i64>>(11)?,
740                row.get::<_, Option<i64>>(12)?,
741                row.get::<_, String>(13)?,
742            ))
743        })?;
744
745        let mut out = Vec::new();
746        for row in rows {
747            let (
748                id,
749                agent,
750                model,
751                workspace,
752                started,
753                ended,
754                status_str,
755                trace,
756                start_commit,
757                end_commit,
758                branch,
759                dirty_start,
760                dirty_end,
761                source,
762            ) = row?;
763            out.push(SessionRecord {
764                id,
765                agent,
766                model,
767                workspace,
768                started_at_ms: started as u64,
769                ended_at_ms: ended.map(|v| v as u64),
770                status: status_from_str(&status_str),
771                trace_path: trace,
772                start_commit,
773                end_commit,
774                branch,
775                dirty_start: dirty_start.map(i64_to_bool),
776                dirty_end: dirty_end.map(i64_to_bool),
777                repo_binding_source: empty_to_none(source),
778            });
779        }
780        Ok(out)
781    }
782
783    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
784        let session_count: i64 = self.conn.query_row(
785            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
786            params![workspace],
787            |r| r.get(0),
788        )?;
789
790        let total_cost: i64 = self.conn.query_row(
791            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
792             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
793            params![workspace],
794            |r| r.get(0),
795        )?;
796
797        let mut stmt = self.conn.prepare(
798            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
799        )?;
800        let by_agent: Vec<(String, u64)> = stmt
801            .query_map(params![workspace], |r| {
802                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
803            })?
804            .filter_map(|r| r.ok())
805            .collect();
806
807        let mut stmt = self.conn.prepare(
808            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
809        )?;
810        let by_model: Vec<(String, u64)> = stmt
811            .query_map(params![workspace], |r| {
812                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
813            })?
814            .filter_map(|r| r.ok())
815            .collect();
816
817        let mut stmt = self.conn.prepare(
818            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
819             WHERE s.workspace = ?1 AND tool IS NOT NULL
820             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
821        )?;
822        let top_tools: Vec<(String, u64)> = stmt
823            .query_map(params![workspace], |r| {
824                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
825            })?
826            .filter_map(|r| r.ok())
827            .collect();
828
829        Ok(SummaryStats {
830            session_count: session_count as u64,
831            total_cost_usd_e6: total_cost,
832            by_agent,
833            by_model,
834            top_tools,
835        })
836    }
837
838    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
839        let mut stmt = self.conn.prepare(
840            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
841                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
842             FROM events WHERE session_id = ?1 ORDER BY seq ASC",
843        )?;
844        let rows = stmt.query_map(params![session_id], |row| {
845            Ok((
846                row.get::<_, String>(0)?,
847                row.get::<_, i64>(1)?,
848                row.get::<_, i64>(2)?,
849                row.get::<_, i64>(3)?,
850                row.get::<_, String>(4)?,
851                row.get::<_, String>(5)?,
852                row.get::<_, Option<String>>(6)?,
853                row.get::<_, Option<String>>(7)?,
854                row.get::<_, Option<i64>>(8)?,
855                row.get::<_, Option<i64>>(9)?,
856                row.get::<_, Option<i64>>(10)?,
857                row.get::<_, Option<i64>>(11)?,
858                row.get::<_, String>(12)?,
859            ))
860        })?;
861
862        let mut events = Vec::new();
863        for row in rows {
864            let (
865                sid,
866                seq,
867                ts_ms,
868                ts_exact,
869                kind_str,
870                source_str,
871                tool,
872                tool_call_id,
873                tokens_in,
874                tokens_out,
875                reasoning_tokens,
876                cost_usd_e6,
877                payload_str,
878            ) = row?;
879            events.push(Event {
880                session_id: sid,
881                seq: seq as u64,
882                ts_ms: ts_ms as u64,
883                ts_exact: ts_exact != 0,
884                kind: kind_from_str(&kind_str),
885                source: source_from_str(&source_str),
886                tool,
887                tool_call_id,
888                tokens_in: tokens_in.map(|v| v as u32),
889                tokens_out: tokens_out.map(|v| v as u32),
890                reasoning_tokens: reasoning_tokens.map(|v| v as u32),
891                cost_usd_e6,
892                payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
893            });
894        }
895        Ok(events)
896    }
897
898    /// Update only status for existing session.
899    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
900        self.conn.execute(
901            "UPDATE sessions SET status = ?1 WHERE id = ?2",
902            params![format!("{:?}", status), id],
903        )?;
904        Ok(())
905    }
906
907    /// Workspace activity dashboard — feeds `cmd_insights`.
908    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
909        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
910        Ok(InsightsStats {
911            total_sessions: count_q(
912                &self.conn,
913                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
914                workspace,
915            )?,
916            running_sessions: count_q(
917                &self.conn,
918                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
919                workspace,
920            )?,
921            total_events: count_q(
922                &self.conn,
923                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
924                workspace,
925            )?,
926            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
927            recent: recent_sessions_3(&self.conn, workspace)?,
928            top_tools: top_tools_5(&self.conn, workspace)?,
929            total_cost_usd_e6,
930            sessions_with_cost,
931        })
932    }
933
934    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
935    pub fn retro_events_in_window(
936        &self,
937        workspace: &str,
938        start_ms: u64,
939        end_ms: u64,
940    ) -> Result<Vec<(SessionRecord, Event)>> {
941        let mut stmt = self.conn.prepare(
942            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
943                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
944                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
945                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
946             FROM events e
947             JOIN sessions s ON s.id = e.session_id
948             WHERE s.workspace = ?1
949               AND (
950                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
951                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
952               )
953             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
954        )?;
955        let rows = stmt.query_map(
956            params![
957                workspace,
958                start_ms as i64,
959                end_ms as i64,
960                SYNTHETIC_TS_CEILING_MS,
961            ],
962            |row| {
963                let payload_str: String = row.get(12)?;
964                let status_str: String = row.get(19)?;
965                Ok((
966                    SessionRecord {
967                        id: row.get(13)?,
968                        agent: row.get(14)?,
969                        model: row.get(15)?,
970                        workspace: row.get(16)?,
971                        started_at_ms: row.get::<_, i64>(17)? as u64,
972                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
973                        status: status_from_str(&status_str),
974                        trace_path: row.get(20)?,
975                        start_commit: row.get(21)?,
976                        end_commit: row.get(22)?,
977                        branch: row.get(23)?,
978                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
979                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
980                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
981                    },
982                    Event {
983                        session_id: row.get(0)?,
984                        seq: row.get::<_, i64>(1)? as u64,
985                        ts_ms: row.get::<_, i64>(2)? as u64,
986                        ts_exact: row.get::<_, i64>(3)? != 0,
987                        kind: kind_from_str(&row.get::<_, String>(4)?),
988                        source: source_from_str(&row.get::<_, String>(5)?),
989                        tool: row.get(6)?,
990                        tool_call_id: row.get(7)?,
991                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
992                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
993                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
994                        cost_usd_e6: row.get(11)?,
995                        payload: serde_json::from_str(&payload_str)
996                            .unwrap_or(serde_json::Value::Null),
997                    },
998                ))
999            },
1000        )?;
1001
1002        let mut out = Vec::new();
1003        for r in rows {
1004            out.push(r?);
1005        }
1006        Ok(out)
1007    }
1008
1009    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1010    pub fn files_touched_in_window(
1011        &self,
1012        workspace: &str,
1013        start_ms: u64,
1014        end_ms: u64,
1015    ) -> Result<Vec<(String, String)>> {
1016        let mut stmt = self.conn.prepare(
1017            "SELECT DISTINCT ft.session_id, ft.path
1018             FROM files_touched ft
1019             JOIN sessions s ON s.id = ft.session_id
1020             WHERE s.workspace = ?1
1021               AND EXISTS (
1022                 SELECT 1 FROM events e
1023                 JOIN sessions ss ON ss.id = e.session_id
1024                 WHERE e.session_id = ft.session_id
1025                   AND (
1026                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1027                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1028                   )
1029               )
1030             ORDER BY ft.session_id, ft.path",
1031        )?;
1032        let out: Vec<(String, String)> = stmt
1033            .query_map(
1034                params![
1035                    workspace,
1036                    start_ms as i64,
1037                    end_ms as i64,
1038                    SYNTHETIC_TS_CEILING_MS,
1039                ],
1040                |r| Ok((r.get(0)?, r.get(1)?)),
1041            )?
1042            .filter_map(|r| r.ok())
1043            .collect();
1044        Ok(out)
1045    }
1046
1047    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1048    /// (any session with an indexed skill row; join events optional — use row existence).
1049    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1050        let mut stmt = self.conn.prepare(
1051            "SELECT DISTINCT su.skill
1052             FROM skills_used su
1053             JOIN sessions s ON s.id = su.session_id
1054             WHERE s.workspace = ?1
1055               AND EXISTS (
1056                 SELECT 1 FROM events e
1057                 JOIN sessions ss ON ss.id = e.session_id
1058                 WHERE e.session_id = su.session_id
1059                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1060               )
1061             ORDER BY su.skill",
1062        )?;
1063        let out: Vec<String> = stmt
1064            .query_map(
1065                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1066                |r| r.get::<_, String>(0),
1067            )?
1068            .filter_map(|r| r.ok())
1069            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1070            .collect();
1071        Ok(out)
1072    }
1073
1074    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1075    pub fn skills_used_in_window(
1076        &self,
1077        workspace: &str,
1078        start_ms: u64,
1079        end_ms: u64,
1080    ) -> Result<Vec<(String, String)>> {
1081        let mut stmt = self.conn.prepare(
1082            "SELECT DISTINCT su.session_id, su.skill
1083             FROM skills_used su
1084             JOIN sessions s ON s.id = su.session_id
1085             WHERE s.workspace = ?1
1086               AND EXISTS (
1087                 SELECT 1 FROM events e
1088                 JOIN sessions ss ON ss.id = e.session_id
1089                 WHERE e.session_id = su.session_id
1090                   AND (
1091                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1092                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1093                   )
1094               )
1095             ORDER BY su.session_id, su.skill",
1096        )?;
1097        let out: Vec<(String, String)> = stmt
1098            .query_map(
1099                params![
1100                    workspace,
1101                    start_ms as i64,
1102                    end_ms as i64,
1103                    SYNTHETIC_TS_CEILING_MS,
1104                ],
1105                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1106            )?
1107            .filter_map(|r| r.ok())
1108            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1109            .collect();
1110        Ok(out)
1111    }
1112
1113    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1114    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1115        let mut stmt = self.conn.prepare(
1116            "SELECT DISTINCT ru.rule
1117             FROM rules_used ru
1118             JOIN sessions s ON s.id = ru.session_id
1119             WHERE s.workspace = ?1
1120               AND EXISTS (
1121                 SELECT 1 FROM events e
1122                 JOIN sessions ss ON ss.id = e.session_id
1123                 WHERE e.session_id = ru.session_id
1124                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1125               )
1126             ORDER BY ru.rule",
1127        )?;
1128        let out: Vec<String> = stmt
1129            .query_map(
1130                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1131                |r| r.get::<_, String>(0),
1132            )?
1133            .filter_map(|r| r.ok())
1134            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1135            .collect();
1136        Ok(out)
1137    }
1138
1139    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1140    pub fn rules_used_in_window(
1141        &self,
1142        workspace: &str,
1143        start_ms: u64,
1144        end_ms: u64,
1145    ) -> Result<Vec<(String, String)>> {
1146        let mut stmt = self.conn.prepare(
1147            "SELECT DISTINCT ru.session_id, ru.rule
1148             FROM rules_used ru
1149             JOIN sessions s ON s.id = ru.session_id
1150             WHERE s.workspace = ?1
1151               AND EXISTS (
1152                 SELECT 1 FROM events e
1153                 JOIN sessions ss ON ss.id = e.session_id
1154                 WHERE e.session_id = ru.session_id
1155                   AND (
1156                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1157                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1158                   )
1159               )
1160             ORDER BY ru.session_id, ru.rule",
1161        )?;
1162        let out: Vec<(String, String)> = stmt
1163            .query_map(
1164                params![
1165                    workspace,
1166                    start_ms as i64,
1167                    end_ms as i64,
1168                    SYNTHETIC_TS_CEILING_MS,
1169                ],
1170                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1171            )?
1172            .filter_map(|r| r.ok())
1173            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1174            .collect();
1175        Ok(out)
1176    }
1177
1178    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1179    pub fn sessions_active_in_window(
1180        &self,
1181        workspace: &str,
1182        start_ms: u64,
1183        end_ms: u64,
1184    ) -> Result<HashSet<String>> {
1185        let mut stmt = self.conn.prepare(
1186            "SELECT DISTINCT s.id
1187             FROM sessions s
1188             WHERE s.workspace = ?1
1189               AND EXISTS (
1190                 SELECT 1 FROM events e
1191                 WHERE e.session_id = s.id
1192                   AND (
1193                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1194                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1195                   )
1196               )",
1197        )?;
1198        let out: HashSet<String> = stmt
1199            .query_map(
1200                params![
1201                    workspace,
1202                    start_ms as i64,
1203                    end_ms as i64,
1204                    SYNTHETIC_TS_CEILING_MS,
1205                ],
1206                |r| r.get(0),
1207            )?
1208            .filter_map(|r| r.ok())
1209            .collect();
1210        Ok(out)
1211    }
1212
1213    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1214    pub fn session_costs_usd_e6_in_window(
1215        &self,
1216        workspace: &str,
1217        start_ms: u64,
1218        end_ms: u64,
1219    ) -> Result<HashMap<String, i64>> {
1220        let mut stmt = self.conn.prepare(
1221            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1222             FROM events e
1223             JOIN sessions s ON s.id = e.session_id
1224             WHERE s.workspace = ?1
1225               AND (
1226                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1227                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1228               )
1229             GROUP BY e.session_id",
1230        )?;
1231        let rows: Vec<(String, i64)> = stmt
1232            .query_map(
1233                params![
1234                    workspace,
1235                    start_ms as i64,
1236                    end_ms as i64,
1237                    SYNTHETIC_TS_CEILING_MS,
1238                ],
1239                |r| Ok((r.get(0)?, r.get(1)?)),
1240            )?
1241            .filter_map(|r| r.ok())
1242            .collect();
1243        Ok(rows.into_iter().collect())
1244    }
1245
1246    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1247    pub fn guidance_report(
1248        &self,
1249        workspace: &str,
1250        window_start_ms: u64,
1251        window_end_ms: u64,
1252        skill_slugs_on_disk: &HashSet<String>,
1253        rule_slugs_on_disk: &HashSet<String>,
1254    ) -> Result<GuidanceReport> {
1255        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1256        let denom = active.len() as u64;
1257        let costs =
1258            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1259
1260        let workspace_avg_cost_per_session_usd = if denom > 0 {
1261            let total_e6: i64 = active
1262                .iter()
1263                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1264                .sum();
1265            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1266        } else {
1267            None
1268        };
1269
1270        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1271        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1272            skill_sessions.entry(skill).or_default().insert(sid);
1273        }
1274        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1275        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1276            rule_sessions.entry(rule).or_default().insert(sid);
1277        }
1278
1279        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1280
1281        let mut push_row =
1282            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1283                let sessions = sids.len() as u64;
1284                let sessions_pct = if denom > 0 {
1285                    sessions as f64 * 100.0 / denom as f64
1286                } else {
1287                    0.0
1288                };
1289                let total_cost_usd_e6: i64 = sids
1290                    .iter()
1291                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1292                    .sum();
1293                let avg_cost_per_session_usd = if sessions > 0 {
1294                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1295                } else {
1296                    None
1297                };
1298                let vs_workspace_avg_cost_per_session_usd =
1299                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1300                        (Some(avg), Some(w)) => Some(avg - w),
1301                        _ => None,
1302                    };
1303                rows.push(GuidancePerfRow {
1304                    kind,
1305                    id,
1306                    sessions,
1307                    sessions_pct,
1308                    total_cost_usd_e6,
1309                    avg_cost_per_session_usd,
1310                    vs_workspace_avg_cost_per_session_usd,
1311                    on_disk,
1312                });
1313            };
1314
1315        let mut seen_skills: HashSet<String> = HashSet::new();
1316        for (id, sids) in &skill_sessions {
1317            seen_skills.insert(id.clone());
1318            push_row(
1319                GuidanceKind::Skill,
1320                id.clone(),
1321                sids,
1322                skill_slugs_on_disk.contains(id),
1323            );
1324        }
1325        for slug in skill_slugs_on_disk {
1326            if seen_skills.contains(slug) {
1327                continue;
1328            }
1329            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1330        }
1331
1332        let mut seen_rules: HashSet<String> = HashSet::new();
1333        for (id, sids) in &rule_sessions {
1334            seen_rules.insert(id.clone());
1335            push_row(
1336                GuidanceKind::Rule,
1337                id.clone(),
1338                sids,
1339                rule_slugs_on_disk.contains(id),
1340            );
1341        }
1342        for slug in rule_slugs_on_disk {
1343            if seen_rules.contains(slug) {
1344                continue;
1345            }
1346            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1347        }
1348
1349        rows.sort_by(|a, b| {
1350            b.sessions
1351                .cmp(&a.sessions)
1352                .then_with(|| a.kind.cmp(&b.kind))
1353                .then_with(|| a.id.cmp(&b.id))
1354        });
1355
1356        Ok(GuidanceReport {
1357            workspace: workspace.to_string(),
1358            window_start_ms,
1359            window_end_ms,
1360            sessions_in_window: denom,
1361            workspace_avg_cost_per_session_usd,
1362            rows,
1363        })
1364    }
1365
1366    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1367        let mut stmt = self.conn.prepare(
1368            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1369                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
1370             FROM sessions WHERE id = ?1",
1371        )?;
1372        let mut rows = stmt.query_map(params![id], |row| {
1373            Ok((
1374                row.get::<_, String>(0)?,
1375                row.get::<_, String>(1)?,
1376                row.get::<_, Option<String>>(2)?,
1377                row.get::<_, String>(3)?,
1378                row.get::<_, i64>(4)?,
1379                row.get::<_, Option<i64>>(5)?,
1380                row.get::<_, String>(6)?,
1381                row.get::<_, String>(7)?,
1382                row.get::<_, Option<String>>(8)?,
1383                row.get::<_, Option<String>>(9)?,
1384                row.get::<_, Option<String>>(10)?,
1385                row.get::<_, Option<i64>>(11)?,
1386                row.get::<_, Option<i64>>(12)?,
1387                row.get::<_, String>(13)?,
1388            ))
1389        })?;
1390
1391        if let Some(row) = rows.next() {
1392            let (
1393                id,
1394                agent,
1395                model,
1396                workspace,
1397                started,
1398                ended,
1399                status_str,
1400                trace,
1401                start_commit,
1402                end_commit,
1403                branch,
1404                dirty_start,
1405                dirty_end,
1406                source,
1407            ) = row?;
1408            Ok(Some(SessionRecord {
1409                id,
1410                agent,
1411                model,
1412                workspace,
1413                started_at_ms: started as u64,
1414                ended_at_ms: ended.map(|v| v as u64),
1415                status: status_from_str(&status_str),
1416                trace_path: trace,
1417                start_commit,
1418                end_commit,
1419                branch,
1420                dirty_start: dirty_start.map(i64_to_bool),
1421                dirty_end: dirty_end.map(i64_to_bool),
1422                repo_binding_source: empty_to_none(source),
1423            }))
1424        } else {
1425            Ok(None)
1426        }
1427    }
1428
1429    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1430        let mut stmt = self.conn.prepare(
1431            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1432                    indexed_at_ms, dirty, graph_path
1433             FROM repo_snapshots WHERE workspace = ?1
1434             ORDER BY indexed_at_ms DESC LIMIT 1",
1435        )?;
1436        let mut rows = stmt.query_map(params![workspace], |row| {
1437            Ok(RepoSnapshotRecord {
1438                id: row.get(0)?,
1439                workspace: row.get(1)?,
1440                head_commit: row.get(2)?,
1441                dirty_fingerprint: row.get(3)?,
1442                analyzer_version: row.get(4)?,
1443                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1444                dirty: row.get::<_, i64>(6)? != 0,
1445                graph_path: row.get(7)?,
1446            })
1447        })?;
1448        Ok(rows.next().transpose()?)
1449    }
1450
1451    pub fn save_repo_snapshot(
1452        &self,
1453        snapshot: &RepoSnapshotRecord,
1454        facts: &[FileFact],
1455        edges: &[RepoEdge],
1456    ) -> Result<()> {
1457        self.conn.execute(
1458            "INSERT INTO repo_snapshots (
1459                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1460                indexed_at_ms, dirty, graph_path
1461             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1462             ON CONFLICT(id) DO UPDATE SET
1463                workspace=excluded.workspace,
1464                head_commit=excluded.head_commit,
1465                dirty_fingerprint=excluded.dirty_fingerprint,
1466                analyzer_version=excluded.analyzer_version,
1467                indexed_at_ms=excluded.indexed_at_ms,
1468                dirty=excluded.dirty,
1469                graph_path=excluded.graph_path",
1470            params![
1471                snapshot.id,
1472                snapshot.workspace,
1473                snapshot.head_commit,
1474                snapshot.dirty_fingerprint,
1475                snapshot.analyzer_version,
1476                snapshot.indexed_at_ms as i64,
1477                bool_to_i64(snapshot.dirty),
1478                snapshot.graph_path,
1479            ],
1480        )?;
1481        self.conn.execute(
1482            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1483            params![snapshot.id],
1484        )?;
1485        self.conn.execute(
1486            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1487            params![snapshot.id],
1488        )?;
1489        for fact in facts {
1490            self.conn.execute(
1491                "INSERT INTO file_facts (
1492                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1493                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1494                    churn_30d, churn_90d, authors_90d, last_changed_ms
1495                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1496                params![
1497                    fact.snapshot_id,
1498                    fact.path,
1499                    fact.language,
1500                    fact.bytes as i64,
1501                    fact.loc as i64,
1502                    fact.sloc as i64,
1503                    fact.complexity_total as i64,
1504                    fact.max_fn_complexity as i64,
1505                    fact.symbol_count as i64,
1506                    fact.import_count as i64,
1507                    fact.fan_in as i64,
1508                    fact.fan_out as i64,
1509                    fact.churn_30d as i64,
1510                    fact.churn_90d as i64,
1511                    fact.authors_90d as i64,
1512                    fact.last_changed_ms.map(|v| v as i64),
1513                ],
1514            )?;
1515        }
1516        for edge in edges {
1517            self.conn.execute(
1518                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1519                 VALUES (?1, ?2, ?3, ?4, ?5)
1520                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1521                 DO UPDATE SET weight = weight + excluded.weight",
1522                params![
1523                    snapshot.id,
1524                    edge.from_path,
1525                    edge.to_path,
1526                    edge.kind,
1527                    edge.weight as i64,
1528                ],
1529            )?;
1530        }
1531        Ok(())
1532    }
1533
1534    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1535        let mut stmt = self.conn.prepare(
1536            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1537                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1538                    churn_30d, churn_90d, authors_90d, last_changed_ms
1539             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1540        )?;
1541        let rows = stmt.query_map(params![snapshot_id], |row| {
1542            Ok(FileFact {
1543                snapshot_id: row.get(0)?,
1544                path: row.get(1)?,
1545                language: row.get(2)?,
1546                bytes: row.get::<_, i64>(3)? as u64,
1547                loc: row.get::<_, i64>(4)? as u32,
1548                sloc: row.get::<_, i64>(5)? as u32,
1549                complexity_total: row.get::<_, i64>(6)? as u32,
1550                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1551                symbol_count: row.get::<_, i64>(8)? as u32,
1552                import_count: row.get::<_, i64>(9)? as u32,
1553                fan_in: row.get::<_, i64>(10)? as u32,
1554                fan_out: row.get::<_, i64>(11)? as u32,
1555                churn_30d: row.get::<_, i64>(12)? as u32,
1556                churn_90d: row.get::<_, i64>(13)? as u32,
1557                authors_90d: row.get::<_, i64>(14)? as u32,
1558                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1559            })
1560        })?;
1561        Ok(rows.filter_map(|row| row.ok()).collect())
1562    }
1563
1564    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1565        let mut stmt = self.conn.prepare(
1566            "SELECT from_id, to_id, kind, weight
1567             FROM repo_edges WHERE snapshot_id = ?1
1568             ORDER BY kind, from_id, to_id",
1569        )?;
1570        let rows = stmt.query_map(params![snapshot_id], |row| {
1571            Ok(RepoEdge {
1572                from_path: row.get(0)?,
1573                to_path: row.get(1)?,
1574                kind: row.get(2)?,
1575                weight: row.get::<_, i64>(3)? as u32,
1576            })
1577        })?;
1578        Ok(rows.filter_map(|row| row.ok()).collect())
1579    }
1580
1581    pub fn tool_spans_in_window(
1582        &self,
1583        workspace: &str,
1584        start_ms: u64,
1585        end_ms: u64,
1586    ) -> Result<Vec<ToolSpanView>> {
1587        let mut stmt = self.conn.prepare(
1588            "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1589                    ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1590             FROM tool_spans ts
1591             JOIN sessions s ON s.id = ts.session_id
1592             WHERE s.workspace = ?1
1593               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1594               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1595             ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1596        )?;
1597        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1598            let paths_json: String = row.get(7)?;
1599            Ok(ToolSpanView {
1600                tool: row
1601                    .get::<_, Option<String>>(0)?
1602                    .unwrap_or_else(|| "unknown".into()),
1603                status: row.get(1)?,
1604                lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1605                tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1606                tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1607                reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1608                cost_usd_e6: row.get(6)?,
1609                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1610            })
1611        })?;
1612        Ok(rows.filter_map(|row| row.ok()).collect())
1613    }
1614
1615    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1616        let mut stmt = self.conn.prepare(
1617            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1618                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1619             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1620        )?;
1621        let rows = stmt.query_map(params![session_id], |row| {
1622            let paths_json: String = row.get(12)?;
1623            Ok(ToolSpanSyncRow {
1624                span_id: row.get(0)?,
1625                session_id: row.get(1)?,
1626                tool: row.get(2)?,
1627                tool_call_id: row.get(3)?,
1628                status: row.get(4)?,
1629                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1630                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1631                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1632                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1633                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1634                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1635                cost_usd_e6: row.get(11)?,
1636                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1637            })
1638        })?;
1639        Ok(rows.filter_map(|row| row.ok()).collect())
1640    }
1641
1642    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1643        self.conn.execute(
1644            "INSERT OR REPLACE INTO session_evals
1645             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1646             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1647            rusqlite::params![
1648                eval.id,
1649                eval.session_id,
1650                eval.judge_model,
1651                eval.rubric_id,
1652                eval.score,
1653                eval.rationale,
1654                eval.flagged as i64,
1655                eval.created_at_ms as i64,
1656            ],
1657        )?;
1658        Ok(())
1659    }
1660
1661    pub fn list_evals_in_window(
1662        &self,
1663        start_ms: u64,
1664        end_ms: u64,
1665    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1666        let mut stmt = self.conn.prepare(
1667            "SELECT id, session_id, judge_model, rubric_id, score,
1668                    rationale, flagged, created_at_ms
1669             FROM session_evals
1670             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1671             ORDER BY created_at_ms ASC",
1672        )?;
1673        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1674            Ok(crate::eval::types::EvalRow {
1675                id: r.get(0)?,
1676                session_id: r.get(1)?,
1677                judge_model: r.get(2)?,
1678                rubric_id: r.get(3)?,
1679                score: r.get(4)?,
1680                rationale: r.get(5)?,
1681                flagged: r.get::<_, i64>(6)? != 0,
1682                created_at_ms: r.get::<_, i64>(7)? as u64,
1683            })
1684        })?;
1685        rows.collect()
1686    }
1687
1688    pub fn list_evals_for_session(
1689        &self,
1690        session_id: &str,
1691    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1692        let mut stmt = self.conn.prepare(
1693            "SELECT id, session_id, judge_model, rubric_id, score,
1694                    rationale, flagged, created_at_ms
1695             FROM session_evals
1696             WHERE session_id = ?1
1697             ORDER BY created_at_ms DESC",
1698        )?;
1699        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1700            Ok(crate::eval::types::EvalRow {
1701                id: r.get(0)?,
1702                session_id: r.get(1)?,
1703                judge_model: r.get(2)?,
1704                rubric_id: r.get(3)?,
1705                score: r.get(4)?,
1706                rationale: r.get(5)?,
1707                flagged: r.get::<_, i64>(6)? != 0,
1708                created_at_ms: r.get::<_, i64>(7)? as u64,
1709            })
1710        })?;
1711        rows.collect()
1712    }
1713
1714    pub fn list_sessions_for_eval(
1715        &self,
1716        since_ms: u64,
1717        min_cost_usd: f64,
1718    ) -> Result<Vec<crate::core::event::SessionRecord>> {
1719        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
1720        let mut stmt = self.conn.prepare(
1721            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1722                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
1723                    s.dirty_start, s.dirty_end, s.repo_binding_source
1724             FROM sessions s
1725             WHERE s.started_at_ms >= ?1
1726               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
1727               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
1728             ORDER BY s.started_at_ms DESC",
1729        )?;
1730        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
1731            Ok((
1732                r.get::<_, String>(0)?,
1733                r.get::<_, String>(1)?,
1734                r.get::<_, Option<String>>(2)?,
1735                r.get::<_, String>(3)?,
1736                r.get::<_, i64>(4)?,
1737                r.get::<_, Option<i64>>(5)?,
1738                r.get::<_, String>(6)?,
1739                r.get::<_, String>(7)?,
1740                r.get::<_, Option<String>>(8)?,
1741                r.get::<_, Option<String>>(9)?,
1742                r.get::<_, Option<String>>(10)?,
1743                r.get::<_, Option<i64>>(11)?,
1744                r.get::<_, Option<i64>>(12)?,
1745                r.get::<_, Option<String>>(13)?,
1746            ))
1747        })?;
1748        let mut out = Vec::new();
1749        for row in rows {
1750            let (
1751                id,
1752                agent,
1753                model,
1754                workspace,
1755                started,
1756                ended,
1757                status_str,
1758                trace,
1759                start_commit,
1760                end_commit,
1761                branch,
1762                dirty_start,
1763                dirty_end,
1764                source,
1765            ) = row?;
1766            out.push(crate::core::event::SessionRecord {
1767                id,
1768                agent,
1769                model,
1770                workspace,
1771                started_at_ms: started as u64,
1772                ended_at_ms: ended.map(|v| v as u64),
1773                status: status_from_str(&status_str),
1774                trace_path: trace,
1775                start_commit,
1776                end_commit,
1777                branch,
1778                dirty_start: dirty_start.map(i64_to_bool),
1779                dirty_end: dirty_end.map(i64_to_bool),
1780                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
1781            });
1782        }
1783        Ok(out)
1784    }
1785}
1786
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1793
1794fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1795    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1796}
1797
1798fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1799    let cost: i64 = conn.query_row(
1800        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1801        params![workspace], |r| r.get(0),
1802    )?;
1803    let with_cost: i64 = conn.query_row(
1804        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1805        params![workspace], |r| r.get(0),
1806    )?;
1807    Ok((cost, with_cost as u64))
1808}
1809
/// Weekday abbreviation for a count of whole days since the Unix epoch.
///
/// 1970-01-01 (day 0) was a Thursday, hence the +4 offset into a Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const WEEKDAYS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    let offset = (day_idx + 4) % 7;
    WEEKDAYS[offset as usize]
}
1813
1814fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
1815    let week_ago = now.saturating_sub(7 * 86_400_000);
1816    let mut stmt = conn
1817        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
1818    let days: Vec<u64> = stmt
1819        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
1820        .filter_map(|r| r.ok())
1821        .map(|v| v as u64 / 86_400_000)
1822        .collect();
1823    let today = now / 86_400_000;
1824    Ok((0u64..7)
1825        .map(|i| {
1826            let d = today.saturating_sub(6 - i);
1827            (
1828                day_label(d).to_string(),
1829                days.iter().filter(|&&x| x == d).count() as u64,
1830            )
1831        })
1832        .collect())
1833}
1834
1835fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
1836    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
1837               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
1838               s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
1839               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
1840               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
1841    let mut stmt = conn.prepare(sql)?;
1842    let out: Vec<(SessionRecord, u64)> = stmt
1843        .query_map(params![workspace], |r| {
1844            let st: String = r.get(6)?;
1845            Ok((
1846                SessionRecord {
1847                    id: r.get(0)?,
1848                    agent: r.get(1)?,
1849                    model: r.get(2)?,
1850                    workspace: r.get(3)?,
1851                    started_at_ms: r.get::<_, i64>(4)? as u64,
1852                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1853                    status: status_from_str(&st),
1854                    trace_path: r.get(7)?,
1855                    start_commit: r.get(8)?,
1856                    end_commit: r.get(9)?,
1857                    branch: r.get(10)?,
1858                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
1859                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
1860                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
1861                },
1862                r.get::<_, i64>(14)? as u64,
1863            ))
1864        })?
1865        .filter_map(|r| r.ok())
1866        .collect();
1867    Ok(out)
1868}
1869
1870fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
1871    let mut stmt = conn.prepare(
1872        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
1873         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
1874    )?;
1875    let out: Vec<(String, u64)> = stmt
1876        .query_map(params![workspace], |r| {
1877            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1878        })?
1879        .filter_map(|r| r.ok())
1880        .collect();
1881    Ok(out)
1882}
1883
1884fn status_from_str(s: &str) -> SessionStatus {
1885    match s {
1886        "Running" => SessionStatus::Running,
1887        "Waiting" => SessionStatus::Waiting,
1888        "Idle" => SessionStatus::Idle,
1889        _ => SessionStatus::Done,
1890    }
1891}
1892
1893fn kind_from_str(s: &str) -> EventKind {
1894    match s {
1895        "ToolCall" => EventKind::ToolCall,
1896        "ToolResult" => EventKind::ToolResult,
1897        "Message" => EventKind::Message,
1898        "Error" => EventKind::Error,
1899        "Cost" => EventKind::Cost,
1900        _ => EventKind::Hook,
1901    }
1902}
1903
1904fn source_from_str(s: &str) -> EventSource {
1905    match s {
1906        "Tail" => EventSource::Tail,
1907        "Hook" => EventSource::Hook,
1908        _ => EventSource::Proxy,
1909    }
1910}
1911
1912fn ensure_schema_columns(conn: &Connection) -> Result<()> {
1913    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
1914    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
1915    ensure_column(conn, "sessions", "branch", "TEXT")?;
1916    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
1917    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
1918    ensure_column(
1919        conn,
1920        "sessions",
1921        "repo_binding_source",
1922        "TEXT NOT NULL DEFAULT ''",
1923    )?;
1924    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
1925    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
1926    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
1927    ensure_column(
1928        conn,
1929        "sync_outbox",
1930        "kind",
1931        "TEXT NOT NULL DEFAULT 'events'",
1932    )?;
1933    ensure_column(
1934        conn,
1935        "experiments",
1936        "state",
1937        "TEXT NOT NULL DEFAULT 'Draft'",
1938    )?;
1939    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
1940    Ok(())
1941}
1942
1943fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
1944    if has_column(conn, table, column)? {
1945        return Ok(());
1946    }
1947    conn.execute(
1948        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
1949        [],
1950    )?;
1951    Ok(())
1952}
1953
1954fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
1955    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1956    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1957    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
1958}
1959
/// Encodes a boolean as SQLite's conventional integer (`true` → 1, `false` → 0).
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
1963
/// Decodes a SQLite integer flag: zero is `false`, any other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
1967
/// Maps an empty string to `None` and any non-empty string to `Some(s)`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
1971
// Unit tests for the SQLite store. Every test opens a fresh `kaizen.db` inside its
// own temporary directory, so tests share no database state.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    // Minimal finished session in workspace "/ws", started at t=1000 ms, with no
    // model or git metadata.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
        }
    }

    // A "read_file" ToolCall event from the Tail source, with ts_ms = 1000 + seq*100,
    // tool_call_id "call_{seq}", no token/cost data and an empty JSON payload.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: json!({}),
        }
    }

    // Opening a store leaves the database in WAL journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    // A session written with upsert_session can be read back by id.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    // After appending events, the session is still listed under its workspace.
    // NOTE(review): despite the name, this only checks list_sessions; the stored
    // events are not read back here (list_events_for_session_round_trip does that).
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    // An empty database reports zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    // Two sessions from the same agent are counted together in by_agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    // Events appended in seq order are returned in that order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    // Appending an identical event twice succeeds and stores only one row.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Duplicate — should be silently ignored
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    // Upserting the same id again overwrites the stored status instead of failing.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    // A payload "input.path" field is indexed as a touched file for the session.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    // update_session_status persists the new status.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    // Pruning by start time removes older sessions and their events, reports the
    // removed counts, and leaves newer sessions alone.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    // A payload path under .cursor/rules/ is indexed as a rule named after the
    // file stem ("my-rule").
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    // Skill and rule references found in free-text payloads are counted per
    // session. on_disk is asserted true for slugs passed in the supplied sets;
    // presumably on_disk means "present in those sets" (confirm in guidance_report).
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    // Pruning a session also deletes the rows it had in rules_used.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}