Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
// NOTE(review): if `ts_ms` is Unix-epoch milliseconds, 1e12 ms falls in September 2001,
// so any real wall-clock timestamp would sit above this ceiling — confirm the unit.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
21
/// Schema statements executed, in array order, by `Store::open` each time a database is opened.
///
/// Every entry is passed to `execute_batch`, so one entry may contain several `;`-separated
/// statements. All statements are guarded (`IF NOT EXISTS` / `INSERT OR IGNORE`), which is what
/// lets the whole list be re-run against an existing database.
// NOTE(review): several columns written by `Store` (e.g. `events.ts_exact`, `sync_outbox.kind`,
// `sessions.start_commit`) are not declared here — presumably added by `ensure_schema_columns`;
// verify before relying on this list as the full schema.
const MIGRATIONS: &[&str] = &[
    // --- Local session / event log ---
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // --- Outbound sync queue ---
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    // --- Experiments ---
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the `ON CONFLICT(session_id, seq)` upsert used when appending events.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Key/value store read and written through the `sync_state_*` / `set_sync_state_*` methods.
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- Tool spans ---
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // --- Repository binding and snapshots ---
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single state row; `OR IGNORE` makes this a no-op once it exists.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- Per-session evaluations, feedback, outcomes and process samples ---
    // Multi-statement entry: table plus its two indexes.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    // Multi-statement entry: table plus its two indexes.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
];
254
/// Per-workspace activity dashboard stats.
///
/// Plain data carrier; the queries that populate it are outside this part of the file.
#[derive(Clone)]
pub struct InsightsStats {
    pub total_sessions: u64,
    pub running_sessions: u64,
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    /// Cost total in the `_e6` unit used by the `cost_usd_e6` columns.
    // NOTE(review): presumably micro-USD (USD × 10^6) — confirm against the producers.
    pub total_cost_usd_e6: i64,
    pub sessions_with_cost: u64,
}
270
/// Sync daemon / outbox status for `kaizen sync status`.
///
/// Built by `Store::sync_status` from the `sync_outbox` table and the `sync_state`
/// key/value table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Number of `sync_outbox` rows with `sent = 0`.
    pub pending_outbox: u64,
    /// Parsed `last_success_ms` entry; `None` when the key is absent or not a valid number.
    pub last_success_ms: Option<u64>,
    /// `last_error` entry, if one is stored.
    pub last_error: Option<String>,
    /// Parsed `consecutive_failures` entry; `0` when absent or unparseable.
    pub consecutive_failures: u32,
}
278
/// Aggregate stats across sessions + events for a workspace.
///
/// Serializable via `serde`; fields are emitted in declaration order by the derive.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    pub session_count: u64,
    pub total_cost_usd_e6: i64,
    // NOTE(review): the three lists below look like (label, count) pairs — confirm with the
    // query code, which is not visible in this part of the file.
    pub by_agent: Vec<(String, u64)>,
    pub by_model: Vec<(String, u64)>,
    pub top_tools: Vec<(String, u64)>,
}
288
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serializes as the lowercase variant name (`"skill"` / `"rule"`). The derived ordering
/// follows declaration order, so `Skill` sorts before `Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    Skill,
    Rule,
}
296
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    pub sessions: u64,
    // NOTE(review): presumably `sessions` as a share of the report's `sessions_in_window`;
    // whether it is 0–1 or 0–100 is not visible here — confirm.
    pub sessions_pct: f64,
    pub total_cost_usd_e6: i64,
    /// `None` when no average is available for this row.
    pub avg_cost_per_session_usd: Option<f64>,
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    pub on_disk: bool,
}
309
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    pub workspace: String,
    /// Start of the reporting window, in milliseconds.
    pub window_start_ms: u64,
    /// End of the reporting window, in milliseconds.
    pub window_end_ms: u64,
    pub sessions_in_window: u64,
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per observed skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
320
/// Result of [`Store::prune_sessions_started_before`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Count of `sessions` rows older than the cutoff, taken inside the pruning transaction.
    pub sessions_removed: u64,
    /// Count of `events` rows belonging to those sessions, taken inside the same transaction.
    pub events_removed: u64,
}
327
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// Field names match the table's columns one-to-one; every measurement is optional
/// except `measured_at_ms`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    /// Stored in an INTEGER column.
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    /// Stored in an INTEGER column.
    pub ci_ok: Option<bool>,
    pub measured_at_ms: u64,
    pub measure_error: Option<String>,
}
343
/// Aggregated process samples for retro (Tier D).
// NOTE(review): presumably derived from `session_samples` (`cpu_percent`, `rss_bytes`);
// the aggregating query is not visible in this part of the file.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    pub sample_count: u64,
    pub max_cpu_percent: f64,
    pub max_rss_bytes: u64,
}
352
/// `sync_state` keys for agent rescan throttling and auto-prune.
///
/// Key for agent rescan throttling.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key for auto-prune.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
356
/// One tool span in the shape used when handing spans to sync.
///
/// Scalar fields carry the same names as the `tool_spans` table columns.
// NOTE(review): `paths` presumably comes from `tool_spans.paths_json` or `tool_span_paths`;
// the loader is not visible in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    pub cost_usd_e6: Option<i64>,
    pub paths: Vec<String>,
}
372
/// Handle to the local SQLite database; owns a single `rusqlite::Connection`.
///
/// Create one with [`Store::open`].
pub struct Store {
    conn: Connection,
}
376
377impl Store {
    /// Borrow the underlying connection (crate-internal access only).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
381
382    pub fn open(path: &Path) -> Result<Self> {
383        if let Some(parent) = path.parent() {
384            std::fs::create_dir_all(parent)?;
385        }
386        let conn =
387            Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
388        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
389        for sql in MIGRATIONS {
390            conn.execute_batch(sql)?;
391        }
392        ensure_schema_columns(&conn)?;
393        Ok(Self { conn })
394    }
395
396    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
397        self.conn.execute(
398            "INSERT INTO sessions (
399                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
400                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
401                prompt_fingerprint, parent_session_id, agent_version, os, arch,
402                repo_file_count, repo_total_loc
403             )
404             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
405                ?16, ?17, ?18, ?19, ?20, ?21)
406             ON CONFLICT(id) DO UPDATE SET
407               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
408               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
409               status=excluded.status, trace_path=excluded.trace_path,
410               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
411               branch=excluded.branch, dirty_start=excluded.dirty_start,
412               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
413               prompt_fingerprint=excluded.prompt_fingerprint,
414               parent_session_id=excluded.parent_session_id,
415               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
416               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
417            params![
418                s.id,
419                s.agent,
420                s.model,
421                s.workspace,
422                s.started_at_ms as i64,
423                s.ended_at_ms.map(|v| v as i64),
424                format!("{:?}", s.status),
425                s.trace_path,
426                s.start_commit,
427                s.end_commit,
428                s.branch,
429                s.dirty_start.map(bool_to_i64),
430                s.dirty_end.map(bool_to_i64),
431                s.repo_binding_source.clone().unwrap_or_default(),
432                s.prompt_fingerprint.as_deref(),
433                s.parent_session_id.as_deref(),
434                s.agent_version.as_deref(),
435                s.os.as_deref(),
436                s.arch.as_deref(),
437                s.repo_file_count.map(|v| v as i64),
438                s.repo_total_loc.map(|v| v as i64),
439            ],
440        )?;
441        self.conn.execute(
442            "INSERT INTO session_repo_binding (
443                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
444             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
445             ON CONFLICT(session_id) DO UPDATE SET
446                start_commit=excluded.start_commit,
447                end_commit=excluded.end_commit,
448                branch=excluded.branch,
449                dirty_start=excluded.dirty_start,
450                dirty_end=excluded.dirty_end,
451                repo_binding_source=excluded.repo_binding_source",
452            params![
453                s.id,
454                s.start_commit,
455                s.end_commit,
456                s.branch,
457                s.dirty_start.map(bool_to_i64),
458                s.dirty_end.map(bool_to_i64),
459                s.repo_binding_source.clone().unwrap_or_default(),
460            ],
461        )?;
462        Ok(())
463    }
464
465    /// Insert a minimal session row if none exists. Used by hook ingestion when
466    /// the first observed event is not `SessionStart` (hooks installed mid-session).
467    pub fn ensure_session_stub(
468        &self,
469        id: &str,
470        agent: &str,
471        workspace: &str,
472        started_at_ms: u64,
473    ) -> Result<()> {
474        self.conn.execute(
475            "INSERT OR IGNORE INTO sessions (
476                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
477                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
478                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
479             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
480                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
481            params![id, agent, workspace, started_at_ms as i64],
482        )?;
483        Ok(())
484    }
485
486    /// Next `seq` for a new event in this session (0 when there are no events yet).
487    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
488        let n: i64 = self.conn.query_row(
489            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
490            [session_id],
491            |r| r.get(0),
492        )?;
493        Ok(n as u64)
494    }
495
    /// Append `e` without a sync context; shorthand for
    /// `append_event_with_sync(e, None)`, so no outbox row is enqueued.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
499
    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
    ///
    /// The event is upserted on `(session_id, seq)`: a row with the same key has every other
    /// column overwritten. After a write, derived indexes and the session's tool spans are
    /// refreshed. The outbox step is skipped (returning `Ok`) when there is no `ctx`, when
    /// endpoint / team token / team id is empty, when no team salt is available, when the
    /// event loses the sampling draw, or when the session row cannot be found.
    ///
    /// # Errors
    /// Propagates JSON serialization errors, database errors, and errors from the
    /// indexing / span-rebuild / enqueue helpers.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Payload is stored as a JSON string.
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
             )
             ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // `kind` and `source` are persisted as their `Debug` text.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // Must stay directly after the insert: `changes()` reports the most recent statement.
        // Nothing written means nothing to index or sync.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        index_event_derived(&self.conn, e)?;
        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
        // Without a sync context the event stays local.
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync counts as unconfigured unless endpoint, token and team id are all set.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Sampling: with a rate below 1.0, drop the event when a random draw exceeds the rate.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        // The outbound row is built from the session record as well as the event.
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        // Redact the payload before it is serialized into the outbox.
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
            params![e.session_id, row],
        )?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
598
599    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
600        let mut stmt = self.conn.prepare(
601            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
602        )?;
603        let rows = stmt.query_map(params![limit as i64], |row| {
604            Ok((
605                row.get::<_, i64>(0)?,
606                row.get::<_, String>(1)?,
607                row.get::<_, String>(2)?,
608            ))
609        })?;
610        let mut out = Vec::new();
611        for r in rows {
612            out.push(r?);
613        }
614        Ok(out)
615    }
616
617    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
618        for id in ids {
619            self.conn
620                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
621        }
622        Ok(())
623    }
624
625    pub fn replace_outbox_rows(
626        &self,
627        owner_id: &str,
628        kind: &str,
629        payloads: &[String],
630    ) -> Result<()> {
631        self.conn.execute(
632            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
633            params![owner_id, kind],
634        )?;
635        for payload in payloads {
636            self.conn.execute(
637                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
638                params![owner_id, kind, payload],
639            )?;
640        }
641        Ok(())
642    }
643
644    pub fn outbox_pending_count(&self) -> Result<u64> {
645        let c: i64 =
646            self.conn
647                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
648                    r.get(0)
649                })?;
650        Ok(c as u64)
651    }
652
653    pub fn set_sync_state_ok(&self) -> Result<()> {
654        let now = now_ms().to_string();
655        self.conn.execute(
656            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
657             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
658            params![now],
659        )?;
660        self.conn.execute(
661            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
662             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
663            [],
664        )?;
665        self.conn
666            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
667        Ok(())
668    }
669
670    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
671        let prev: i64 = self
672            .conn
673            .query_row(
674                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
675                [],
676                |r| {
677                    let s: String = r.get(0)?;
678                    Ok(s.parse::<i64>().unwrap_or(0))
679                },
680            )
681            .optional()?
682            .unwrap_or(0);
683        let next = prev.saturating_add(1);
684        self.conn.execute(
685            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
686             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
687            params![msg],
688        )?;
689        self.conn.execute(
690            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
691             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
692            params![next.to_string()],
693        )?;
694        Ok(())
695    }
696
697    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
698        let pending_outbox = self.outbox_pending_count()?;
699        let last_success_ms = self
700            .conn
701            .query_row(
702                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
703                [],
704                |r| r.get::<_, String>(0),
705            )
706            .optional()?
707            .and_then(|s| s.parse().ok());
708        let last_error = self
709            .conn
710            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
711                r.get::<_, String>(0)
712            })
713            .optional()?;
714        let consecutive_failures = self
715            .conn
716            .query_row(
717                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
718                [],
719                |r| r.get::<_, String>(0),
720            )
721            .optional()?
722            .and_then(|s| s.parse().ok())
723            .unwrap_or(0);
724        Ok(SyncStatusSnapshot {
725            pending_outbox,
726            last_success_ms,
727            last_error,
728            consecutive_failures,
729        })
730    }
731
732    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
733        let row: Option<String> = self
734            .conn
735            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
736                r.get::<_, String>(0)
737            })
738            .optional()?;
739        Ok(row.and_then(|s| s.parse().ok()))
740    }
741
742    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
743        self.conn.execute(
744            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
745             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
746            params![key, v.to_string()],
747        )?;
748        Ok(())
749    }
750
751    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
752    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
753        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
754        let sessions_to_remove: i64 = tx.query_row(
755            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
756            params![cutoff_ms],
757            |r| r.get(0),
758        )?;
759        let events_to_remove: i64 = tx.query_row(
760            "SELECT COUNT(*) FROM events WHERE session_id IN \
761             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
762            params![cutoff_ms],
763            |r| r.get(0),
764        )?;
765
766        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
767        tx.execute(
768            &format!(
769                "DELETE FROM tool_span_paths WHERE span_id IN \
770                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
771            ),
772            params![cutoff_ms],
773        )?;
774        tx.execute(
775            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
776            params![cutoff_ms],
777        )?;
778        tx.execute(
779            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
780            params![cutoff_ms],
781        )?;
782        tx.execute(
783            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
784            params![cutoff_ms],
785        )?;
786        tx.execute(
787            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
788            params![cutoff_ms],
789        )?;
790        tx.execute(
791            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
792            params![cutoff_ms],
793        )?;
794        tx.execute(
795            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
796            params![cutoff_ms],
797        )?;
798        tx.execute(
799            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
800            params![cutoff_ms],
801        )?;
802        tx.execute(
803            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
804            params![cutoff_ms],
805        )?;
806        tx.execute(
807            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
808            params![cutoff_ms],
809        )?;
810        tx.execute(
811            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
812            params![cutoff_ms],
813        )?;
814        tx.execute(
815            "DELETE FROM sessions WHERE started_at_ms < ?1",
816            params![cutoff_ms],
817        )?;
818        tx.commit()?;
819        Ok(PruneStats {
820            sessions_removed: sessions_to_remove as u64,
821            events_removed: events_to_remove as u64,
822        })
823    }
824
825    /// Reclaim file space after large deletes (exclusive lock; can be slow).
826    pub fn vacuum(&self) -> Result<()> {
827        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
828        Ok(())
829    }
830
831    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
832        let mut stmt = self.conn.prepare(
833            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
834                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
835                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
836                    repo_file_count, repo_total_loc
837             FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
838        )?;
839        let rows = stmt.query_map(params![workspace], |row| {
840            Ok((
841                row.get::<_, String>(0)?,
842                row.get::<_, String>(1)?,
843                row.get::<_, Option<String>>(2)?,
844                row.get::<_, String>(3)?,
845                row.get::<_, i64>(4)?,
846                row.get::<_, Option<i64>>(5)?,
847                row.get::<_, String>(6)?,
848                row.get::<_, String>(7)?,
849                row.get::<_, Option<String>>(8)?,
850                row.get::<_, Option<String>>(9)?,
851                row.get::<_, Option<String>>(10)?,
852                row.get::<_, Option<i64>>(11)?,
853                row.get::<_, Option<i64>>(12)?,
854                row.get::<_, String>(13)?,
855                row.get::<_, Option<String>>(14)?,
856                row.get::<_, Option<String>>(15)?,
857                row.get::<_, Option<String>>(16)?,
858                row.get::<_, Option<String>>(17)?,
859                row.get::<_, Option<String>>(18)?,
860                row.get::<_, Option<i64>>(19)?,
861                row.get::<_, Option<i64>>(20)?,
862            ))
863        })?;
864
865        let mut out = Vec::new();
866        for row in rows {
867            let (
868                id,
869                agent,
870                model,
871                workspace,
872                started,
873                ended,
874                status_str,
875                trace,
876                start_commit,
877                end_commit,
878                branch,
879                dirty_start,
880                dirty_end,
881                source,
882                prompt_fingerprint,
883                parent_session_id,
884                agent_version,
885                os,
886                arch,
887                repo_file_count,
888                repo_total_loc,
889            ) = row?;
890            out.push(SessionRecord {
891                id,
892                agent,
893                model,
894                workspace,
895                started_at_ms: started as u64,
896                ended_at_ms: ended.map(|v| v as u64),
897                status: status_from_str(&status_str),
898                trace_path: trace,
899                start_commit,
900                end_commit,
901                branch,
902                dirty_start: dirty_start.map(i64_to_bool),
903                dirty_end: dirty_end.map(i64_to_bool),
904                repo_binding_source: empty_to_none(source),
905                prompt_fingerprint,
906                parent_session_id,
907                agent_version,
908                os,
909                arch,
910                repo_file_count: repo_file_count.map(|v| v as u32),
911                repo_total_loc: repo_total_loc.map(|v| v as u64),
912            });
913        }
914        Ok(out)
915    }
916
917    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
918        let session_count: i64 = self.conn.query_row(
919            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
920            params![workspace],
921            |r| r.get(0),
922        )?;
923
924        let total_cost: i64 = self.conn.query_row(
925            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
926             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
927            params![workspace],
928            |r| r.get(0),
929        )?;
930
931        let mut stmt = self.conn.prepare(
932            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
933        )?;
934        let by_agent: Vec<(String, u64)> = stmt
935            .query_map(params![workspace], |r| {
936                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
937            })?
938            .filter_map(|r| r.ok())
939            .collect();
940
941        let mut stmt = self.conn.prepare(
942            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
943        )?;
944        let by_model: Vec<(String, u64)> = stmt
945            .query_map(params![workspace], |r| {
946                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
947            })?
948            .filter_map(|r| r.ok())
949            .collect();
950
951        let mut stmt = self.conn.prepare(
952            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
953             WHERE s.workspace = ?1 AND tool IS NOT NULL
954             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
955        )?;
956        let top_tools: Vec<(String, u64)> = stmt
957            .query_map(params![workspace], |r| {
958                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
959            })?
960            .filter_map(|r| r.ok())
961            .collect();
962
963        Ok(SummaryStats {
964            session_count: session_count as u64,
965            total_cost_usd_e6: total_cost,
966            by_agent,
967            by_model,
968            top_tools,
969        })
970    }
971
972    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
973        let mut stmt = self.conn.prepare(
974            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
975                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
976                    stop_reason, latency_ms, ttft_ms, retry_count,
977                    context_used_tokens, context_max_tokens,
978                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
979             FROM events WHERE session_id = ?1 ORDER BY seq ASC",
980        )?;
981        let rows = stmt.query_map(params![session_id], |row| {
982            let payload_str: String = row.get(12)?;
983            Ok(Event {
984                session_id: row.get(0)?,
985                seq: row.get::<_, i64>(1)? as u64,
986                ts_ms: row.get::<_, i64>(2)? as u64,
987                ts_exact: row.get::<_, i64>(3)? != 0,
988                kind: kind_from_str(&row.get::<_, String>(4)?),
989                source: source_from_str(&row.get::<_, String>(5)?),
990                tool: row.get(6)?,
991                tool_call_id: row.get(7)?,
992                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
993                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
994                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
995                cost_usd_e6: row.get(11)?,
996                payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
997                stop_reason: row.get(13)?,
998                latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
999                ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
1000                retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
1001                context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
1002                context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
1003                cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
1004                cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
1005                system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
1006            })
1007        })?;
1008
1009        let mut events = Vec::new();
1010        for row in rows {
1011            events.push(row?);
1012        }
1013        Ok(events)
1014    }
1015
1016    /// Update only status for existing session.
1017    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1018        self.conn.execute(
1019            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1020            params![format!("{:?}", status), id],
1021        )?;
1022        Ok(())
1023    }
1024
1025    /// Workspace activity dashboard — feeds `cmd_insights`.
1026    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1027        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1028        Ok(InsightsStats {
1029            total_sessions: count_q(
1030                &self.conn,
1031                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1032                workspace,
1033            )?,
1034            running_sessions: count_q(
1035                &self.conn,
1036                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1037                workspace,
1038            )?,
1039            total_events: count_q(
1040                &self.conn,
1041                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1042                workspace,
1043            )?,
1044            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1045            recent: recent_sessions_3(&self.conn, workspace)?,
1046            top_tools: top_tools_5(&self.conn, workspace)?,
1047            total_cost_usd_e6,
1048            sessions_with_cost,
1049        })
1050    }
1051
1052    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
1053    pub fn retro_events_in_window(
1054        &self,
1055        workspace: &str,
1056        start_ms: u64,
1057        end_ms: u64,
1058    ) -> Result<Vec<(SessionRecord, Event)>> {
1059        let mut stmt = self.conn.prepare(
1060            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1061                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1062                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1063                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1064                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1065                    s.repo_file_count, s.repo_total_loc,
1066                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1067                    e.context_used_tokens, e.context_max_tokens,
1068                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1069             FROM events e
1070             JOIN sessions s ON s.id = e.session_id
1071             WHERE s.workspace = ?1
1072               AND (
1073                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1074                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1075               )
1076             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1077        )?;
1078        let rows = stmt.query_map(
1079            params![
1080                workspace,
1081                start_ms as i64,
1082                end_ms as i64,
1083                SYNTHETIC_TS_CEILING_MS,
1084            ],
1085            |row| {
1086                let payload_str: String = row.get(12)?;
1087                let status_str: String = row.get(19)?;
1088                Ok((
1089                    SessionRecord {
1090                        id: row.get(13)?,
1091                        agent: row.get(14)?,
1092                        model: row.get(15)?,
1093                        workspace: row.get(16)?,
1094                        started_at_ms: row.get::<_, i64>(17)? as u64,
1095                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1096                        status: status_from_str(&status_str),
1097                        trace_path: row.get(20)?,
1098                        start_commit: row.get(21)?,
1099                        end_commit: row.get(22)?,
1100                        branch: row.get(23)?,
1101                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1102                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1103                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1104                        prompt_fingerprint: row.get(27)?,
1105                        parent_session_id: row.get(28)?,
1106                        agent_version: row.get(29)?,
1107                        os: row.get(30)?,
1108                        arch: row.get(31)?,
1109                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1110                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1111                    },
1112                    Event {
1113                        session_id: row.get(0)?,
1114                        seq: row.get::<_, i64>(1)? as u64,
1115                        ts_ms: row.get::<_, i64>(2)? as u64,
1116                        ts_exact: row.get::<_, i64>(3)? != 0,
1117                        kind: kind_from_str(&row.get::<_, String>(4)?),
1118                        source: source_from_str(&row.get::<_, String>(5)?),
1119                        tool: row.get(6)?,
1120                        tool_call_id: row.get(7)?,
1121                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1122                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1123                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1124                        cost_usd_e6: row.get(11)?,
1125                        payload: serde_json::from_str(&payload_str)
1126                            .unwrap_or(serde_json::Value::Null),
1127                        stop_reason: row.get(34)?,
1128                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1129                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1130                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1131                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1132                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1133                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1134                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1135                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1136                    },
1137                ))
1138            },
1139        )?;
1140
1141        let mut out = Vec::new();
1142        for r in rows {
1143            out.push(r?);
1144        }
1145        Ok(out)
1146    }
1147
1148    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1149    pub fn files_touched_in_window(
1150        &self,
1151        workspace: &str,
1152        start_ms: u64,
1153        end_ms: u64,
1154    ) -> Result<Vec<(String, String)>> {
1155        let mut stmt = self.conn.prepare(
1156            "SELECT DISTINCT ft.session_id, ft.path
1157             FROM files_touched ft
1158             JOIN sessions s ON s.id = ft.session_id
1159             WHERE s.workspace = ?1
1160               AND EXISTS (
1161                 SELECT 1 FROM events e
1162                 JOIN sessions ss ON ss.id = e.session_id
1163                 WHERE e.session_id = ft.session_id
1164                   AND (
1165                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1166                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1167                   )
1168               )
1169             ORDER BY ft.session_id, ft.path",
1170        )?;
1171        let out: Vec<(String, String)> = stmt
1172            .query_map(
1173                params![
1174                    workspace,
1175                    start_ms as i64,
1176                    end_ms as i64,
1177                    SYNTHETIC_TS_CEILING_MS,
1178                ],
1179                |r| Ok((r.get(0)?, r.get(1)?)),
1180            )?
1181            .filter_map(|r| r.ok())
1182            .collect();
1183        Ok(out)
1184    }
1185
1186    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1187    /// (any session with an indexed skill row; join events optional — use row existence).
1188    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1189        let mut stmt = self.conn.prepare(
1190            "SELECT DISTINCT su.skill
1191             FROM skills_used su
1192             JOIN sessions s ON s.id = su.session_id
1193             WHERE s.workspace = ?1
1194               AND EXISTS (
1195                 SELECT 1 FROM events e
1196                 JOIN sessions ss ON ss.id = e.session_id
1197                 WHERE e.session_id = su.session_id
1198                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1199               )
1200             ORDER BY su.skill",
1201        )?;
1202        let out: Vec<String> = stmt
1203            .query_map(
1204                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1205                |r| r.get::<_, String>(0),
1206            )?
1207            .filter_map(|r| r.ok())
1208            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1209            .collect();
1210        Ok(out)
1211    }
1212
1213    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1214    pub fn skills_used_in_window(
1215        &self,
1216        workspace: &str,
1217        start_ms: u64,
1218        end_ms: u64,
1219    ) -> Result<Vec<(String, String)>> {
1220        let mut stmt = self.conn.prepare(
1221            "SELECT DISTINCT su.session_id, su.skill
1222             FROM skills_used su
1223             JOIN sessions s ON s.id = su.session_id
1224             WHERE s.workspace = ?1
1225               AND EXISTS (
1226                 SELECT 1 FROM events e
1227                 JOIN sessions ss ON ss.id = e.session_id
1228                 WHERE e.session_id = su.session_id
1229                   AND (
1230                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1231                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1232                   )
1233               )
1234             ORDER BY su.session_id, su.skill",
1235        )?;
1236        let out: Vec<(String, String)> = stmt
1237            .query_map(
1238                params![
1239                    workspace,
1240                    start_ms as i64,
1241                    end_ms as i64,
1242                    SYNTHETIC_TS_CEILING_MS,
1243                ],
1244                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1245            )?
1246            .filter_map(|r| r.ok())
1247            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1248            .collect();
1249        Ok(out)
1250    }
1251
1252    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1253    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1254        let mut stmt = self.conn.prepare(
1255            "SELECT DISTINCT ru.rule
1256             FROM rules_used ru
1257             JOIN sessions s ON s.id = ru.session_id
1258             WHERE s.workspace = ?1
1259               AND EXISTS (
1260                 SELECT 1 FROM events e
1261                 JOIN sessions ss ON ss.id = e.session_id
1262                 WHERE e.session_id = ru.session_id
1263                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1264               )
1265             ORDER BY ru.rule",
1266        )?;
1267        let out: Vec<String> = stmt
1268            .query_map(
1269                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1270                |r| r.get::<_, String>(0),
1271            )?
1272            .filter_map(|r| r.ok())
1273            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1274            .collect();
1275        Ok(out)
1276    }
1277
1278    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1279    pub fn rules_used_in_window(
1280        &self,
1281        workspace: &str,
1282        start_ms: u64,
1283        end_ms: u64,
1284    ) -> Result<Vec<(String, String)>> {
1285        let mut stmt = self.conn.prepare(
1286            "SELECT DISTINCT ru.session_id, ru.rule
1287             FROM rules_used ru
1288             JOIN sessions s ON s.id = ru.session_id
1289             WHERE s.workspace = ?1
1290               AND EXISTS (
1291                 SELECT 1 FROM events e
1292                 JOIN sessions ss ON ss.id = e.session_id
1293                 WHERE e.session_id = ru.session_id
1294                   AND (
1295                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1296                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1297                   )
1298               )
1299             ORDER BY ru.session_id, ru.rule",
1300        )?;
1301        let out: Vec<(String, String)> = stmt
1302            .query_map(
1303                params![
1304                    workspace,
1305                    start_ms as i64,
1306                    end_ms as i64,
1307                    SYNTHETIC_TS_CEILING_MS,
1308                ],
1309                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1310            )?
1311            .filter_map(|r| r.ok())
1312            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1313            .collect();
1314        Ok(out)
1315    }
1316
1317    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1318    pub fn sessions_active_in_window(
1319        &self,
1320        workspace: &str,
1321        start_ms: u64,
1322        end_ms: u64,
1323    ) -> Result<HashSet<String>> {
1324        let mut stmt = self.conn.prepare(
1325            "SELECT DISTINCT s.id
1326             FROM sessions s
1327             WHERE s.workspace = ?1
1328               AND EXISTS (
1329                 SELECT 1 FROM events e
1330                 WHERE e.session_id = s.id
1331                   AND (
1332                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1333                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1334                   )
1335               )",
1336        )?;
1337        let out: HashSet<String> = stmt
1338            .query_map(
1339                params![
1340                    workspace,
1341                    start_ms as i64,
1342                    end_ms as i64,
1343                    SYNTHETIC_TS_CEILING_MS,
1344                ],
1345                |r| r.get(0),
1346            )?
1347            .filter_map(|r| r.ok())
1348            .collect();
1349        Ok(out)
1350    }
1351
1352    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1353    pub fn session_costs_usd_e6_in_window(
1354        &self,
1355        workspace: &str,
1356        start_ms: u64,
1357        end_ms: u64,
1358    ) -> Result<HashMap<String, i64>> {
1359        let mut stmt = self.conn.prepare(
1360            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1361             FROM events e
1362             JOIN sessions s ON s.id = e.session_id
1363             WHERE s.workspace = ?1
1364               AND (
1365                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1366                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1367               )
1368             GROUP BY e.session_id",
1369        )?;
1370        let rows: Vec<(String, i64)> = stmt
1371            .query_map(
1372                params![
1373                    workspace,
1374                    start_ms as i64,
1375                    end_ms as i64,
1376                    SYNTHETIC_TS_CEILING_MS,
1377                ],
1378                |r| Ok((r.get(0)?, r.get(1)?)),
1379            )?
1380            .filter_map(|r| r.ok())
1381            .collect();
1382        Ok(rows.into_iter().collect())
1383    }
1384
1385    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1386    pub fn guidance_report(
1387        &self,
1388        workspace: &str,
1389        window_start_ms: u64,
1390        window_end_ms: u64,
1391        skill_slugs_on_disk: &HashSet<String>,
1392        rule_slugs_on_disk: &HashSet<String>,
1393    ) -> Result<GuidanceReport> {
1394        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1395        let denom = active.len() as u64;
1396        let costs =
1397            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1398
1399        let workspace_avg_cost_per_session_usd = if denom > 0 {
1400            let total_e6: i64 = active
1401                .iter()
1402                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1403                .sum();
1404            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1405        } else {
1406            None
1407        };
1408
1409        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1410        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1411            skill_sessions.entry(skill).or_default().insert(sid);
1412        }
1413        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1414        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1415            rule_sessions.entry(rule).or_default().insert(sid);
1416        }
1417
1418        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1419
1420        let mut push_row =
1421            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1422                let sessions = sids.len() as u64;
1423                let sessions_pct = if denom > 0 {
1424                    sessions as f64 * 100.0 / denom as f64
1425                } else {
1426                    0.0
1427                };
1428                let total_cost_usd_e6: i64 = sids
1429                    .iter()
1430                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1431                    .sum();
1432                let avg_cost_per_session_usd = if sessions > 0 {
1433                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1434                } else {
1435                    None
1436                };
1437                let vs_workspace_avg_cost_per_session_usd =
1438                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1439                        (Some(avg), Some(w)) => Some(avg - w),
1440                        _ => None,
1441                    };
1442                rows.push(GuidancePerfRow {
1443                    kind,
1444                    id,
1445                    sessions,
1446                    sessions_pct,
1447                    total_cost_usd_e6,
1448                    avg_cost_per_session_usd,
1449                    vs_workspace_avg_cost_per_session_usd,
1450                    on_disk,
1451                });
1452            };
1453
1454        let mut seen_skills: HashSet<String> = HashSet::new();
1455        for (id, sids) in &skill_sessions {
1456            seen_skills.insert(id.clone());
1457            push_row(
1458                GuidanceKind::Skill,
1459                id.clone(),
1460                sids,
1461                skill_slugs_on_disk.contains(id),
1462            );
1463        }
1464        for slug in skill_slugs_on_disk {
1465            if seen_skills.contains(slug) {
1466                continue;
1467            }
1468            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1469        }
1470
1471        let mut seen_rules: HashSet<String> = HashSet::new();
1472        for (id, sids) in &rule_sessions {
1473            seen_rules.insert(id.clone());
1474            push_row(
1475                GuidanceKind::Rule,
1476                id.clone(),
1477                sids,
1478                rule_slugs_on_disk.contains(id),
1479            );
1480        }
1481        for slug in rule_slugs_on_disk {
1482            if seen_rules.contains(slug) {
1483                continue;
1484            }
1485            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1486        }
1487
1488        rows.sort_by(|a, b| {
1489            b.sessions
1490                .cmp(&a.sessions)
1491                .then_with(|| a.kind.cmp(&b.kind))
1492                .then_with(|| a.id.cmp(&b.id))
1493        });
1494
1495        Ok(GuidanceReport {
1496            workspace: workspace.to_string(),
1497            window_start_ms,
1498            window_end_ms,
1499            sessions_in_window: denom,
1500            workspace_avg_cost_per_session_usd,
1501            rows,
1502        })
1503    }
1504
1505    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1506        let mut stmt = self.conn.prepare(
1507            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1508                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1509                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1510                    repo_file_count, repo_total_loc
1511             FROM sessions WHERE id = ?1",
1512        )?;
1513        let mut rows = stmt.query_map(params![id], |row| {
1514            Ok((
1515                row.get::<_, String>(0)?,
1516                row.get::<_, String>(1)?,
1517                row.get::<_, Option<String>>(2)?,
1518                row.get::<_, String>(3)?,
1519                row.get::<_, i64>(4)?,
1520                row.get::<_, Option<i64>>(5)?,
1521                row.get::<_, String>(6)?,
1522                row.get::<_, String>(7)?,
1523                row.get::<_, Option<String>>(8)?,
1524                row.get::<_, Option<String>>(9)?,
1525                row.get::<_, Option<String>>(10)?,
1526                row.get::<_, Option<i64>>(11)?,
1527                row.get::<_, Option<i64>>(12)?,
1528                row.get::<_, String>(13)?,
1529                row.get::<_, Option<String>>(14)?,
1530                row.get::<_, Option<String>>(15)?,
1531                row.get::<_, Option<String>>(16)?,
1532                row.get::<_, Option<String>>(17)?,
1533                row.get::<_, Option<String>>(18)?,
1534                row.get::<_, Option<i64>>(19)?,
1535                row.get::<_, Option<i64>>(20)?,
1536            ))
1537        })?;
1538
1539        if let Some(row) = rows.next() {
1540            let (
1541                id,
1542                agent,
1543                model,
1544                workspace,
1545                started,
1546                ended,
1547                status_str,
1548                trace,
1549                start_commit,
1550                end_commit,
1551                branch,
1552                dirty_start,
1553                dirty_end,
1554                source,
1555                prompt_fingerprint,
1556                parent_session_id,
1557                agent_version,
1558                os,
1559                arch,
1560                repo_file_count,
1561                repo_total_loc,
1562            ) = row?;
1563            Ok(Some(SessionRecord {
1564                id,
1565                agent,
1566                model,
1567                workspace,
1568                started_at_ms: started as u64,
1569                ended_at_ms: ended.map(|v| v as u64),
1570                status: status_from_str(&status_str),
1571                trace_path: trace,
1572                start_commit,
1573                end_commit,
1574                branch,
1575                dirty_start: dirty_start.map(i64_to_bool),
1576                dirty_end: dirty_end.map(i64_to_bool),
1577                repo_binding_source: empty_to_none(source),
1578                prompt_fingerprint,
1579                parent_session_id,
1580                agent_version,
1581                os,
1582                arch,
1583                repo_file_count: repo_file_count.map(|v| v as u32),
1584                repo_total_loc: repo_total_loc.map(|v| v as u64),
1585            }))
1586        } else {
1587            Ok(None)
1588        }
1589    }
1590
1591    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1592        let mut stmt = self.conn.prepare(
1593            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1594                    indexed_at_ms, dirty, graph_path
1595             FROM repo_snapshots WHERE workspace = ?1
1596             ORDER BY indexed_at_ms DESC LIMIT 1",
1597        )?;
1598        let mut rows = stmt.query_map(params![workspace], |row| {
1599            Ok(RepoSnapshotRecord {
1600                id: row.get(0)?,
1601                workspace: row.get(1)?,
1602                head_commit: row.get(2)?,
1603                dirty_fingerprint: row.get(3)?,
1604                analyzer_version: row.get(4)?,
1605                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1606                dirty: row.get::<_, i64>(6)? != 0,
1607                graph_path: row.get(7)?,
1608            })
1609        })?;
1610        Ok(rows.next().transpose()?)
1611    }
1612
1613    pub fn save_repo_snapshot(
1614        &self,
1615        snapshot: &RepoSnapshotRecord,
1616        facts: &[FileFact],
1617        edges: &[RepoEdge],
1618    ) -> Result<()> {
1619        self.conn.execute(
1620            "INSERT INTO repo_snapshots (
1621                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1622                indexed_at_ms, dirty, graph_path
1623             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1624             ON CONFLICT(id) DO UPDATE SET
1625                workspace=excluded.workspace,
1626                head_commit=excluded.head_commit,
1627                dirty_fingerprint=excluded.dirty_fingerprint,
1628                analyzer_version=excluded.analyzer_version,
1629                indexed_at_ms=excluded.indexed_at_ms,
1630                dirty=excluded.dirty,
1631                graph_path=excluded.graph_path",
1632            params![
1633                snapshot.id,
1634                snapshot.workspace,
1635                snapshot.head_commit,
1636                snapshot.dirty_fingerprint,
1637                snapshot.analyzer_version,
1638                snapshot.indexed_at_ms as i64,
1639                bool_to_i64(snapshot.dirty),
1640                snapshot.graph_path,
1641            ],
1642        )?;
1643        self.conn.execute(
1644            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1645            params![snapshot.id],
1646        )?;
1647        self.conn.execute(
1648            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1649            params![snapshot.id],
1650        )?;
1651        for fact in facts {
1652            self.conn.execute(
1653                "INSERT INTO file_facts (
1654                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1655                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1656                    churn_30d, churn_90d, authors_90d, last_changed_ms
1657                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1658                params![
1659                    fact.snapshot_id,
1660                    fact.path,
1661                    fact.language,
1662                    fact.bytes as i64,
1663                    fact.loc as i64,
1664                    fact.sloc as i64,
1665                    fact.complexity_total as i64,
1666                    fact.max_fn_complexity as i64,
1667                    fact.symbol_count as i64,
1668                    fact.import_count as i64,
1669                    fact.fan_in as i64,
1670                    fact.fan_out as i64,
1671                    fact.churn_30d as i64,
1672                    fact.churn_90d as i64,
1673                    fact.authors_90d as i64,
1674                    fact.last_changed_ms.map(|v| v as i64),
1675                ],
1676            )?;
1677        }
1678        for edge in edges {
1679            self.conn.execute(
1680                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1681                 VALUES (?1, ?2, ?3, ?4, ?5)
1682                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1683                 DO UPDATE SET weight = weight + excluded.weight",
1684                params![
1685                    snapshot.id,
1686                    edge.from_path,
1687                    edge.to_path,
1688                    edge.kind,
1689                    edge.weight as i64,
1690                ],
1691            )?;
1692        }
1693        Ok(())
1694    }
1695
1696    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1697        let mut stmt = self.conn.prepare(
1698            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1699                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1700                    churn_30d, churn_90d, authors_90d, last_changed_ms
1701             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1702        )?;
1703        let rows = stmt.query_map(params![snapshot_id], |row| {
1704            Ok(FileFact {
1705                snapshot_id: row.get(0)?,
1706                path: row.get(1)?,
1707                language: row.get(2)?,
1708                bytes: row.get::<_, i64>(3)? as u64,
1709                loc: row.get::<_, i64>(4)? as u32,
1710                sloc: row.get::<_, i64>(5)? as u32,
1711                complexity_total: row.get::<_, i64>(6)? as u32,
1712                max_fn_complexity: row.get::<_, i64>(7)? as u32,
1713                symbol_count: row.get::<_, i64>(8)? as u32,
1714                import_count: row.get::<_, i64>(9)? as u32,
1715                fan_in: row.get::<_, i64>(10)? as u32,
1716                fan_out: row.get::<_, i64>(11)? as u32,
1717                churn_30d: row.get::<_, i64>(12)? as u32,
1718                churn_90d: row.get::<_, i64>(13)? as u32,
1719                authors_90d: row.get::<_, i64>(14)? as u32,
1720                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1721            })
1722        })?;
1723        Ok(rows.filter_map(|row| row.ok()).collect())
1724    }
1725
1726    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1727        let mut stmt = self.conn.prepare(
1728            "SELECT from_id, to_id, kind, weight
1729             FROM repo_edges WHERE snapshot_id = ?1
1730             ORDER BY kind, from_id, to_id",
1731        )?;
1732        let rows = stmt.query_map(params![snapshot_id], |row| {
1733            Ok(RepoEdge {
1734                from_path: row.get(0)?,
1735                to_path: row.get(1)?,
1736                kind: row.get(2)?,
1737                weight: row.get::<_, i64>(3)? as u32,
1738            })
1739        })?;
1740        Ok(rows.filter_map(|row| row.ok()).collect())
1741    }
1742
1743    pub fn tool_spans_in_window(
1744        &self,
1745        workspace: &str,
1746        start_ms: u64,
1747        end_ms: u64,
1748    ) -> Result<Vec<ToolSpanView>> {
1749        let mut stmt = self.conn.prepare(
1750            "SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1751                    ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
1752                    ts.parent_span_id, ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count
1753             FROM tool_spans ts
1754             JOIN sessions s ON s.id = ts.session_id
1755             WHERE s.workspace = ?1
1756               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1757               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1758             ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1759        )?;
1760        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1761            let paths_json: String = row.get(8)?;
1762            Ok(ToolSpanView {
1763                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
1764                tool: row
1765                    .get::<_, Option<String>>(1)?
1766                    .unwrap_or_else(|| "unknown".into()),
1767                status: row.get(2)?,
1768                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
1769                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1770                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1771                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
1772                cost_usd_e6: row.get(7)?,
1773                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1774                parent_span_id: row.get(9)?,
1775                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
1776                subtree_cost_usd_e6: row.get(11)?,
1777                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
1778            })
1779        })?;
1780        Ok(rows.filter_map(|row| row.ok()).collect())
1781    }
1782
1783    pub fn session_span_tree(
1784        &self,
1785        session_id: &str,
1786    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
1787        let mut stmt = self.conn.prepare(
1788            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
1789                    reasoning_tokens, cost_usd_e6, paths_json,
1790                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
1791             FROM tool_spans
1792             WHERE session_id = ?1
1793             ORDER BY depth ASC, started_at_ms ASC",
1794        )?;
1795        let rows = stmt.query_map(params![session_id], |row| {
1796            let paths_json: String = row.get(8)?;
1797            Ok(crate::metrics::types::ToolSpanView {
1798                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
1799                tool: row
1800                    .get::<_, Option<String>>(1)?
1801                    .unwrap_or_else(|| "unknown".into()),
1802                status: row.get(2)?,
1803                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
1804                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1805                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1806                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
1807                cost_usd_e6: row.get(7)?,
1808                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1809                parent_span_id: row.get(9)?,
1810                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
1811                subtree_cost_usd_e6: row.get(11)?,
1812                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
1813            })
1814        })?;
1815        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1816        Ok(crate::store::span_tree::build_tree(spans))
1817    }
1818
1819    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1820        let mut stmt = self.conn.prepare(
1821            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1822                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1823             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1824        )?;
1825        let rows = stmt.query_map(params![session_id], |row| {
1826            let paths_json: String = row.get(12)?;
1827            Ok(ToolSpanSyncRow {
1828                span_id: row.get(0)?,
1829                session_id: row.get(1)?,
1830                tool: row.get(2)?,
1831                tool_call_id: row.get(3)?,
1832                status: row.get(4)?,
1833                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1834                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1835                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1836                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1837                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1838                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1839                cost_usd_e6: row.get(11)?,
1840                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1841            })
1842        })?;
1843        Ok(rows.filter_map(|row| row.ok()).collect())
1844    }
1845
1846    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1847        self.conn.execute(
1848            "INSERT OR REPLACE INTO session_evals
1849             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1850             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1851            rusqlite::params![
1852                eval.id,
1853                eval.session_id,
1854                eval.judge_model,
1855                eval.rubric_id,
1856                eval.score,
1857                eval.rationale,
1858                eval.flagged as i64,
1859                eval.created_at_ms as i64,
1860            ],
1861        )?;
1862        Ok(())
1863    }
1864
1865    pub fn list_evals_in_window(
1866        &self,
1867        start_ms: u64,
1868        end_ms: u64,
1869    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1870        let mut stmt = self.conn.prepare(
1871            "SELECT id, session_id, judge_model, rubric_id, score,
1872                    rationale, flagged, created_at_ms
1873             FROM session_evals
1874             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1875             ORDER BY created_at_ms ASC",
1876        )?;
1877        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1878            Ok(crate::eval::types::EvalRow {
1879                id: r.get(0)?,
1880                session_id: r.get(1)?,
1881                judge_model: r.get(2)?,
1882                rubric_id: r.get(3)?,
1883                score: r.get(4)?,
1884                rationale: r.get(5)?,
1885                flagged: r.get::<_, i64>(6)? != 0,
1886                created_at_ms: r.get::<_, i64>(7)? as u64,
1887            })
1888        })?;
1889        rows.collect()
1890    }
1891
1892    pub fn list_evals_for_session(
1893        &self,
1894        session_id: &str,
1895    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1896        let mut stmt = self.conn.prepare(
1897            "SELECT id, session_id, judge_model, rubric_id, score,
1898                    rationale, flagged, created_at_ms
1899             FROM session_evals
1900             WHERE session_id = ?1
1901             ORDER BY created_at_ms DESC",
1902        )?;
1903        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1904            Ok(crate::eval::types::EvalRow {
1905                id: r.get(0)?,
1906                session_id: r.get(1)?,
1907                judge_model: r.get(2)?,
1908                rubric_id: r.get(3)?,
1909                score: r.get(4)?,
1910                rationale: r.get(5)?,
1911                flagged: r.get::<_, i64>(6)? != 0,
1912                created_at_ms: r.get::<_, i64>(7)? as u64,
1913            })
1914        })?;
1915        rows.collect()
1916    }
1917
1918    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
1919        use crate::feedback::types::FeedbackLabel;
1920        self.conn.execute(
1921            "INSERT OR REPLACE INTO session_feedback
1922             (id, session_id, score, label, note, created_at_ms)
1923             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1924            rusqlite::params![
1925                r.id,
1926                r.session_id,
1927                r.score.as_ref().map(|s| s.0 as i64),
1928                r.label.as_ref().map(FeedbackLabel::to_db_str),
1929                r.note,
1930                r.created_at_ms as i64,
1931            ],
1932        )?;
1933        let payload = serde_json::to_string(r).unwrap_or_default();
1934        self.conn.execute(
1935            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
1936             VALUES (?1, 'session_feedback', ?2, 0)",
1937            rusqlite::params![r.session_id, payload],
1938        )?;
1939        Ok(())
1940    }
1941
1942    pub fn list_feedback_in_window(
1943        &self,
1944        start_ms: u64,
1945        end_ms: u64,
1946    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
1947        let mut stmt = self.conn.prepare(
1948            "SELECT id, session_id, score, label, note, created_at_ms
1949             FROM session_feedback
1950             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1951             ORDER BY created_at_ms ASC",
1952        )?;
1953        let rows = stmt.query_map(
1954            rusqlite::params![start_ms as i64, end_ms as i64],
1955            feedback_row,
1956        )?;
1957        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1958    }
1959
1960    pub fn feedback_for_sessions(
1961        &self,
1962        ids: &[String],
1963    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
1964        if ids.is_empty() {
1965            return Ok(std::collections::HashMap::new());
1966        }
1967        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1968        let sql = format!(
1969            "SELECT id, session_id, score, label, note, created_at_ms
1970             FROM session_feedback WHERE session_id IN ({placeholders})
1971             ORDER BY created_at_ms DESC"
1972        );
1973        let mut stmt = self.conn.prepare(&sql)?;
1974        let params: Vec<&dyn rusqlite::ToSql> =
1975            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1976        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
1977        let mut map = std::collections::HashMap::new();
1978        for row in rows {
1979            let r = row?;
1980            map.entry(r.session_id.clone()).or_insert(r);
1981        }
1982        Ok(map)
1983    }
1984
1985    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
1986        self.conn.execute(
1987            "INSERT INTO session_outcomes (
1988                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
1989                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
1990            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
1991            ON CONFLICT(session_id) DO UPDATE SET
1992                test_passed=excluded.test_passed,
1993                test_failed=excluded.test_failed,
1994                test_skipped=excluded.test_skipped,
1995                build_ok=excluded.build_ok,
1996                lint_errors=excluded.lint_errors,
1997                revert_lines_14d=excluded.revert_lines_14d,
1998                pr_open=excluded.pr_open,
1999                ci_ok=excluded.ci_ok,
2000                measured_at_ms=excluded.measured_at_ms,
2001                measure_error=excluded.measure_error",
2002            params![
2003                row.session_id,
2004                row.test_passed,
2005                row.test_failed,
2006                row.test_skipped,
2007                row.build_ok.map(bool_to_i64),
2008                row.lint_errors,
2009                row.revert_lines_14d,
2010                row.pr_open,
2011                row.ci_ok.map(bool_to_i64),
2012                row.measured_at_ms as i64,
2013                row.measure_error.as_deref(),
2014            ],
2015        )?;
2016        Ok(())
2017    }
2018
2019    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2020        let mut stmt = self.conn.prepare(
2021            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2022                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2023             FROM session_outcomes WHERE session_id = ?1",
2024        )?;
2025        let row = stmt
2026            .query_row(params![session_id], outcome_row)
2027            .optional()?;
2028        Ok(row)
2029    }
2030
2031    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2032    pub fn list_session_outcomes_in_window(
2033        &self,
2034        workspace: &str,
2035        start_ms: u64,
2036        end_ms: u64,
2037    ) -> Result<Vec<SessionOutcomeRow>> {
2038        let mut stmt = self.conn.prepare(
2039            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2040                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2041             FROM session_outcomes o
2042             JOIN sessions s ON s.id = o.session_id
2043             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2044             ORDER BY o.measured_at_ms ASC",
2045        )?;
2046        let rows = stmt.query_map(
2047            params![workspace, start_ms as i64, end_ms as i64],
2048            outcome_row,
2049        )?;
2050        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2051    }
2052
2053    pub fn append_session_sample(
2054        &self,
2055        session_id: &str,
2056        ts_ms: u64,
2057        pid: u32,
2058        cpu_percent: Option<f64>,
2059        rss_bytes: Option<u64>,
2060    ) -> Result<()> {
2061        self.conn.execute(
2062            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2063             VALUES (?1, ?2, ?3, ?4, ?5)",
2064            params![
2065                session_id,
2066                ts_ms as i64,
2067                pid as i64,
2068                cpu_percent,
2069                rss_bytes.map(|b| b as i64)
2070            ],
2071        )?;
2072        Ok(())
2073    }
2074
2075    /// Per-session maxima for retro heuristics.
2076    pub fn list_session_sample_aggs_in_window(
2077        &self,
2078        workspace: &str,
2079        start_ms: u64,
2080        end_ms: u64,
2081    ) -> Result<Vec<SessionSampleAgg>> {
2082        let mut stmt = self.conn.prepare(
2083            "SELECT ss.session_id, COUNT(*) AS n,
2084                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2085             FROM session_samples ss
2086             JOIN sessions s ON s.id = ss.session_id
2087             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2088             GROUP BY ss.session_id",
2089        )?;
2090        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2091            let sid: String = r.get(0)?;
2092            let n: i64 = r.get(1)?;
2093            let max_cpu: Option<f64> = r.get(2)?;
2094            let max_rss: Option<i64> = r.get(3)?;
2095            Ok(SessionSampleAgg {
2096                session_id: sid,
2097                sample_count: n as u64,
2098                max_cpu_percent: max_cpu.unwrap_or(0.0),
2099                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2100            })
2101        })?;
2102        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2103    }
2104
2105    pub fn list_sessions_for_eval(
2106        &self,
2107        since_ms: u64,
2108        min_cost_usd: f64,
2109    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2110        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2111        let mut stmt = self.conn.prepare(
2112            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2113                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2114                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2115                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2116             FROM sessions s
2117             WHERE s.started_at_ms >= ?1
2118               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2119               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2120             ORDER BY s.started_at_ms DESC",
2121        )?;
2122        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2123            Ok((
2124                r.get::<_, String>(0)?,
2125                r.get::<_, String>(1)?,
2126                r.get::<_, Option<String>>(2)?,
2127                r.get::<_, String>(3)?,
2128                r.get::<_, i64>(4)?,
2129                r.get::<_, Option<i64>>(5)?,
2130                r.get::<_, String>(6)?,
2131                r.get::<_, String>(7)?,
2132                r.get::<_, Option<String>>(8)?,
2133                r.get::<_, Option<String>>(9)?,
2134                r.get::<_, Option<String>>(10)?,
2135                r.get::<_, Option<i64>>(11)?,
2136                r.get::<_, Option<i64>>(12)?,
2137                r.get::<_, Option<String>>(13)?,
2138                r.get::<_, Option<String>>(14)?,
2139                r.get::<_, Option<String>>(15)?,
2140                r.get::<_, Option<String>>(16)?,
2141                r.get::<_, Option<String>>(17)?,
2142                r.get::<_, Option<String>>(18)?,
2143                r.get::<_, Option<i64>>(19)?,
2144                r.get::<_, Option<i64>>(20)?,
2145            ))
2146        })?;
2147        let mut out = Vec::new();
2148        for row in rows {
2149            let (
2150                id,
2151                agent,
2152                model,
2153                workspace,
2154                started,
2155                ended,
2156                status_str,
2157                trace,
2158                start_commit,
2159                end_commit,
2160                branch,
2161                dirty_start,
2162                dirty_end,
2163                source,
2164                prompt_fingerprint,
2165                parent_session_id,
2166                agent_version,
2167                os,
2168                arch,
2169                repo_file_count,
2170                repo_total_loc,
2171            ) = row?;
2172            out.push(crate::core::event::SessionRecord {
2173                id,
2174                agent,
2175                model,
2176                workspace,
2177                started_at_ms: started as u64,
2178                ended_at_ms: ended.map(|v| v as u64),
2179                status: status_from_str(&status_str),
2180                trace_path: trace,
2181                start_commit,
2182                end_commit,
2183                branch,
2184                dirty_start: dirty_start.map(i64_to_bool),
2185                dirty_end: dirty_end.map(i64_to_bool),
2186                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2187                prompt_fingerprint,
2188                parent_session_id,
2189                agent_version,
2190                os,
2191                arch,
2192                repo_file_count: repo_file_count.map(|v| v as u32),
2193                repo_total_loc: repo_total_loc.map(|v| v as u64),
2194            });
2195        }
2196        Ok(out)
2197    }
2198
2199    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2200        self.conn.execute(
2201            "INSERT OR IGNORE INTO prompt_snapshots
2202             (fingerprint, captured_at_ms, files_json, total_bytes)
2203             VALUES (?1, ?2, ?3, ?4)",
2204            params![
2205                snap.fingerprint,
2206                snap.captured_at_ms as i64,
2207                snap.files_json,
2208                snap.total_bytes as i64
2209            ],
2210        )?;
2211        Ok(())
2212    }
2213
2214    pub fn get_prompt_snapshot(
2215        &self,
2216        fingerprint: &str,
2217    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2218        self.conn
2219            .query_row(
2220                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2221                 FROM prompt_snapshots WHERE fingerprint = ?1",
2222                params![fingerprint],
2223                |r| {
2224                    Ok(crate::prompt::PromptSnapshot {
2225                        fingerprint: r.get(0)?,
2226                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2227                        files_json: r.get(2)?,
2228                        total_bytes: r.get::<_, i64>(3)? as u64,
2229                    })
2230                },
2231            )
2232            .optional()
2233            .map_err(Into::into)
2234    }
2235
2236    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2237        let mut stmt = self.conn.prepare(
2238            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2239             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2240        )?;
2241        let rows = stmt.query_map([], |r| {
2242            Ok(crate::prompt::PromptSnapshot {
2243                fingerprint: r.get(0)?,
2244                captured_at_ms: r.get::<_, i64>(1)? as u64,
2245                files_json: r.get(2)?,
2246                total_bytes: r.get::<_, i64>(3)? as u64,
2247            })
2248        })?;
2249        Ok(rows.filter_map(|r| r.ok()).collect())
2250    }
2251
2252    /// Sessions with a non-null prompt_fingerprint in the given window.
2253    pub fn sessions_with_prompt_fingerprint(
2254        &self,
2255        workspace: &str,
2256        start_ms: u64,
2257        end_ms: u64,
2258    ) -> Result<Vec<(String, String)>> {
2259        let mut stmt = self.conn.prepare(
2260            "SELECT id, prompt_fingerprint FROM sessions
2261             WHERE workspace = ?1
2262               AND started_at_ms >= ?2 AND started_at_ms < ?3
2263               AND prompt_fingerprint IS NOT NULL",
2264        )?;
2265        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2266            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2267        })?;
2268        Ok(rows.filter_map(|r| r.ok()).collect())
2269    }
2270}
2271
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
2278
2279fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2280    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2281}
2282
2283fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2284    let cost: i64 = conn.query_row(
2285        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2286        params![workspace], |r| r.get(0),
2287    )?;
2288    let with_cost: i64 = conn.query_row(
2289        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2290        params![workspace], |r| r.get(0),
2291    )?;
2292    Ok((cost, with_cost as u64))
2293}
2294
2295fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2296    let build_raw: Option<i64> = r.get(4)?;
2297    let ci_raw: Option<i64> = r.get(8)?;
2298    Ok(SessionOutcomeRow {
2299        session_id: r.get(0)?,
2300        test_passed: r.get(1)?,
2301        test_failed: r.get(2)?,
2302        test_skipped: r.get(3)?,
2303        build_ok: build_raw.map(|v| v != 0),
2304        lint_errors: r.get(5)?,
2305        revert_lines_14d: r.get(6)?,
2306        pr_open: r.get(7)?,
2307        ci_ok: ci_raw.map(|v| v != 0),
2308        measured_at_ms: r.get::<_, i64>(9)? as u64,
2309        measure_error: r.get(10)?,
2310    })
2311}
2312
2313fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2314    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2315    let score = r
2316        .get::<_, Option<i64>>(2)?
2317        .and_then(|v| FeedbackScore::new(v as u8));
2318    let label = r
2319        .get::<_, Option<String>>(3)?
2320        .and_then(|s| FeedbackLabel::from_str_opt(&s));
2321    Ok(FeedbackRecord {
2322        id: r.get(0)?,
2323        session_id: r.get(1)?,
2324        score,
2325        label,
2326        note: r.get(4)?,
2327        created_at_ms: r.get::<_, i64>(5)? as u64,
2328    })
2329}
2330
/// Three-letter weekday name for a day index counted from the Unix epoch
/// (`day_idx` = milliseconds since epoch / 86_400_000).
///
/// Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into a
/// Sunday-first table. The index is reduced modulo 7 before the offset is
/// added so that no `u64` input can overflow.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2334
2335fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2336    let week_ago = now.saturating_sub(7 * 86_400_000);
2337    let mut stmt = conn
2338        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2339    let days: Vec<u64> = stmt
2340        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2341        .filter_map(|r| r.ok())
2342        .map(|v| v as u64 / 86_400_000)
2343        .collect();
2344    let today = now / 86_400_000;
2345    Ok((0u64..7)
2346        .map(|i| {
2347            let d = today.saturating_sub(6 - i);
2348            (
2349                day_label(d).to_string(),
2350                days.iter().filter(|&&x| x == d).count() as u64,
2351            )
2352        })
2353        .collect())
2354}
2355
2356fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2357    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2358               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2359               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2360               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2361               COUNT(e.id) FROM sessions s \
2362               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2363               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2364    let mut stmt = conn.prepare(sql)?;
2365    let out: Vec<(SessionRecord, u64)> = stmt
2366        .query_map(params![workspace], |r| {
2367            let st: String = r.get(6)?;
2368            Ok((
2369                SessionRecord {
2370                    id: r.get(0)?,
2371                    agent: r.get(1)?,
2372                    model: r.get(2)?,
2373                    workspace: r.get(3)?,
2374                    started_at_ms: r.get::<_, i64>(4)? as u64,
2375                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2376                    status: status_from_str(&st),
2377                    trace_path: r.get(7)?,
2378                    start_commit: r.get(8)?,
2379                    end_commit: r.get(9)?,
2380                    branch: r.get(10)?,
2381                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2382                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2383                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2384                    prompt_fingerprint: r.get(14)?,
2385                    parent_session_id: r.get(15)?,
2386                    agent_version: r.get(16)?,
2387                    os: r.get(17)?,
2388                    arch: r.get(18)?,
2389                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2390                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2391                },
2392                r.get::<_, i64>(21)? as u64,
2393            ))
2394        })?
2395        .filter_map(|r| r.ok())
2396        .collect();
2397    Ok(out)
2398}
2399
2400fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2401    let mut stmt = conn.prepare(
2402        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2403         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2404    )?;
2405    let out: Vec<(String, u64)> = stmt
2406        .query_map(params![workspace], |r| {
2407            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2408        })?
2409        .filter_map(|r| r.ok())
2410        .collect();
2411    Ok(out)
2412}
2413
2414fn status_from_str(s: &str) -> SessionStatus {
2415    match s {
2416        "Running" => SessionStatus::Running,
2417        "Waiting" => SessionStatus::Waiting,
2418        "Idle" => SessionStatus::Idle,
2419        _ => SessionStatus::Done,
2420    }
2421}
2422
2423fn kind_from_str(s: &str) -> EventKind {
2424    match s {
2425        "ToolCall" => EventKind::ToolCall,
2426        "ToolResult" => EventKind::ToolResult,
2427        "Message" => EventKind::Message,
2428        "Error" => EventKind::Error,
2429        "Cost" => EventKind::Cost,
2430        "Hook" => EventKind::Hook,
2431        "Lifecycle" => EventKind::Lifecycle,
2432        _ => EventKind::Hook,
2433    }
2434}
2435
2436fn source_from_str(s: &str) -> EventSource {
2437    match s {
2438        "Tail" => EventSource::Tail,
2439        "Hook" => EventSource::Hook,
2440        _ => EventSource::Proxy,
2441    }
2442}
2443
2444fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2445    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2446    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2447    ensure_column(conn, "sessions", "branch", "TEXT")?;
2448    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2449    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2450    ensure_column(
2451        conn,
2452        "sessions",
2453        "repo_binding_source",
2454        "TEXT NOT NULL DEFAULT ''",
2455    )?;
2456    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2457    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2458    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2459    ensure_column(conn, "events", "stop_reason", "TEXT")?;
2460    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
2461    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
2462    ensure_column(conn, "events", "retry_count", "INTEGER")?;
2463    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
2464    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
2465    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
2466    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
2467    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
2468    ensure_column(
2469        conn,
2470        "sync_outbox",
2471        "kind",
2472        "TEXT NOT NULL DEFAULT 'events'",
2473    )?;
2474    ensure_column(
2475        conn,
2476        "experiments",
2477        "state",
2478        "TEXT NOT NULL DEFAULT 'Draft'",
2479    )?;
2480    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2481    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2482    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
2483    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
2484    ensure_column(conn, "sessions", "os", "TEXT")?;
2485    ensure_column(conn, "sessions", "arch", "TEXT")?;
2486    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
2487    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
2488    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
2489    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
2490    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
2491    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
2492    conn.execute_batch(
2493        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
2494         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
2495    )?;
2496    Ok(())
2497}
2498
2499fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2500    if has_column(conn, table, column)? {
2501        return Ok(());
2502    }
2503    conn.execute(
2504        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2505        [],
2506    )?;
2507    Ok(())
2508}
2509
2510fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2511    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2512    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2513    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2514}
2515
/// Encodes a flag as the integer SQLite stores: `true` -> 1, `false` -> 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2519
/// Decodes a stored integer flag: zero is `false`, anything else is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
2523
/// Maps the empty string to `None` and passes any other string through
/// unchanged inside `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2527
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Opens a store backed by `kaizen.db` inside a fresh temporary directory.
    ///
    /// The directory guard is returned with the store so the database file
    /// stays on disk for as long as the test holds on to it.
    fn open_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        (dir, store)
    }

    /// A finished `cursor` session in workspace `/ws`, started at t=1000 ms,
    /// with every optional field unset.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_owned(),
            agent: String::from("cursor"),
            model: None,
            workspace: String::from("/ws"),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: String::from("/trace"),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// A `read_file` tool-call event from the tail source with an empty
    /// payload; its timestamp and call id are derived from `seq`.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_owned(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some(String::from("read_file")),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// A freshly opened store reports WAL as its journal mode.
    #[test]
    fn open_and_wal_mode() {
        let (_dir, store) = open_store();
        let journal_mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal_mode, "wal");
    }

    /// A session written with `upsert_session` can be read back by id.
    #[test]
    fn upsert_and_get_session() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s1")).unwrap();

        let fetched = store.get_session("s1").unwrap().unwrap();
        assert_eq!(fetched.id, "s1");
        assert_eq!(fetched.status, SessionStatus::Done);
    }

    /// Appending events leaves the owning session listed for its workspace.
    #[test]
    fn append_and_list_events_round_trip() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s2")).unwrap();
        for seq in 0..2 {
            store.append_event(&make_event("s2", seq)).unwrap();
        }

        let listed = store.list_sessions("/ws").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "s2");
    }

    /// An empty store yields zeroed summary statistics.
    #[test]
    fn summary_stats_empty() {
        let (_dir, store) = open_store();
        let summary = store.summary_stats("/ws").unwrap();
        assert_eq!(summary.session_count, 0);
        assert_eq!(summary.total_cost_usd_e6, 0);
    }

    /// Sessions are counted in total and grouped per agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let (_dir, store) = open_store();
        for id in ["a", "b"] {
            store.upsert_session(&make_session(id)).unwrap();
        }

        let summary = store.summary_stats("/ws").unwrap();
        assert_eq!(summary.session_count, 2);
        assert_eq!(summary.by_agent.len(), 1);
        assert_eq!(summary.by_agent[0].0, "cursor");
        assert_eq!(summary.by_agent[0].1, 2);
    }

    /// Events come back for their session in sequence order.
    #[test]
    fn list_events_for_session_round_trip() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s4")).unwrap();
        for seq in 0..2 {
            store.append_event(&make_event("s4", seq)).unwrap();
        }

        let listed = store.list_events_for_session("s4").unwrap();
        assert_eq!(listed.len(), 2);
        assert_eq!(listed[0].seq, 0);
        assert_eq!(listed[1].seq, 1);
    }

    /// Appending the same event twice stores it only once.
    #[test]
    fn append_event_dedup() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Second write of the identical event must be accepted and ignored.
        store.append_event(&make_event("s5", 0)).unwrap();

        let listed = store.list_events_for_session("s5").unwrap();
        assert_eq!(listed.len(), 1);
    }

    /// Upserting an existing id overwrites the stored status.
    #[test]
    fn upsert_idempotent() {
        let (_dir, store) = open_store();
        let mut session = make_session("s3");
        store.upsert_session(&session).unwrap();
        session.status = SessionStatus::Running;
        store.upsert_session(&session).unwrap();

        let fetched = store.get_session("s3").unwrap().unwrap();
        assert_eq!(fetched.status, SessionStatus::Running);
    }

    /// A path inside the event payload is indexed as a touched file.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut event = make_event("sx", 0);
        event.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&event).unwrap();

        let touched = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        let expected = vec![("sx".to_string(), "src/lib.rs".to_string())];
        assert_eq!(touched, expected);
    }

    /// `update_session_status` changes the status of an existing session.
    #[test]
    fn update_session_status_changes_status() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();

        let fetched = store.get_session("s6").unwrap().unwrap();
        assert_eq!(fetched.status, SessionStatus::Running);
    }

    /// Pruning deletes sessions started before the cutoff, together with
    /// their events, and leaves newer sessions in place.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let (_dir, store) = open_store();
        let mut stale = make_session("old");
        stale.started_at_ms = 1_000;
        let mut fresh = make_session("new");
        fresh.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&stale).unwrap();
        store.upsert_session(&fresh).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let pruned = store.prune_sessions_started_before(5_000).unwrap();
        let expected = PruneStats {
            sessions_removed: 1,
            events_removed: 1,
        };
        assert_eq!(pruned, expected);
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());

        let remaining = store.list_sessions("/ws").unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "new");
    }

    /// A `.cursor/rules/*.mdc` path in the payload is indexed as a used rule.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut event = make_event("sr", 0);
        event.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&event).unwrap();

        let rules = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        let expected = vec![("sr".to_string(), "my-rule".to_string())];
        assert_eq!(rules, expected);
    }

    /// The guidance report attributes one session to both the skill and the
    /// rule mentioned in the payload text, and flags each as present on disk.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut event = make_event("sx", 0);
        event.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        event.cost_usd_e6 = Some(500_000);
        store.append_event(&event).unwrap();

        let mut skills_on_disk = HashSet::new();
        skills_on_disk.insert("tdd".into());
        let mut rules_on_disk = HashSet::new();
        rules_on_disk.insert("style".into());

        let report = store
            .guidance_report("/ws", 0, 10_000, &skills_on_disk, &rules_on_disk)
            .unwrap();
        assert_eq!(report.sessions_in_window, 1);

        let skill_row = report
            .rows
            .iter()
            .find(|row| row.id == "tdd" && row.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(skill_row.sessions, 1);
        assert!(skill_row.on_disk);

        let rule_row = report
            .rows
            .iter()
            .find(|row| row.id == "style" && row.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(rule_row.sessions, 1);
        assert!(rule_row.on_disk);
    }

    /// Pruning a session also removes its `rules_used` rows.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let (_dir, store) = open_store();
        let mut stale = make_session("old_r");
        stale.started_at_ms = 1_000;
        store.upsert_session(&stale).unwrap();
        let mut event = make_event("old_r", 0);
        event.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&event).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let leftover: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |row| row.get(0))
            .unwrap();
        assert_eq!(leftover, 0);
    }
}