Skip to main content

kaizen/store/
sqlite.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
13use crate::sync::context::SyncIngestContext;
14use crate::sync::outbound::outbound_event_from_row;
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::enqueue_tool_spans_for_session;
17use anyhow::{Context, Result};
18use rusqlite::types::Value;
19use rusqlite::{
20    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
21};
22use std::cell::RefCell;
23use std::collections::{HashMap, HashSet};
24use std::path::{Path, PathBuf};
25
/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
/// Rows below this use `sessions.started_at_ms` for time-window matching.
///
/// 10^12 ms after the Unix epoch is roughly September 2001, earlier than any real agent
/// session, so values below it can be told apart from genuine wall-clock timestamps.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default SQLite memory-map size in MiB.
/// NOTE(review): consumed outside this view (presumably by `apply_pragmas` for
/// `PRAGMA mmap_size`) — confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// `SELECT` prefix listing every `sessions` column in a fixed order.
/// Presumably callers append `WHERE`/`ORDER BY` clauses; whatever maps rows back to
/// `SessionRecord` (outside this view) must read the columns in exactly this order.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
34
/// Schema DDL applied in order by `Store::open_with_mode` (read-write mode only).
///
/// Every entry is run through `execute_batch` on each read-write open. No version tracking
/// is visible here, so each entry must be idempotent (`IF NOT EXISTS` / `INSERT OR IGNORE`).
/// An entry may hold several `;`-separated statements. Append new entries at the end.
///
/// Some tables are created here with fewer columns than the code later writes. For example,
/// `events` lacks `ts_exact` and `sessions` lacks `start_commit`. Presumably
/// `ensure_schema_columns` adds those afterwards.
const MIGRATIONS: &[&str] = &[
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs `ON CONFLICT(session_id, seq)` in `Store::append_event_with_sync`.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Speed workspace-scoped `insights` / `summary` (sessions filter before joining events)
    // NOTE(review): `sessions_workspace_started_idx` below has `workspace` as its leading
    // column, so it can serve the same lookups; this index may be redundant.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    // `ORDER BY started_at_ms` for a workspace (list_sessions, recent_sessions_3)
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    // Newest-first listing with `id` as a deterministic tie-breaker.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    // Expression index on `lower(agent)`; presumably for case-insensitive agent filters.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Provider pull cache (single-row state + per-kind rows; atomic refresh = txn + clear + insert)
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed the single state row; `OR IGNORE` keeps this idempotent across opens.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // NOTE(review): SQLite enforces `ON DELETE CASCADE` only when `PRAGMA foreign_keys=ON`
    // is set on the connection — confirm `apply_pragmas` enables it.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id            TEXT    PRIMARY KEY,
        session_id    TEXT    NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model   TEXT    NOT NULL,
        rubric_id     TEXT    NOT NULL,
        score         REAL    NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale     TEXT    NOT NULL,
        flagged       INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric  ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint   TEXT    PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json    TEXT    NOT NULL,
        total_bytes   INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // NOTE(review): the primary key above already leads with `session_id`, so this index
    // looks redundant.
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    // NOTE(review): duplicates `session_feedback_session` (same table and column); it adds
    // write cost without a new access path.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
277
/// Per-workspace activity dashboard stats.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions counted for the workspace.
    pub total_sessions: u64,
    /// Sessions counted as still running (presumably `SessionStatus::Running`).
    pub running_sessions: u64,
    /// Number of events across the counted sessions.
    pub total_events: u64,
    /// (day label e.g. "Mon", count) last 7 days oldest first
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions DESC by started_at, max 3; paired with event count
    pub recent: Vec<(SessionRecord, u64)>,
    /// Top tools by event count, max 5
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost, presumably in micro-USD (USD × 10^6) per the `_e6` suffix.
    pub total_cost_usd_e6: i64,
    /// Sessions that reported any cost. NOTE(review): exact criterion lives in the producing query.
    pub sessions_with_cost: u64,
}
293
/// Sync daemon / outbox status for `kaizen sync status`.
///
/// Plain snapshot data. It derives `Debug` (useful in logs and test failures) and `Clone`,
/// as the other public status types in this module do.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Outbox rows not yet sent.
    pub pending_outbox: u64,
    /// Time (ms) of the last successful sync, if any.
    pub last_success_ms: Option<u64>,
    /// Message from the most recent failure, if any.
    pub last_error: Option<String>,
    /// Failures since the last success (presumably reset on success).
    pub consecutive_failures: u32,
}
301
/// Aggregate stats across sessions + events for a workspace.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions included in the summary.
    pub session_count: u64,
    /// Summed cost, presumably micro-USD (USD × 10^6) per the `_e6` suffix.
    pub total_cost_usd_e6: i64,
    /// (agent, count) pairs.
    pub by_agent: Vec<(String, u64)>,
    /// (model, count) pairs.
    pub by_model: Vec<(String, u64)>,
    /// (tool, count) pairs; ordering and limit are decided by the producing query.
    pub top_tools: Vec<(String, u64)>,
}
311
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serialized in lowercase (`"skill"` / `"rule"`). The derived `Ord` sorts `Skill` before
/// `Rule`, so do not reorder the variants.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    Skill,
    Rule,
}
319
/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether `id` names a skill or a rule.
    pub kind: GuidanceKind,
    /// Skill or rule identifier.
    pub id: String,
    /// Sessions in the window that referenced this item.
    pub sessions: u64,
    /// `sessions` as a share of sessions in the window (NOTE(review): 0–1 or 0–100 — confirm).
    pub sessions_pct: f64,
    /// Summed cost of those sessions, presumably micro-USD.
    pub total_cost_usd_e6: i64,
    /// Average cost per referencing session in USD, when computable.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison against the workspace average (NOTE(review): difference or ratio — confirm).
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the skill/rule file currently exists on disk.
    pub on_disk: bool,
}
332
/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Window start, in ms.
    pub window_start_ms: u64,
    /// Window end, in ms.
    pub window_end_ms: u64,
    /// Sessions that fall inside the window.
    pub sessions_in_window: u64,
    /// Average cost per session across the window in USD, when computable.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One row per observed skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
343
/// Result of [`Store::prune_sessions_started_before`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of session rows deleted.
    pub sessions_removed: u64,
    /// Number of event rows deleted.
    pub events_removed: u64,
}
350
/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// Fields mirror the table's columns; every measurement is optional because it may not have
/// been collected. The table stores `build_ok` / `ci_ok` as INTEGER; the bool mapping happens
/// outside this view.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    /// Presumably lines reverted within 14 days of the session — confirm with the measurer.
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, in ms.
    pub measured_at_ms: u64,
    /// Error text if measuring failed.
    pub measure_error: Option<String>,
}
366
/// Aggregated process samples for retro (Tier D).
///
/// Built from `session_samples` rows (per-pid CPU / RSS samples).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest `cpu_percent` observed.
    pub max_cpu_percent: f64,
    /// Highest `rss_bytes` observed.
    pub max_rss_bytes: u64,
}
375
/// `sync_state` keys for agent rescan throttling and auto-prune.
///
/// This key records the time (ms) of the last agent rescan.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key: time (ms) of the last automatic prune.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key: set to the current time (ms) whenever adding an event to the search
/// index fails (see `Store::append_search_event`). Presumably this marks the index as
/// needing a rebuild.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
380
/// One `tool_spans` row (with its paths) as read for sync upload.
///
/// Derives `Debug` and `Clone` like the other public row types in this module. All fields
/// are std types, so both derives are free.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    /// Cost, presumably micro-USD (USD × 10^6) per the `_e6` suffix.
    pub cost_usd_e6: Option<i64>,
    /// Paths touched by the span (cf. `tool_span_paths` / `paths_json`).
    pub paths: Vec<String>,
}
396
/// How [`Store::open_with_mode`] opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Create the file if missing, run `MIGRATIONS`, and warm the span projector.
    ReadWrite,
    /// Open an existing file for queries; migrations and projector warm-up are skipped.
    /// NOTE(review): the connection is still opened with `SQLITE_OPEN_READ_WRITE` — confirm
    /// whether writes are prevented elsewhere (e.g. in `apply_pragmas`).
    ReadOnlyQuery,
}
402
/// Lightweight status projection of a `sessions` row (id, status, end time only).
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    pub id: String,
    pub status: SessionStatus,
    /// End time in ms, or `None` when the session has not ended.
    pub ended_at_ms: Option<u64>,
}
409
/// Optional filters for session listings; `Default` applies no filtering.
///
/// NOTE(review): matching semantics are implemented outside this view. The
/// `lower(agent)` index suggests the agent prefix match is case-insensitive — confirm.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Keep sessions whose agent name starts with this prefix.
    pub agent_prefix: Option<String>,
    /// Keep sessions in this status.
    pub status: Option<SessionStatus>,
    /// Keep sessions started at or after this time (ms) — presumably inclusive.
    pub since_ms: Option<u64>,
}
416
/// One page of a paginated session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Presumably the total number of matching sessions across all pages.
    pub total: usize,
    /// Presumably the offset of the next page, or `None` when this is the last page.
    pub next_offset: Option<usize>,
}
423
/// Cached span tree for a single session, held in `Store::span_tree_cache`.
///
/// Cleared by `Store::invalidate_span_tree_cache` whenever span rows may have changed.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached nodes belong to.
    session_id: String,
    /// NOTE(review): presumably the session's last event `seq` when the tree was built, used
    /// as a staleness check by the reader (outside this view) — confirm.
    last_event_seq: Option<u64>,
    /// Span nodes built for the session.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
430
/// Synchronous handle to the kaizen SQLite database plus its lazily opened side stores.
///
/// Interior state lives in `RefCell`s, so `Store` is not `Sync`. Share it across threads
/// only by giving each thread its own handle.
pub struct Store {
    /// Underlying SQLite connection (pragmas, and in read-write mode migrations, applied at open).
    conn: Connection,
    /// Directory containing the database file; the hot log, search index, and sync outbox
    /// are opened relative to it.
    root: PathBuf,
    /// Hot event log, opened on the first appended event.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search-index writer, opened on the first indexed event; committed by `flush_search`.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// Single-entry span-tree cache; cleared whenever span rows may have changed.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// Incremental tool-span projector; warmed from running sessions on read-write open.
    projector: RefCell<Projector>,
}
439
440impl Store {
441    pub(crate) fn conn(&self) -> &Connection {
442        &self.conn
443    }
444
445    pub fn open(path: &Path) -> Result<Self> {
446        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
447    }
448
449    pub fn open_read_only(path: &Path) -> Result<Self> {
450        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
451    }
452
453    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
454        if let Some(parent) = path.parent() {
455            std::fs::create_dir_all(parent)?;
456        }
457        let conn = match mode {
458            StoreOpenMode::ReadWrite => Connection::open(path),
459            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
460                path,
461                OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
462            ),
463        }
464        .with_context(|| format!("open db: {}", path.display()))?;
465        apply_pragmas(&conn, mode)?;
466        if mode == StoreOpenMode::ReadWrite {
467            for sql in MIGRATIONS {
468                conn.execute_batch(sql)?;
469            }
470            ensure_schema_columns(&conn)?;
471        }
472        let store = Self {
473            conn,
474            root: path
475                .parent()
476                .unwrap_or_else(|| Path::new("."))
477                .to_path_buf(),
478            hot_log: RefCell::new(None),
479            search_writer: RefCell::new(None),
480            span_tree_cache: RefCell::new(None),
481            projector: RefCell::new(Projector::default()),
482        };
483        if mode == StoreOpenMode::ReadWrite {
484            store.warm_projector()?;
485        }
486        Ok(store)
487    }
488
489    fn invalidate_span_tree_cache(&self) {
490        *self.span_tree_cache.borrow_mut() = None;
491    }
492
493    fn warm_projector(&self) -> Result<()> {
494        let ids = self.running_session_ids()?;
495        let mut projector = self.projector.borrow_mut();
496        for id in ids {
497            for event in self.list_events_for_session(&id)? {
498                let _ = projector.apply(&event);
499            }
500        }
501        Ok(())
502    }
503
504    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
505        self.conn.execute(
506            "INSERT INTO sessions (
507                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
508                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
509                prompt_fingerprint, parent_session_id, agent_version, os, arch,
510                repo_file_count, repo_total_loc
511             )
512             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
513                ?16, ?17, ?18, ?19, ?20, ?21)
514             ON CONFLICT(id) DO UPDATE SET
515               agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
516               started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
517               status=excluded.status, trace_path=excluded.trace_path,
518               start_commit=excluded.start_commit, end_commit=excluded.end_commit,
519               branch=excluded.branch, dirty_start=excluded.dirty_start,
520               dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
521               prompt_fingerprint=excluded.prompt_fingerprint,
522               parent_session_id=excluded.parent_session_id,
523               agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
524               repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
525            params![
526                s.id,
527                s.agent,
528                s.model,
529                s.workspace,
530                s.started_at_ms as i64,
531                s.ended_at_ms.map(|v| v as i64),
532                format!("{:?}", s.status),
533                s.trace_path,
534                s.start_commit,
535                s.end_commit,
536                s.branch,
537                s.dirty_start.map(bool_to_i64),
538                s.dirty_end.map(bool_to_i64),
539                s.repo_binding_source.clone().unwrap_or_default(),
540                s.prompt_fingerprint.as_deref(),
541                s.parent_session_id.as_deref(),
542                s.agent_version.as_deref(),
543                s.os.as_deref(),
544                s.arch.as_deref(),
545                s.repo_file_count.map(|v| v as i64),
546                s.repo_total_loc.map(|v| v as i64),
547            ],
548        )?;
549        self.conn.execute(
550            "INSERT INTO session_repo_binding (
551                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
552             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
553             ON CONFLICT(session_id) DO UPDATE SET
554                start_commit=excluded.start_commit,
555                end_commit=excluded.end_commit,
556                branch=excluded.branch,
557                dirty_start=excluded.dirty_start,
558                dirty_end=excluded.dirty_end,
559                repo_binding_source=excluded.repo_binding_source",
560            params![
561                s.id,
562                s.start_commit,
563                s.end_commit,
564                s.branch,
565                s.dirty_start.map(bool_to_i64),
566                s.dirty_end.map(bool_to_i64),
567                s.repo_binding_source.clone().unwrap_or_default(),
568            ],
569        )?;
570        Ok(())
571    }
572
573    /// Insert a minimal session row if none exists. Used by hook ingestion when
574    /// the first observed event is not `SessionStart` (hooks installed mid-session).
575    pub fn ensure_session_stub(
576        &self,
577        id: &str,
578        agent: &str,
579        workspace: &str,
580        started_at_ms: u64,
581    ) -> Result<()> {
582        self.conn.execute(
583            "INSERT OR IGNORE INTO sessions (
584                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
585                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
586                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
587             ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
588                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
589            params![id, agent, workspace, started_at_ms as i64],
590        )?;
591        Ok(())
592    }
593
594    /// Next `seq` for a new event in this session (0 when there are no events yet).
595    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
596        let n: i64 = self.conn.query_row(
597            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
598            [session_id],
599            |r| r.get(0),
600        )?;
601        Ok(n as u64)
602    }
603
604    pub fn append_event(&self, e: &Event) -> Result<()> {
605        self.append_event_with_sync(e, None)
606    }
607
608    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
609    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
610        let last_before = if projector_legacy_mode() {
611            None
612        } else {
613            self.last_event_seq_for_session(&e.session_id)?
614        };
615        let payload = serde_json::to_string(&e.payload)?;
616        self.conn.execute(
617            "INSERT INTO events (
618                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
619                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
620                stop_reason, latency_ms, ttft_ms, retry_count,
621                context_used_tokens, context_max_tokens,
622                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
623             ) VALUES (
624                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
625                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
626             )
627             ON CONFLICT(session_id, seq) DO UPDATE SET
628                ts_ms = excluded.ts_ms,
629                ts_exact = excluded.ts_exact,
630                kind = excluded.kind,
631                source = excluded.source,
632                tool = excluded.tool,
633                tool_call_id = excluded.tool_call_id,
634                tokens_in = excluded.tokens_in,
635                tokens_out = excluded.tokens_out,
636                reasoning_tokens = excluded.reasoning_tokens,
637                cost_usd_e6 = excluded.cost_usd_e6,
638                payload = excluded.payload,
639                stop_reason = excluded.stop_reason,
640                latency_ms = excluded.latency_ms,
641                ttft_ms = excluded.ttft_ms,
642                retry_count = excluded.retry_count,
643                context_used_tokens = excluded.context_used_tokens,
644                context_max_tokens = excluded.context_max_tokens,
645                cache_creation_tokens = excluded.cache_creation_tokens,
646                cache_read_tokens = excluded.cache_read_tokens,
647                system_prompt_tokens = excluded.system_prompt_tokens",
648            params![
649                e.session_id,
650                e.seq as i64,
651                e.ts_ms as i64,
652                bool_to_i64(e.ts_exact),
653                format!("{:?}", e.kind),
654                format!("{:?}", e.source),
655                e.tool,
656                e.tool_call_id,
657                e.tokens_in.map(|v| v as i64),
658                e.tokens_out.map(|v| v as i64),
659                e.reasoning_tokens.map(|v| v as i64),
660                e.cost_usd_e6,
661                payload,
662                e.stop_reason,
663                e.latency_ms.map(|v| v as i64),
664                e.ttft_ms.map(|v| v as i64),
665                e.retry_count.map(|v| v as i64),
666                e.context_used_tokens.map(|v| v as i64),
667                e.context_max_tokens.map(|v| v as i64),
668                e.cache_creation_tokens.map(|v| v as i64),
669                e.cache_read_tokens.map(|v| v as i64),
670                e.system_prompt_tokens.map(|v| v as i64),
671            ],
672        )?;
673        if self.conn.changes() == 0 {
674            return Ok(());
675        }
676        self.append_hot_event(e)?;
677        if projector_legacy_mode() {
678            index_event_derived(&self.conn, e)?;
679            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
680            self.invalidate_span_tree_cache();
681        } else if last_before.is_some_and(|last| e.seq <= last) {
682            self.replay_projector_session(&e.session_id)?;
683        } else {
684            let deltas = self.projector.borrow_mut().apply(e);
685            self.apply_projector_events(&deltas)?;
686            let expired = self
687                .projector
688                .borrow_mut()
689                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
690            self.apply_projector_events(&expired)?;
691            if is_stop_event(e) {
692                let flushed = self
693                    .projector
694                    .borrow_mut()
695                    .flush_session(&e.session_id, e.ts_ms);
696                self.apply_projector_events(&flushed)?;
697            }
698            self.invalidate_span_tree_cache();
699        }
700        self.append_search_event(e);
701        let Some(ctx) = ctx else {
702            return Ok(());
703        };
704        let sync = &ctx.sync;
705        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
706            return Ok(());
707        }
708        let Some(salt) = try_team_salt(sync) else {
709            tracing::warn!(
710                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
711            );
712            return Ok(());
713        };
714        if sync.sample_rate < 1.0 {
715            let u: f64 = rand::random();
716            if u > sync.sample_rate {
717                return Ok(());
718            }
719        }
720        let Some(session) = self.get_session(&e.session_id)? else {
721            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
722            return Ok(());
723        };
724        let mut outbound = outbound_event_from_row(e, &session, &salt);
725        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
726        let row = serde_json::to_string(&outbound)?;
727        self.outbox()?.append(&e.session_id, "events", &row)?;
728        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
729        Ok(())
730    }
731
732    fn append_hot_event(&self, e: &Event) -> Result<()> {
733        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
734            return Ok(());
735        }
736        let mut slot = self.hot_log.borrow_mut();
737        if slot.is_none() {
738            *slot = Some(HotLog::open(&self.root)?);
739        }
740        if let Some(log) = slot.as_mut() {
741            log.append(e)?;
742        }
743        Ok(())
744    }
745
746    fn append_search_event(&self, e: &Event) {
747        if let Err(err) = self.try_append_search_event(e) {
748            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
749            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
750        }
751    }
752
753    fn try_append_search_event(&self, e: &Event) -> Result<()> {
754        let Some(session) = self.get_session(&e.session_id)? else {
755            return Ok(());
756        };
757        let workspace = PathBuf::from(&session.workspace);
758        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
759        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
760        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
761            return Ok(());
762        };
763        let mut slot = self.search_writer.borrow_mut();
764        if slot.is_none() {
765            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
766        }
767        slot.as_mut().expect("writer").add(&doc)
768    }
769
770    pub fn flush_search(&self) -> Result<()> {
771        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
772            writer.commit()?;
773        }
774        Ok(())
775    }
776
777    fn outbox(&self) -> Result<Outbox> {
778        Outbox::open(&self.root)
779    }
780
781    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
782        if projector_legacy_mode() {
783            rebuild_tool_spans_for_session(&self.conn, session_id)?;
784            self.invalidate_span_tree_cache();
785            return Ok(());
786        }
787        let deltas = self
788            .projector
789            .borrow_mut()
790            .flush_session(session_id, now_ms);
791        if self.apply_projector_events(&deltas)? {
792            self.invalidate_span_tree_cache();
793        }
794        Ok(())
795    }
796
797    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
798        clear_session_spans(&self.conn, session_id)?;
799        self.projector.borrow_mut().reset_session(session_id);
800        let events = self.list_events_for_session(session_id)?;
801        let mut changed = false;
802        for event in &events {
803            let deltas = self.projector.borrow_mut().apply(event);
804            changed |= self.apply_projector_events(&deltas)?;
805        }
806        if self
807            .get_session(session_id)?
808            .is_some_and(|session| session.status == SessionStatus::Done)
809        {
810            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
811            let deltas = self
812                .projector
813                .borrow_mut()
814                .flush_session(session_id, now_ms);
815            changed |= self.apply_projector_events(&deltas)?;
816        }
817        if changed {
818            self.invalidate_span_tree_cache();
819        }
820        Ok(())
821    }
822
823    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
824        let mut changed = false;
825        for delta in deltas {
826            match delta {
827                ProjectorEvent::SpanClosed(span, sample) => {
828                    upsert_tool_span_record(&self.conn, span)?;
829                    tracing::debug!(
830                        session_id = %sample.session_id,
831                        span_id = %sample.span_id,
832                        tool = ?sample.tool,
833                        lead_time_ms = ?sample.lead_time_ms,
834                        tokens_in = ?sample.tokens_in,
835                        tokens_out = ?sample.tokens_out,
836                        reasoning_tokens = ?sample.reasoning_tokens,
837                        cost_usd_e6 = ?sample.cost_usd_e6,
838                        paths = ?sample.paths,
839                        "tool span closed"
840                    );
841                    changed = true;
842                }
843                ProjectorEvent::SpanPatched(span) => {
844                    upsert_tool_span_record(&self.conn, span)?;
845                    changed = true;
846                }
847                ProjectorEvent::FileTouched { session, path } => {
848                    self.conn.execute(
849                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
850                        params![session, path],
851                    )?;
852                    changed = true;
853                }
854                ProjectorEvent::SkillUsed { session, skill } => {
855                    self.conn.execute(
856                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
857                        params![session, skill],
858                    )?;
859                    changed = true;
860                }
861                ProjectorEvent::RuleUsed { session, rule } => {
862                    self.conn.execute(
863                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
864                        params![session, rule],
865                    )?;
866                    changed = true;
867                }
868            }
869        }
870        Ok(changed)
871    }
872
873    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
874        let rows = self.outbox()?.list_pending(limit)?;
875        if !rows.is_empty() {
876            return Ok(rows);
877        }
878        let mut stmt = self.conn.prepare(
879            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
880        )?;
881        let rows = stmt.query_map(params![limit as i64], |row| {
882            Ok((
883                row.get::<_, i64>(0)?,
884                row.get::<_, String>(1)?,
885                row.get::<_, String>(2)?,
886            ))
887        })?;
888        let mut out = Vec::new();
889        for r in rows {
890            out.push(r?);
891        }
892        Ok(out)
893    }
894
895    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
896        self.outbox()?.delete_ids(ids)?;
897        for id in ids {
898            self.conn
899                .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
900        }
901        Ok(())
902    }
903
904    pub fn replace_outbox_rows(
905        &self,
906        owner_id: &str,
907        kind: &str,
908        payloads: &[String],
909    ) -> Result<()> {
910        self.outbox()?.replace(owner_id, kind, payloads)?;
911        self.conn.execute(
912            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
913            params![owner_id, kind],
914        )?;
915        for payload in payloads {
916            self.conn.execute(
917                "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
918                params![owner_id, kind, payload],
919            )?;
920        }
921        Ok(())
922    }
923
924    pub fn outbox_pending_count(&self) -> Result<u64> {
925        let redb = self.outbox()?.pending_count()?;
926        if redb > 0 {
927            return Ok(redb);
928        }
929        let c: i64 =
930            self.conn
931                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
932                    r.get(0)
933                })?;
934        Ok(c as u64)
935    }
936
937    pub fn set_sync_state_ok(&self) -> Result<()> {
938        let now = now_ms().to_string();
939        self.conn.execute(
940            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
941             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
942            params![now],
943        )?;
944        self.conn.execute(
945            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
946             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
947            [],
948        )?;
949        self.conn
950            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
951        Ok(())
952    }
953
954    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
955        let prev: i64 = self
956            .conn
957            .query_row(
958                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
959                [],
960                |r| {
961                    let s: String = r.get(0)?;
962                    Ok(s.parse::<i64>().unwrap_or(0))
963                },
964            )
965            .optional()?
966            .unwrap_or(0);
967        let next = prev.saturating_add(1);
968        self.conn.execute(
969            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
970             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
971            params![msg],
972        )?;
973        self.conn.execute(
974            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
975             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
976            params![next.to_string()],
977        )?;
978        Ok(())
979    }
980
981    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
982        let pending_outbox = self.outbox_pending_count()?;
983        let last_success_ms = self
984            .conn
985            .query_row(
986                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
987                [],
988                |r| r.get::<_, String>(0),
989            )
990            .optional()?
991            .and_then(|s| s.parse().ok());
992        let last_error = self
993            .conn
994            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
995                r.get::<_, String>(0)
996            })
997            .optional()?;
998        let consecutive_failures = self
999            .conn
1000            .query_row(
1001                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1002                [],
1003                |r| r.get::<_, String>(0),
1004            )
1005            .optional()?
1006            .and_then(|s| s.parse().ok())
1007            .unwrap_or(0);
1008        Ok(SyncStatusSnapshot {
1009            pending_outbox,
1010            last_success_ms,
1011            last_error,
1012            consecutive_failures,
1013        })
1014    }
1015
1016    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1017        let row: Option<String> = self
1018            .conn
1019            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1020                r.get::<_, String>(0)
1021            })
1022            .optional()?;
1023        Ok(row.and_then(|s| s.parse().ok()))
1024    }
1025
1026    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1027        self.conn.execute(
1028            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1029             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1030            params![key, v.to_string()],
1031        )?;
1032        Ok(())
1033    }
1034
1035    /// Delete sessions with `started_at_ms` strictly before `cutoff_ms` and all dependent rows.
1036    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1037        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1038        let old_ids = old_session_ids(&tx, cutoff_ms)?;
1039        let sessions_to_remove: i64 = tx.query_row(
1040            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1041            params![cutoff_ms],
1042            |r| r.get(0),
1043        )?;
1044        let events_to_remove: i64 = tx.query_row(
1045            "SELECT COUNT(*) FROM events WHERE session_id IN \
1046             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1047            params![cutoff_ms],
1048            |r| r.get(0),
1049        )?;
1050
1051        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1052        tx.execute(
1053            &format!(
1054                "DELETE FROM tool_span_paths WHERE span_id IN \
1055                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1056            ),
1057            params![cutoff_ms],
1058        )?;
1059        tx.execute(
1060            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1061            params![cutoff_ms],
1062        )?;
1063        tx.execute(
1064            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1065            params![cutoff_ms],
1066        )?;
1067        tx.execute(
1068            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1069            params![cutoff_ms],
1070        )?;
1071        tx.execute(
1072            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1073            params![cutoff_ms],
1074        )?;
1075        tx.execute(
1076            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1077            params![cutoff_ms],
1078        )?;
1079        tx.execute(
1080            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1081            params![cutoff_ms],
1082        )?;
1083        tx.execute(
1084            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1085            params![cutoff_ms],
1086        )?;
1087        tx.execute(
1088            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1089            params![cutoff_ms],
1090        )?;
1091        tx.execute(
1092            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1093            params![cutoff_ms],
1094        )?;
1095        tx.execute(
1096            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1097            params![cutoff_ms],
1098        )?;
1099        tx.execute(
1100            "DELETE FROM sessions WHERE started_at_ms < ?1",
1101            params![cutoff_ms],
1102        )?;
1103        tx.commit()?;
1104        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1105            let _ = writer.commit();
1106        }
1107        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1108            tracing::warn!("search prune skipped: {err:#}");
1109            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1110        }
1111        self.invalidate_span_tree_cache();
1112        Ok(PruneStats {
1113            sessions_removed: sessions_to_remove as u64,
1114            events_removed: events_to_remove as u64,
1115        })
1116    }
1117
1118    /// Reclaim file space after large deletes (exclusive lock; can be slow).
1119    pub fn vacuum(&self) -> Result<()> {
1120        self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1121        Ok(())
1122    }
1123
1124    pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1125        Ok(self
1126            .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1127            .rows)
1128    }
1129
1130    pub fn list_sessions_page(
1131        &self,
1132        workspace: &str,
1133        offset: usize,
1134        limit: usize,
1135        filter: SessionFilter,
1136    ) -> Result<SessionPage> {
1137        let (where_sql, args) = session_filter_sql(workspace, &filter);
1138        let total = self.query_session_page_count(&where_sql, &args)?;
1139        let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1140        let next = offset.saturating_add(rows.len());
1141        Ok(SessionPage {
1142            rows,
1143            total,
1144            next_offset: (next < total).then_some(next),
1145        })
1146    }
1147
1148    fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1149        let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1150        let total: i64 = self
1151            .conn
1152            .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1153        Ok(total as usize)
1154    }
1155
1156    fn query_session_page_rows(
1157        &self,
1158        where_sql: &str,
1159        args: &[Value],
1160        offset: usize,
1161        limit: usize,
1162    ) -> Result<Vec<SessionRecord>> {
1163        let sql = format!(
1164            "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1165        );
1166        let mut values = args.to_vec();
1167        values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1168        values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1169        let mut stmt = self.conn.prepare(&sql)?;
1170        let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1171        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1172    }
1173
1174    pub fn list_sessions_started_after(
1175        &self,
1176        workspace: &str,
1177        after_started_at_ms: u64,
1178    ) -> Result<Vec<SessionRecord>> {
1179        let mut stmt = self.conn.prepare(
1180            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1181                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1182                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1183                    repo_file_count, repo_total_loc
1184             FROM sessions
1185             WHERE workspace = ?1 AND started_at_ms > ?2
1186             ORDER BY started_at_ms DESC, id ASC",
1187        )?;
1188        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1189        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1190    }
1191
1192    pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1193        if ids.is_empty() {
1194            return Ok(Vec::new());
1195        }
1196        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1197        let sql =
1198            format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1199        let mut stmt = self.conn.prepare(&sql)?;
1200        let params: Vec<&dyn rusqlite::ToSql> =
1201            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1202        let rows = stmt.query_map(params.as_slice(), |r| {
1203            let status: String = r.get(1)?;
1204            Ok(SessionStatusRow {
1205                id: r.get(0)?,
1206                status: status_from_str(&status),
1207                ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1208            })
1209        })?;
1210        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1211    }
1212
1213    fn running_session_ids(&self) -> Result<Vec<String>> {
1214        let mut stmt = self
1215            .conn
1216            .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1217        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1218        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1219    }
1220
1221    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1222        let session_count: i64 = self.conn.query_row(
1223            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1224            params![workspace],
1225            |r| r.get(0),
1226        )?;
1227
1228        let total_cost: i64 = self.conn.query_row(
1229            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1230             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1231            params![workspace],
1232            |r| r.get(0),
1233        )?;
1234
1235        let mut stmt = self.conn.prepare(
1236            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1237        )?;
1238        let by_agent: Vec<(String, u64)> = stmt
1239            .query_map(params![workspace], |r| {
1240                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1241            })?
1242            .filter_map(|r| r.ok())
1243            .collect();
1244
1245        let mut stmt = self.conn.prepare(
1246            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1247        )?;
1248        let by_model: Vec<(String, u64)> = stmt
1249            .query_map(params![workspace], |r| {
1250                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1251            })?
1252            .filter_map(|r| r.ok())
1253            .collect();
1254
1255        let mut stmt = self.conn.prepare(
1256            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1257             WHERE s.workspace = ?1 AND tool IS NOT NULL
1258             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1259        )?;
1260        let top_tools: Vec<(String, u64)> = stmt
1261            .query_map(params![workspace], |r| {
1262                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1263            })?
1264            .filter_map(|r| r.ok())
1265            .collect();
1266
1267        Ok(SummaryStats {
1268            session_count: session_count as u64,
1269            total_cost_usd_e6: total_cost,
1270            by_agent,
1271            by_model,
1272            top_tools,
1273        })
1274    }
1275
1276    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1277        self.list_events_page(session_id, 0, i64::MAX as usize)
1278    }
1279
1280    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1281        let mut stmt = self.conn.prepare(
1282            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1283                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1284                    stop_reason, latency_ms, ttft_ms, retry_count,
1285                    context_used_tokens, context_max_tokens,
1286                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1287             FROM events WHERE session_id = ?1 AND seq = ?2",
1288        )?;
1289        stmt.query_row(params![session_id, seq as i64], event_row)
1290            .optional()
1291            .map_err(Into::into)
1292    }
1293
1294    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1295        let mut out = Vec::new();
1296        for session in self.list_sessions(workspace)? {
1297            for event in self.list_events_for_session(&session.id)? {
1298                out.push((session.clone(), event));
1299            }
1300        }
1301        out.sort_by(|a, b| {
1302            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1303        });
1304        Ok(out)
1305    }
1306
1307    pub fn list_events_page(
1308        &self,
1309        session_id: &str,
1310        after_seq: u64,
1311        limit: usize,
1312    ) -> Result<Vec<Event>> {
1313        let mut stmt = self.conn.prepare(
1314            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1315                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1316                    stop_reason, latency_ms, ttft_ms, retry_count,
1317                    context_used_tokens, context_max_tokens,
1318                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1319             FROM events
1320             WHERE session_id = ?1 AND seq >= ?2
1321             ORDER BY seq ASC LIMIT ?3",
1322        )?;
1323        let rows = stmt.query_map(
1324            params![
1325                session_id,
1326                after_seq as i64,
1327                limit.min(i64::MAX as usize) as i64
1328            ],
1329            event_row,
1330        )?;
1331        let mut events = Vec::new();
1332        for row in rows {
1333            events.push(row?);
1334        }
1335        Ok(events)
1336    }
1337
1338    /// Update only status for existing session.
1339    pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1340        self.conn.execute(
1341            "UPDATE sessions SET status = ?1 WHERE id = ?2",
1342            params![format!("{:?}", status), id],
1343        )?;
1344        Ok(())
1345    }
1346
1347    /// Workspace activity dashboard — feeds `cmd_insights`.
1348    pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1349        let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1350        Ok(InsightsStats {
1351            total_sessions: count_q(
1352                &self.conn,
1353                "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1354                workspace,
1355            )?,
1356            running_sessions: count_q(
1357                &self.conn,
1358                "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1359                workspace,
1360            )?,
1361            total_events: count_q(
1362                &self.conn,
1363                "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1364                workspace,
1365            )?,
1366            sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1367            recent: recent_sessions_3(&self.conn, workspace)?,
1368            top_tools: top_tools_5(&self.conn, workspace)?,
1369            total_cost_usd_e6,
1370            sessions_with_cost,
1371        })
1372    }
1373
    /// Events in `[start_ms, end_ms]` for a workspace, with session metadata per row.
    ///
    /// An event matches in either of two cases:
    /// - its own `ts_ms` lies inside the window; or
    /// - its `ts_ms` is below `SYNTHETIC_TS_CEILING_MS` (synthetic, seq-based timing) and its
    ///   session's `started_at_ms` lies inside the window.
    ///
    /// Rows are ordered by `(ts_ms, session_id, seq)` ascending.
    ///
    /// The row mapper reads columns by position. The layout of the SELECT list is:
    /// - 0..=12: event core columns;
    /// - 13..=33: session columns;
    /// - 34..=42: event metric columns.
    ///
    /// Keep the SELECT list and every `row.get(N)` index in lockstep when editing.
    ///
    /// NOTE(review): counters and timestamps are narrowed with `as` casts (`i64 as u64`,
    /// `i64 as u32`, `i64 as u16`), which wrap silently on out-of-range stored values.
    /// A payload that is not valid JSON becomes `serde_json::Value::Null` rather than an error.
    pub fn retro_events_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(SessionRecord, Event)>> {
        let mut stmt = self.conn.prepare(
            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
                    s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
                    s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
                    s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
                    s.repo_file_count, s.repo_total_loc,
                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
                    e.context_used_tokens, e.context_max_tokens,
                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
             FROM events e
             JOIN sessions s ON s.id = e.session_id
             WHERE s.workspace = ?1
               AND (
                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
               )
             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
        )?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                // ?4: events with ts_ms below this ceiling are matched via session start time.
                SYNTHETIC_TS_CEILING_MS,
            ],
            |row| {
                let payload_str: String = row.get(12)?;
                let status_str: String = row.get(19)?;
                Ok((
                    // Session columns occupy SELECT positions 13..=33.
                    SessionRecord {
                        id: row.get(13)?,
                        agent: row.get(14)?,
                        model: row.get(15)?,
                        workspace: row.get(16)?,
                        started_at_ms: row.get::<_, i64>(17)? as u64,
                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
                        status: status_from_str(&status_str),
                        trace_path: row.get(20)?,
                        start_commit: row.get(21)?,
                        end_commit: row.get(22)?,
                        branch: row.get(23)?,
                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
                        // Read as non-NULL text; an empty string becomes `None` via empty_to_none.
                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
                        prompt_fingerprint: row.get(27)?,
                        parent_session_id: row.get(28)?,
                        agent_version: row.get(29)?,
                        os: row.get(30)?,
                        arch: row.get(31)?,
                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
                    },
                    // Event core columns are 0..=12; metric columns follow the session block at 34..=42.
                    Event {
                        session_id: row.get(0)?,
                        seq: row.get::<_, i64>(1)? as u64,
                        ts_ms: row.get::<_, i64>(2)? as u64,
                        ts_exact: row.get::<_, i64>(3)? != 0,
                        kind: kind_from_str(&row.get::<_, String>(4)?),
                        source: source_from_str(&row.get::<_, String>(5)?),
                        tool: row.get(6)?,
                        tool_call_id: row.get(7)?,
                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
                        cost_usd_e6: row.get(11)?,
                        // Invalid JSON is tolerated and mapped to Null.
                        payload: serde_json::from_str(&payload_str)
                            .unwrap_or(serde_json::Value::Null),
                        stop_reason: row.get(34)?,
                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
                    },
                ))
            },
        )?;

        // Stop at the first row error and propagate it.
        let mut out = Vec::new();
        for r in rows {
            out.push(r?);
        }
        Ok(out)
    }
1469
1470    /// Distinct `(session_id, path)` for sessions with activity in the time window.
1471    pub fn files_touched_in_window(
1472        &self,
1473        workspace: &str,
1474        start_ms: u64,
1475        end_ms: u64,
1476    ) -> Result<Vec<(String, String)>> {
1477        let mut stmt = self.conn.prepare(
1478            "SELECT DISTINCT ft.session_id, ft.path
1479             FROM files_touched ft
1480             JOIN sessions s ON s.id = ft.session_id
1481             WHERE s.workspace = ?1
1482               AND EXISTS (
1483                 SELECT 1 FROM events e
1484                 JOIN sessions ss ON ss.id = e.session_id
1485                 WHERE e.session_id = ft.session_id
1486                   AND (
1487                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1488                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1489                   )
1490               )
1491             ORDER BY ft.session_id, ft.path",
1492        )?;
1493        let out: Vec<(String, String)> = stmt
1494            .query_map(
1495                params![
1496                    workspace,
1497                    start_ms as i64,
1498                    end_ms as i64,
1499                    SYNTHETIC_TS_CEILING_MS,
1500                ],
1501                |r| Ok((r.get(0)?, r.get(1)?)),
1502            )?
1503            .filter_map(|r| r.ok())
1504            .collect();
1505        Ok(out)
1506    }
1507
1508    /// Distinct skill slugs referenced in `skills_used` for a workspace since `since_ms`
1509    /// (any session with an indexed skill row; join events optional — use row existence).
1510    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1511        let mut stmt = self.conn.prepare(
1512            "SELECT DISTINCT su.skill
1513             FROM skills_used su
1514             JOIN sessions s ON s.id = su.session_id
1515             WHERE s.workspace = ?1
1516               AND EXISTS (
1517                 SELECT 1 FROM events e
1518                 JOIN sessions ss ON ss.id = e.session_id
1519                 WHERE e.session_id = su.session_id
1520                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1521               )
1522             ORDER BY su.skill",
1523        )?;
1524        let out: Vec<String> = stmt
1525            .query_map(
1526                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1527                |r| r.get::<_, String>(0),
1528            )?
1529            .filter_map(|r| r.ok())
1530            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1531            .collect();
1532        Ok(out)
1533    }
1534
1535    /// Distinct `(session_id, skill)` for sessions with activity in the time window.
1536    pub fn skills_used_in_window(
1537        &self,
1538        workspace: &str,
1539        start_ms: u64,
1540        end_ms: u64,
1541    ) -> Result<Vec<(String, String)>> {
1542        let mut stmt = self.conn.prepare(
1543            "SELECT DISTINCT su.session_id, su.skill
1544             FROM skills_used su
1545             JOIN sessions s ON s.id = su.session_id
1546             WHERE s.workspace = ?1
1547               AND EXISTS (
1548                 SELECT 1 FROM events e
1549                 JOIN sessions ss ON ss.id = e.session_id
1550                 WHERE e.session_id = su.session_id
1551                   AND (
1552                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1553                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1554                   )
1555               )
1556             ORDER BY su.session_id, su.skill",
1557        )?;
1558        let out: Vec<(String, String)> = stmt
1559            .query_map(
1560                params![
1561                    workspace,
1562                    start_ms as i64,
1563                    end_ms as i64,
1564                    SYNTHETIC_TS_CEILING_MS,
1565                ],
1566                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1567            )?
1568            .filter_map(|r| r.ok())
1569            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1570            .collect();
1571        Ok(out)
1572    }
1573
1574    /// Distinct rule stems referenced in `rules_used` for a workspace since `since_ms`.
1575    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1576        let mut stmt = self.conn.prepare(
1577            "SELECT DISTINCT ru.rule
1578             FROM rules_used ru
1579             JOIN sessions s ON s.id = ru.session_id
1580             WHERE s.workspace = ?1
1581               AND EXISTS (
1582                 SELECT 1 FROM events e
1583                 JOIN sessions ss ON ss.id = e.session_id
1584                 WHERE e.session_id = ru.session_id
1585                   AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1586               )
1587             ORDER BY ru.rule",
1588        )?;
1589        let out: Vec<String> = stmt
1590            .query_map(
1591                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1592                |r| r.get::<_, String>(0),
1593            )?
1594            .filter_map(|r| r.ok())
1595            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1596            .collect();
1597        Ok(out)
1598    }
1599
1600    /// Distinct `(session_id, rule)` for sessions with activity in the time window.
1601    pub fn rules_used_in_window(
1602        &self,
1603        workspace: &str,
1604        start_ms: u64,
1605        end_ms: u64,
1606    ) -> Result<Vec<(String, String)>> {
1607        let mut stmt = self.conn.prepare(
1608            "SELECT DISTINCT ru.session_id, ru.rule
1609             FROM rules_used ru
1610             JOIN sessions s ON s.id = ru.session_id
1611             WHERE s.workspace = ?1
1612               AND EXISTS (
1613                 SELECT 1 FROM events e
1614                 JOIN sessions ss ON ss.id = e.session_id
1615                 WHERE e.session_id = ru.session_id
1616                   AND (
1617                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1618                     OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1619                   )
1620               )
1621             ORDER BY ru.session_id, ru.rule",
1622        )?;
1623        let out: Vec<(String, String)> = stmt
1624            .query_map(
1625                params![
1626                    workspace,
1627                    start_ms as i64,
1628                    end_ms as i64,
1629                    SYNTHETIC_TS_CEILING_MS,
1630                ],
1631                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1632            )?
1633            .filter_map(|r| r.ok())
1634            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1635            .collect();
1636        Ok(out)
1637    }
1638
1639    /// Sessions with at least one event timestamp falling in `[start_ms, end_ms]` (same rules as retro window).
1640    pub fn sessions_active_in_window(
1641        &self,
1642        workspace: &str,
1643        start_ms: u64,
1644        end_ms: u64,
1645    ) -> Result<HashSet<String>> {
1646        let mut stmt = self.conn.prepare(
1647            "SELECT DISTINCT s.id
1648             FROM sessions s
1649             WHERE s.workspace = ?1
1650               AND EXISTS (
1651                 SELECT 1 FROM events e
1652                 WHERE e.session_id = s.id
1653                   AND (
1654                     (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1655                     OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1656                   )
1657               )",
1658        )?;
1659        let out: HashSet<String> = stmt
1660            .query_map(
1661                params![
1662                    workspace,
1663                    start_ms as i64,
1664                    end_ms as i64,
1665                    SYNTHETIC_TS_CEILING_MS,
1666                ],
1667                |r| r.get(0),
1668            )?
1669            .filter_map(|r| r.ok())
1670            .collect();
1671        Ok(out)
1672    }
1673
1674    /// Per-session sum of `cost_usd_e6` for events in the window (missing costs treated as 0).
1675    pub fn session_costs_usd_e6_in_window(
1676        &self,
1677        workspace: &str,
1678        start_ms: u64,
1679        end_ms: u64,
1680    ) -> Result<HashMap<String, i64>> {
1681        let mut stmt = self.conn.prepare(
1682            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1683             FROM events e
1684             JOIN sessions s ON s.id = e.session_id
1685             WHERE s.workspace = ?1
1686               AND (
1687                 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1688                 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1689               )
1690             GROUP BY e.session_id",
1691        )?;
1692        let rows: Vec<(String, i64)> = stmt
1693            .query_map(
1694                params![
1695                    workspace,
1696                    start_ms as i64,
1697                    end_ms as i64,
1698                    SYNTHETIC_TS_CEILING_MS,
1699                ],
1700                |r| Ok((r.get(0)?, r.get(1)?)),
1701            )?
1702            .filter_map(|r| r.ok())
1703            .collect();
1704        Ok(rows.into_iter().collect())
1705    }
1706
1707    /// Skill/rule adoption and cost proxy vs workspace average (observed payload references only).
1708    pub fn guidance_report(
1709        &self,
1710        workspace: &str,
1711        window_start_ms: u64,
1712        window_end_ms: u64,
1713        skill_slugs_on_disk: &HashSet<String>,
1714        rule_slugs_on_disk: &HashSet<String>,
1715    ) -> Result<GuidanceReport> {
1716        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1717        let denom = active.len() as u64;
1718        let costs =
1719            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1720
1721        let workspace_avg_cost_per_session_usd = if denom > 0 {
1722            let total_e6: i64 = active
1723                .iter()
1724                .map(|sid| costs.get(sid).copied().unwrap_or(0))
1725                .sum();
1726            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1727        } else {
1728            None
1729        };
1730
1731        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1732        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1733            skill_sessions.entry(skill).or_default().insert(sid);
1734        }
1735        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1736        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1737            rule_sessions.entry(rule).or_default().insert(sid);
1738        }
1739
1740        let mut rows: Vec<GuidancePerfRow> = Vec::new();
1741
1742        let mut push_row =
1743            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1744                let sessions = sids.len() as u64;
1745                let sessions_pct = if denom > 0 {
1746                    sessions as f64 * 100.0 / denom as f64
1747                } else {
1748                    0.0
1749                };
1750                let total_cost_usd_e6: i64 = sids
1751                    .iter()
1752                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
1753                    .sum();
1754                let avg_cost_per_session_usd = if sessions > 0 {
1755                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1756                } else {
1757                    None
1758                };
1759                let vs_workspace_avg_cost_per_session_usd =
1760                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1761                        (Some(avg), Some(w)) => Some(avg - w),
1762                        _ => None,
1763                    };
1764                rows.push(GuidancePerfRow {
1765                    kind,
1766                    id,
1767                    sessions,
1768                    sessions_pct,
1769                    total_cost_usd_e6,
1770                    avg_cost_per_session_usd,
1771                    vs_workspace_avg_cost_per_session_usd,
1772                    on_disk,
1773                });
1774            };
1775
1776        let mut seen_skills: HashSet<String> = HashSet::new();
1777        for (id, sids) in &skill_sessions {
1778            seen_skills.insert(id.clone());
1779            push_row(
1780                GuidanceKind::Skill,
1781                id.clone(),
1782                sids,
1783                skill_slugs_on_disk.contains(id),
1784            );
1785        }
1786        for slug in skill_slugs_on_disk {
1787            if seen_skills.contains(slug) {
1788                continue;
1789            }
1790            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1791        }
1792
1793        let mut seen_rules: HashSet<String> = HashSet::new();
1794        for (id, sids) in &rule_sessions {
1795            seen_rules.insert(id.clone());
1796            push_row(
1797                GuidanceKind::Rule,
1798                id.clone(),
1799                sids,
1800                rule_slugs_on_disk.contains(id),
1801            );
1802        }
1803        for slug in rule_slugs_on_disk {
1804            if seen_rules.contains(slug) {
1805                continue;
1806            }
1807            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1808        }
1809
1810        rows.sort_by(|a, b| {
1811            b.sessions
1812                .cmp(&a.sessions)
1813                .then_with(|| a.kind.cmp(&b.kind))
1814                .then_with(|| a.id.cmp(&b.id))
1815        });
1816
1817        Ok(GuidanceReport {
1818            workspace: workspace.to_string(),
1819            window_start_ms,
1820            window_end_ms,
1821            sessions_in_window: denom,
1822            workspace_avg_cost_per_session_usd,
1823            rows,
1824        })
1825    }
1826
1827    pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1828        let mut stmt = self.conn.prepare(
1829            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1830                    start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1831                    prompt_fingerprint, parent_session_id, agent_version, os, arch,
1832                    repo_file_count, repo_total_loc
1833             FROM sessions WHERE id = ?1",
1834        )?;
1835        let mut rows = stmt.query_map(params![id], |row| {
1836            Ok((
1837                row.get::<_, String>(0)?,
1838                row.get::<_, String>(1)?,
1839                row.get::<_, Option<String>>(2)?,
1840                row.get::<_, String>(3)?,
1841                row.get::<_, i64>(4)?,
1842                row.get::<_, Option<i64>>(5)?,
1843                row.get::<_, String>(6)?,
1844                row.get::<_, String>(7)?,
1845                row.get::<_, Option<String>>(8)?,
1846                row.get::<_, Option<String>>(9)?,
1847                row.get::<_, Option<String>>(10)?,
1848                row.get::<_, Option<i64>>(11)?,
1849                row.get::<_, Option<i64>>(12)?,
1850                row.get::<_, String>(13)?,
1851                row.get::<_, Option<String>>(14)?,
1852                row.get::<_, Option<String>>(15)?,
1853                row.get::<_, Option<String>>(16)?,
1854                row.get::<_, Option<String>>(17)?,
1855                row.get::<_, Option<String>>(18)?,
1856                row.get::<_, Option<i64>>(19)?,
1857                row.get::<_, Option<i64>>(20)?,
1858            ))
1859        })?;
1860
1861        if let Some(row) = rows.next() {
1862            let (
1863                id,
1864                agent,
1865                model,
1866                workspace,
1867                started,
1868                ended,
1869                status_str,
1870                trace,
1871                start_commit,
1872                end_commit,
1873                branch,
1874                dirty_start,
1875                dirty_end,
1876                source,
1877                prompt_fingerprint,
1878                parent_session_id,
1879                agent_version,
1880                os,
1881                arch,
1882                repo_file_count,
1883                repo_total_loc,
1884            ) = row?;
1885            Ok(Some(SessionRecord {
1886                id,
1887                agent,
1888                model,
1889                workspace,
1890                started_at_ms: started as u64,
1891                ended_at_ms: ended.map(|v| v as u64),
1892                status: status_from_str(&status_str),
1893                trace_path: trace,
1894                start_commit,
1895                end_commit,
1896                branch,
1897                dirty_start: dirty_start.map(i64_to_bool),
1898                dirty_end: dirty_end.map(i64_to_bool),
1899                repo_binding_source: empty_to_none(source),
1900                prompt_fingerprint,
1901                parent_session_id,
1902                agent_version,
1903                os,
1904                arch,
1905                repo_file_count: repo_file_count.map(|v| v as u32),
1906                repo_total_loc: repo_total_loc.map(|v| v as u64),
1907            }))
1908        } else {
1909            Ok(None)
1910        }
1911    }
1912
1913    pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1914        let mut stmt = self.conn.prepare(
1915            "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1916                    indexed_at_ms, dirty, graph_path
1917             FROM repo_snapshots WHERE workspace = ?1
1918             ORDER BY indexed_at_ms DESC LIMIT 1",
1919        )?;
1920        let mut rows = stmt.query_map(params![workspace], |row| {
1921            Ok(RepoSnapshotRecord {
1922                id: row.get(0)?,
1923                workspace: row.get(1)?,
1924                head_commit: row.get(2)?,
1925                dirty_fingerprint: row.get(3)?,
1926                analyzer_version: row.get(4)?,
1927                indexed_at_ms: row.get::<_, i64>(5)? as u64,
1928                dirty: row.get::<_, i64>(6)? != 0,
1929                graph_path: row.get(7)?,
1930            })
1931        })?;
1932        Ok(rows.next().transpose()?)
1933    }
1934
1935    pub fn save_repo_snapshot(
1936        &self,
1937        snapshot: &RepoSnapshotRecord,
1938        facts: &[FileFact],
1939        edges: &[RepoEdge],
1940    ) -> Result<()> {
1941        self.conn.execute(
1942            "INSERT INTO repo_snapshots (
1943                id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1944                indexed_at_ms, dirty, graph_path
1945             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1946             ON CONFLICT(id) DO UPDATE SET
1947                workspace=excluded.workspace,
1948                head_commit=excluded.head_commit,
1949                dirty_fingerprint=excluded.dirty_fingerprint,
1950                analyzer_version=excluded.analyzer_version,
1951                indexed_at_ms=excluded.indexed_at_ms,
1952                dirty=excluded.dirty,
1953                graph_path=excluded.graph_path",
1954            params![
1955                snapshot.id,
1956                snapshot.workspace,
1957                snapshot.head_commit,
1958                snapshot.dirty_fingerprint,
1959                snapshot.analyzer_version,
1960                snapshot.indexed_at_ms as i64,
1961                bool_to_i64(snapshot.dirty),
1962                snapshot.graph_path,
1963            ],
1964        )?;
1965        self.conn.execute(
1966            "DELETE FROM file_facts WHERE snapshot_id = ?1",
1967            params![snapshot.id],
1968        )?;
1969        self.conn.execute(
1970            "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1971            params![snapshot.id],
1972        )?;
1973        for fact in facts {
1974            self.conn.execute(
1975                "INSERT INTO file_facts (
1976                    snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1977                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1978                    churn_30d, churn_90d, authors_90d, last_changed_ms
1979                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1980                params![
1981                    fact.snapshot_id,
1982                    fact.path,
1983                    fact.language,
1984                    fact.bytes as i64,
1985                    fact.loc as i64,
1986                    fact.sloc as i64,
1987                    fact.complexity_total as i64,
1988                    fact.max_fn_complexity as i64,
1989                    fact.symbol_count as i64,
1990                    fact.import_count as i64,
1991                    fact.fan_in as i64,
1992                    fact.fan_out as i64,
1993                    fact.churn_30d as i64,
1994                    fact.churn_90d as i64,
1995                    fact.authors_90d as i64,
1996                    fact.last_changed_ms.map(|v| v as i64),
1997                ],
1998            )?;
1999        }
2000        for edge in edges {
2001            self.conn.execute(
2002                "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2003                 VALUES (?1, ?2, ?3, ?4, ?5)
2004                 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2005                 DO UPDATE SET weight = weight + excluded.weight",
2006                params![
2007                    snapshot.id,
2008                    edge.from_path,
2009                    edge.to_path,
2010                    edge.kind,
2011                    edge.weight as i64,
2012                ],
2013            )?;
2014        }
2015        Ok(())
2016    }
2017
2018    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2019        let mut stmt = self.conn.prepare(
2020            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2021                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2022                    churn_30d, churn_90d, authors_90d, last_changed_ms
2023             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2024        )?;
2025        let rows = stmt.query_map(params![snapshot_id], |row| {
2026            Ok(FileFact {
2027                snapshot_id: row.get(0)?,
2028                path: row.get(1)?,
2029                language: row.get(2)?,
2030                bytes: row.get::<_, i64>(3)? as u64,
2031                loc: row.get::<_, i64>(4)? as u32,
2032                sloc: row.get::<_, i64>(5)? as u32,
2033                complexity_total: row.get::<_, i64>(6)? as u32,
2034                max_fn_complexity: row.get::<_, i64>(7)? as u32,
2035                symbol_count: row.get::<_, i64>(8)? as u32,
2036                import_count: row.get::<_, i64>(9)? as u32,
2037                fan_in: row.get::<_, i64>(10)? as u32,
2038                fan_out: row.get::<_, i64>(11)? as u32,
2039                churn_30d: row.get::<_, i64>(12)? as u32,
2040                churn_90d: row.get::<_, i64>(13)? as u32,
2041                authors_90d: row.get::<_, i64>(14)? as u32,
2042                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2043            })
2044        })?;
2045        Ok(rows.filter_map(|row| row.ok()).collect())
2046    }
2047
2048    pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2049        let mut stmt = self.conn.prepare(
2050            "SELECT from_id, to_id, kind, weight
2051             FROM repo_edges WHERE snapshot_id = ?1
2052             ORDER BY kind, from_id, to_id",
2053        )?;
2054        let rows = stmt.query_map(params![snapshot_id], |row| {
2055            Ok(RepoEdge {
2056                from_path: row.get(0)?,
2057                to_path: row.get(1)?,
2058                kind: row.get(2)?,
2059                weight: row.get::<_, i64>(3)? as u32,
2060            })
2061        })?;
2062        Ok(rows.filter_map(|row| row.ok()).collect())
2063    }
2064
2065    pub fn tool_spans_in_window(
2066        &self,
2067        workspace: &str,
2068        start_ms: u64,
2069        end_ms: u64,
2070    ) -> Result<Vec<ToolSpanView>> {
2071        let mut stmt = self.conn.prepare(
2072            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2073                    reasoning_tokens, cost_usd_e6, paths_json,
2074                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2075             FROM (
2076                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2077                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2078                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2079                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2080                        ts.started_at_ms AS sort_ms
2081                 FROM tool_spans ts
2082                 JOIN sessions s ON s.id = ts.session_id
2083                 WHERE s.workspace = ?1
2084                   AND ts.started_at_ms >= ?2
2085                   AND ts.started_at_ms <= ?3
2086                 UNION ALL
2087                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2088                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2089                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2090                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2091                        ts.ended_at_ms AS sort_ms
2092                 FROM tool_spans ts
2093                 JOIN sessions s ON s.id = ts.session_id
2094                 WHERE s.workspace = ?1
2095                   AND ts.started_at_ms IS NULL
2096                   AND ts.ended_at_ms >= ?2
2097                   AND ts.ended_at_ms <= ?3
2098             )
2099             ORDER BY sort_ms DESC",
2100        )?;
2101        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2102            let paths_json: String = row.get(8)?;
2103            Ok(ToolSpanView {
2104                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2105                tool: row
2106                    .get::<_, Option<String>>(1)?
2107                    .unwrap_or_else(|| "unknown".into()),
2108                status: row.get(2)?,
2109                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2110                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2111                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2112                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2113                cost_usd_e6: row.get(7)?,
2114                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2115                parent_span_id: row.get(9)?,
2116                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2117                subtree_cost_usd_e6: row.get(11)?,
2118                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2119            })
2120        })?;
2121        Ok(rows.filter_map(|row| row.ok()).collect())
2122    }
2123
2124    pub fn session_span_tree(
2125        &self,
2126        session_id: &str,
2127    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2128        let last_event_seq = self.last_event_seq_for_session(session_id)?;
2129        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2130            && entry.session_id == session_id
2131            && entry.last_event_seq == last_event_seq
2132        {
2133            return Ok(entry.nodes.clone());
2134        }
2135        let mut stmt = self.conn.prepare(
2136            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2137                    reasoning_tokens, cost_usd_e6, paths_json,
2138                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2139             FROM tool_spans
2140             WHERE session_id = ?1
2141             ORDER BY depth ASC, started_at_ms ASC",
2142        )?;
2143        let rows = stmt.query_map(params![session_id], |row| {
2144            let paths_json: String = row.get(8)?;
2145            Ok(crate::metrics::types::ToolSpanView {
2146                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2147                tool: row
2148                    .get::<_, Option<String>>(1)?
2149                    .unwrap_or_else(|| "unknown".into()),
2150                status: row.get(2)?,
2151                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2152                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2153                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2154                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2155                cost_usd_e6: row.get(7)?,
2156                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2157                parent_span_id: row.get(9)?,
2158                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2159                subtree_cost_usd_e6: row.get(11)?,
2160                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2161            })
2162        })?;
2163        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2164        let nodes = crate::store::span_tree::build_tree(spans);
2165        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2166            session_id: session_id.to_string(),
2167            last_event_seq,
2168            nodes: nodes.clone(),
2169        });
2170        Ok(nodes)
2171    }
2172
2173    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2174        let seq = self
2175            .conn
2176            .query_row(
2177                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2178                params![session_id],
2179                |r| r.get::<_, Option<i64>>(0),
2180            )?
2181            .map(|v| v as u64);
2182        Ok(seq)
2183    }
2184
2185    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2186        let mut stmt = self.conn.prepare(
2187            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2188                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2189             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2190        )?;
2191        let rows = stmt.query_map(params![session_id], |row| {
2192            let paths_json: String = row.get(12)?;
2193            Ok(ToolSpanSyncRow {
2194                span_id: row.get(0)?,
2195                session_id: row.get(1)?,
2196                tool: row.get(2)?,
2197                tool_call_id: row.get(3)?,
2198                status: row.get(4)?,
2199                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2200                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2201                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2202                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2203                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2204                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2205                cost_usd_e6: row.get(11)?,
2206                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2207            })
2208        })?;
2209        Ok(rows.filter_map(|row| row.ok()).collect())
2210    }
2211
2212    pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2213        self.conn.execute(
2214            "INSERT OR REPLACE INTO session_evals
2215             (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2216             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2217            rusqlite::params![
2218                eval.id,
2219                eval.session_id,
2220                eval.judge_model,
2221                eval.rubric_id,
2222                eval.score,
2223                eval.rationale,
2224                eval.flagged as i64,
2225                eval.created_at_ms as i64,
2226            ],
2227        )?;
2228        Ok(())
2229    }
2230
2231    pub fn list_evals_in_window(
2232        &self,
2233        start_ms: u64,
2234        end_ms: u64,
2235    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2236        let mut stmt = self.conn.prepare(
2237            "SELECT id, session_id, judge_model, rubric_id, score,
2238                    rationale, flagged, created_at_ms
2239             FROM session_evals
2240             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2241             ORDER BY created_at_ms ASC",
2242        )?;
2243        let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2244            Ok(crate::eval::types::EvalRow {
2245                id: r.get(0)?,
2246                session_id: r.get(1)?,
2247                judge_model: r.get(2)?,
2248                rubric_id: r.get(3)?,
2249                score: r.get(4)?,
2250                rationale: r.get(5)?,
2251                flagged: r.get::<_, i64>(6)? != 0,
2252                created_at_ms: r.get::<_, i64>(7)? as u64,
2253            })
2254        })?;
2255        rows.collect()
2256    }
2257
2258    pub fn list_evals_for_session(
2259        &self,
2260        session_id: &str,
2261    ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2262        let mut stmt = self.conn.prepare(
2263            "SELECT id, session_id, judge_model, rubric_id, score,
2264                    rationale, flagged, created_at_ms
2265             FROM session_evals
2266             WHERE session_id = ?1
2267             ORDER BY created_at_ms DESC",
2268        )?;
2269        let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2270            Ok(crate::eval::types::EvalRow {
2271                id: r.get(0)?,
2272                session_id: r.get(1)?,
2273                judge_model: r.get(2)?,
2274                rubric_id: r.get(3)?,
2275                score: r.get(4)?,
2276                rationale: r.get(5)?,
2277                flagged: r.get::<_, i64>(6)? != 0,
2278                created_at_ms: r.get::<_, i64>(7)? as u64,
2279            })
2280        })?;
2281        rows.collect()
2282    }
2283
2284    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2285        use crate::feedback::types::FeedbackLabel;
2286        self.conn.execute(
2287            "INSERT OR REPLACE INTO session_feedback
2288             (id, session_id, score, label, note, created_at_ms)
2289             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2290            rusqlite::params![
2291                r.id,
2292                r.session_id,
2293                r.score.as_ref().map(|s| s.0 as i64),
2294                r.label.as_ref().map(FeedbackLabel::to_db_str),
2295                r.note,
2296                r.created_at_ms as i64,
2297            ],
2298        )?;
2299        let payload = serde_json::to_string(r).unwrap_or_default();
2300        self.conn.execute(
2301            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2302             VALUES (?1, 'session_feedback', ?2, 0)",
2303            rusqlite::params![r.session_id, payload],
2304        )?;
2305        Ok(())
2306    }
2307
2308    pub fn list_feedback_in_window(
2309        &self,
2310        start_ms: u64,
2311        end_ms: u64,
2312    ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2313        let mut stmt = self.conn.prepare(
2314            "SELECT id, session_id, score, label, note, created_at_ms
2315             FROM session_feedback
2316             WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2317             ORDER BY created_at_ms ASC",
2318        )?;
2319        let rows = stmt.query_map(
2320            rusqlite::params![start_ms as i64, end_ms as i64],
2321            feedback_row,
2322        )?;
2323        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2324    }
2325
2326    pub fn feedback_for_sessions(
2327        &self,
2328        ids: &[String],
2329    ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2330        if ids.is_empty() {
2331            return Ok(std::collections::HashMap::new());
2332        }
2333        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2334        let sql = format!(
2335            "SELECT id, session_id, score, label, note, created_at_ms
2336             FROM session_feedback WHERE session_id IN ({placeholders})
2337             ORDER BY created_at_ms DESC"
2338        );
2339        let mut stmt = self.conn.prepare(&sql)?;
2340        let params: Vec<&dyn rusqlite::ToSql> =
2341            ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2342        let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2343        let mut map = std::collections::HashMap::new();
2344        for row in rows {
2345            let r = row?;
2346            map.entry(r.session_id.clone()).or_insert(r);
2347        }
2348        Ok(map)
2349    }
2350
2351    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2352        self.conn.execute(
2353            "INSERT INTO session_outcomes (
2354                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2355                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2356            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2357            ON CONFLICT(session_id) DO UPDATE SET
2358                test_passed=excluded.test_passed,
2359                test_failed=excluded.test_failed,
2360                test_skipped=excluded.test_skipped,
2361                build_ok=excluded.build_ok,
2362                lint_errors=excluded.lint_errors,
2363                revert_lines_14d=excluded.revert_lines_14d,
2364                pr_open=excluded.pr_open,
2365                ci_ok=excluded.ci_ok,
2366                measured_at_ms=excluded.measured_at_ms,
2367                measure_error=excluded.measure_error",
2368            params![
2369                row.session_id,
2370                row.test_passed,
2371                row.test_failed,
2372                row.test_skipped,
2373                row.build_ok.map(bool_to_i64),
2374                row.lint_errors,
2375                row.revert_lines_14d,
2376                row.pr_open,
2377                row.ci_ok.map(bool_to_i64),
2378                row.measured_at_ms as i64,
2379                row.measure_error.as_deref(),
2380            ],
2381        )?;
2382        Ok(())
2383    }
2384
2385    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2386        let mut stmt = self.conn.prepare(
2387            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2388                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2389             FROM session_outcomes WHERE session_id = ?1",
2390        )?;
2391        let row = stmt
2392            .query_row(params![session_id], outcome_row)
2393            .optional()?;
2394        Ok(row)
2395    }
2396
2397    /// Outcomes for sessions in `workspace` whose `started_at` falls in the window.
2398    pub fn list_session_outcomes_in_window(
2399        &self,
2400        workspace: &str,
2401        start_ms: u64,
2402        end_ms: u64,
2403    ) -> Result<Vec<SessionOutcomeRow>> {
2404        let mut stmt = self.conn.prepare(
2405            "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2406                    o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2407             FROM session_outcomes o
2408             JOIN sessions s ON s.id = o.session_id
2409             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2410             ORDER BY o.measured_at_ms ASC",
2411        )?;
2412        let rows = stmt.query_map(
2413            params![workspace, start_ms as i64, end_ms as i64],
2414            outcome_row,
2415        )?;
2416        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2417    }
2418
2419    pub fn append_session_sample(
2420        &self,
2421        session_id: &str,
2422        ts_ms: u64,
2423        pid: u32,
2424        cpu_percent: Option<f64>,
2425        rss_bytes: Option<u64>,
2426    ) -> Result<()> {
2427        self.conn.execute(
2428            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2429             VALUES (?1, ?2, ?3, ?4, ?5)",
2430            params![
2431                session_id,
2432                ts_ms as i64,
2433                pid as i64,
2434                cpu_percent,
2435                rss_bytes.map(|b| b as i64)
2436            ],
2437        )?;
2438        Ok(())
2439    }
2440
2441    /// Per-session maxima for retro heuristics.
2442    pub fn list_session_sample_aggs_in_window(
2443        &self,
2444        workspace: &str,
2445        start_ms: u64,
2446        end_ms: u64,
2447    ) -> Result<Vec<SessionSampleAgg>> {
2448        let mut stmt = self.conn.prepare(
2449            "SELECT ss.session_id, COUNT(*) AS n,
2450                    MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2451             FROM session_samples ss
2452             JOIN sessions s ON s.id = ss.session_id
2453             WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2454             GROUP BY ss.session_id",
2455        )?;
2456        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2457            let sid: String = r.get(0)?;
2458            let n: i64 = r.get(1)?;
2459            let max_cpu: Option<f64> = r.get(2)?;
2460            let max_rss: Option<i64> = r.get(3)?;
2461            Ok(SessionSampleAgg {
2462                session_id: sid,
2463                sample_count: n as u64,
2464                max_cpu_percent: max_cpu.unwrap_or(0.0),
2465                max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2466            })
2467        })?;
2468        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2469    }
2470
2471    pub fn list_sessions_for_eval(
2472        &self,
2473        since_ms: u64,
2474        min_cost_usd: f64,
2475    ) -> Result<Vec<crate::core::event::SessionRecord>> {
2476        let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2477        let mut stmt = self.conn.prepare(
2478            "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2479                    s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2480                    s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2481                    s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2482             FROM sessions s
2483             WHERE s.started_at_ms >= ?1
2484               AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2485               AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2486             ORDER BY s.started_at_ms DESC",
2487        )?;
2488        let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2489            Ok((
2490                r.get::<_, String>(0)?,
2491                r.get::<_, String>(1)?,
2492                r.get::<_, Option<String>>(2)?,
2493                r.get::<_, String>(3)?,
2494                r.get::<_, i64>(4)?,
2495                r.get::<_, Option<i64>>(5)?,
2496                r.get::<_, String>(6)?,
2497                r.get::<_, String>(7)?,
2498                r.get::<_, Option<String>>(8)?,
2499                r.get::<_, Option<String>>(9)?,
2500                r.get::<_, Option<String>>(10)?,
2501                r.get::<_, Option<i64>>(11)?,
2502                r.get::<_, Option<i64>>(12)?,
2503                r.get::<_, Option<String>>(13)?,
2504                r.get::<_, Option<String>>(14)?,
2505                r.get::<_, Option<String>>(15)?,
2506                r.get::<_, Option<String>>(16)?,
2507                r.get::<_, Option<String>>(17)?,
2508                r.get::<_, Option<String>>(18)?,
2509                r.get::<_, Option<i64>>(19)?,
2510                r.get::<_, Option<i64>>(20)?,
2511            ))
2512        })?;
2513        let mut out = Vec::new();
2514        for row in rows {
2515            let (
2516                id,
2517                agent,
2518                model,
2519                workspace,
2520                started,
2521                ended,
2522                status_str,
2523                trace,
2524                start_commit,
2525                end_commit,
2526                branch,
2527                dirty_start,
2528                dirty_end,
2529                source,
2530                prompt_fingerprint,
2531                parent_session_id,
2532                agent_version,
2533                os,
2534                arch,
2535                repo_file_count,
2536                repo_total_loc,
2537            ) = row?;
2538            out.push(crate::core::event::SessionRecord {
2539                id,
2540                agent,
2541                model,
2542                workspace,
2543                started_at_ms: started as u64,
2544                ended_at_ms: ended.map(|v| v as u64),
2545                status: status_from_str(&status_str),
2546                trace_path: trace,
2547                start_commit,
2548                end_commit,
2549                branch,
2550                dirty_start: dirty_start.map(i64_to_bool),
2551                dirty_end: dirty_end.map(i64_to_bool),
2552                repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2553                prompt_fingerprint,
2554                parent_session_id,
2555                agent_version,
2556                os,
2557                arch,
2558                repo_file_count: repo_file_count.map(|v| v as u32),
2559                repo_total_loc: repo_total_loc.map(|v| v as u64),
2560            });
2561        }
2562        Ok(out)
2563    }
2564
2565    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2566        self.conn.execute(
2567            "INSERT OR IGNORE INTO prompt_snapshots
2568             (fingerprint, captured_at_ms, files_json, total_bytes)
2569             VALUES (?1, ?2, ?3, ?4)",
2570            params![
2571                snap.fingerprint,
2572                snap.captured_at_ms as i64,
2573                snap.files_json,
2574                snap.total_bytes as i64
2575            ],
2576        )?;
2577        Ok(())
2578    }
2579
2580    pub fn get_prompt_snapshot(
2581        &self,
2582        fingerprint: &str,
2583    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2584        self.conn
2585            .query_row(
2586                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2587                 FROM prompt_snapshots WHERE fingerprint = ?1",
2588                params![fingerprint],
2589                |r| {
2590                    Ok(crate::prompt::PromptSnapshot {
2591                        fingerprint: r.get(0)?,
2592                        captured_at_ms: r.get::<_, i64>(1)? as u64,
2593                        files_json: r.get(2)?,
2594                        total_bytes: r.get::<_, i64>(3)? as u64,
2595                    })
2596                },
2597            )
2598            .optional()
2599            .map_err(Into::into)
2600    }
2601
2602    pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2603        let mut stmt = self.conn.prepare(
2604            "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2605             FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2606        )?;
2607        let rows = stmt.query_map([], |r| {
2608            Ok(crate::prompt::PromptSnapshot {
2609                fingerprint: r.get(0)?,
2610                captured_at_ms: r.get::<_, i64>(1)? as u64,
2611                files_json: r.get(2)?,
2612                total_bytes: r.get::<_, i64>(3)? as u64,
2613            })
2614        })?;
2615        Ok(rows.filter_map(|r| r.ok()).collect())
2616    }
2617
2618    /// Sessions with a non-null prompt_fingerprint in the given window.
2619    pub fn sessions_with_prompt_fingerprint(
2620        &self,
2621        workspace: &str,
2622        start_ms: u64,
2623        end_ms: u64,
2624    ) -> Result<Vec<(String, String)>> {
2625        let mut stmt = self.conn.prepare(
2626            "SELECT id, prompt_fingerprint FROM sessions
2627             WHERE workspace = ?1
2628               AND started_at_ms >= ?2 AND started_at_ms < ?3
2629               AND prompt_fingerprint IS NOT NULL",
2630        )?;
2631        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2632            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2633        })?;
2634        Ok(rows.filter_map(|r| r.ok()).collect())
2635    }
2636}
2637
2638impl Drop for Store {
2639    fn drop(&mut self) {
2640        if let Some(writer) = self.search_writer.get_mut().as_mut() {
2641            let _ = writer.commit();
2642        }
2643    }
2644}
2645
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
2652
2653fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2654    let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2655    let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2656    Ok(rows.filter_map(|r| r.ok()).collect())
2657}
2658
2659fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2660    raw.and_then(|s| s.trim().parse::<u64>().ok())
2661        .unwrap_or(DEFAULT_MMAP_MB)
2662        .saturating_mul(1024)
2663        .saturating_mul(1024)
2664        .min(i64::MAX as u64) as i64
2665}
2666
2667fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2668    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2669    conn.execute_batch(&format!(
2670        "
2671        PRAGMA journal_mode=WAL;
2672        PRAGMA busy_timeout=5000;
2673        PRAGMA synchronous=NORMAL;
2674        PRAGMA cache_size=-65536;
2675        PRAGMA mmap_size={mmap_size};
2676        PRAGMA temp_store=MEMORY;
2677        PRAGMA wal_autocheckpoint=1000;
2678        "
2679    ))?;
2680    if mode == StoreOpenMode::ReadOnlyQuery {
2681        conn.execute_batch("PRAGMA query_only=ON;")?;
2682    }
2683    Ok(())
2684}
2685
2686fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2687    Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2688}
2689
2690fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2691    let status_str: String = row.get(6)?;
2692    Ok(SessionRecord {
2693        id: row.get(0)?,
2694        agent: row.get(1)?,
2695        model: row.get(2)?,
2696        workspace: row.get(3)?,
2697        started_at_ms: row.get::<_, i64>(4)? as u64,
2698        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2699        status: status_from_str(&status_str),
2700        trace_path: row.get(7)?,
2701        start_commit: row.get(8)?,
2702        end_commit: row.get(9)?,
2703        branch: row.get(10)?,
2704        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2705        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2706        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2707        prompt_fingerprint: row.get(14)?,
2708        parent_session_id: row.get(15)?,
2709        agent_version: row.get(16)?,
2710        os: row.get(17)?,
2711        arch: row.get(18)?,
2712        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2713        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2714    })
2715}
2716
2717fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2718    let payload_str: String = row.get(12)?;
2719    Ok(Event {
2720        session_id: row.get(0)?,
2721        seq: row.get::<_, i64>(1)? as u64,
2722        ts_ms: row.get::<_, i64>(2)? as u64,
2723        ts_exact: row.get::<_, i64>(3)? != 0,
2724        kind: kind_from_str(&row.get::<_, String>(4)?),
2725        source: source_from_str(&row.get::<_, String>(5)?),
2726        tool: row.get(6)?,
2727        tool_call_id: row.get(7)?,
2728        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2729        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2730        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2731        cost_usd_e6: row.get(11)?,
2732        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2733        stop_reason: row.get(13)?,
2734        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2735        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2736        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2737        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2738        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2739        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2740        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2741        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2742    })
2743}
2744
2745fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2746    let mut clauses = vec!["workspace = ?".to_string()];
2747    let mut args = vec![Value::Text(workspace.to_string())];
2748    if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2749        clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2750        args.push(Value::Text(format!("{}%", escape_like(prefix))));
2751    }
2752    if let Some(status) = &filter.status {
2753        clauses.push("status = ?".to_string());
2754        args.push(Value::Text(format!("{status:?}")));
2755    }
2756    if let Some(since_ms) = filter.since_ms {
2757        clauses.push("started_at_ms >= ?".to_string());
2758        args.push(Value::Integer(since_ms as i64));
2759    }
2760    (format!("WHERE {}", clauses.join(" AND ")), args)
2761}
2762
/// Lowercase `raw`, then escape the LIKE wildcards `%` and `_` and the escape
/// character `\` itself. The result is meant for patterns compared with
/// `LIKE ? ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    // Lowercase the whole string first: `str::to_lowercase` is context-aware
    // (e.g. final sigma), unlike per-char lowering.
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2769
2770fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2771    let cost: i64 = conn.query_row(
2772        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2773        params![workspace], |r| r.get(0),
2774    )?;
2775    let with_cost: i64 = conn.query_row(
2776        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2777        params![workspace], |r| r.get(0),
2778    )?;
2779    Ok((cost, with_cost as u64))
2780}
2781
2782fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2783    let build_raw: Option<i64> = r.get(4)?;
2784    let ci_raw: Option<i64> = r.get(8)?;
2785    Ok(SessionOutcomeRow {
2786        session_id: r.get(0)?,
2787        test_passed: r.get(1)?,
2788        test_failed: r.get(2)?,
2789        test_skipped: r.get(3)?,
2790        build_ok: build_raw.map(|v| v != 0),
2791        lint_errors: r.get(5)?,
2792        revert_lines_14d: r.get(6)?,
2793        pr_open: r.get(7)?,
2794        ci_ok: ci_raw.map(|v| v != 0),
2795        measured_at_ms: r.get::<_, i64>(9)? as u64,
2796        measure_error: r.get(10)?,
2797    })
2798}
2799
2800fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2801    use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2802    let score = r
2803        .get::<_, Option<i64>>(2)?
2804        .and_then(|v| FeedbackScore::new(v as u8));
2805    let label = r
2806        .get::<_, Option<String>>(3)?
2807        .and_then(|s| FeedbackLabel::from_str_opt(&s));
2808    Ok(FeedbackRecord {
2809        id: r.get(0)?,
2810        session_id: r.get(1)?,
2811        score,
2812        label,
2813        note: r.get(4)?,
2814        created_at_ms: r.get::<_, i64>(5)? as u64,
2815    })
2816}
2817
/// Three-letter weekday name for a UTC day index counted from the Unix epoch.
fn day_label(day_idx: u64) -> &'static str {
    // Day 0 (1970-01-01) was a Thursday; shifting by 4 lines it up with a Sunday-first week.
    match (day_idx + 4) % 7 {
        0 => "Sun",
        1 => "Mon",
        2 => "Tue",
        3 => "Wed",
        4 => "Thu",
        5 => "Fri",
        _ => "Sat",
    }
}
2821
2822fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2823    let week_ago = now.saturating_sub(7 * 86_400_000);
2824    let mut stmt = conn
2825        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2826    let days: Vec<u64> = stmt
2827        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2828        .filter_map(|r| r.ok())
2829        .map(|v| v as u64 / 86_400_000)
2830        .collect();
2831    let today = now / 86_400_000;
2832    Ok((0u64..7)
2833        .map(|i| {
2834            let d = today.saturating_sub(6 - i);
2835            (
2836                day_label(d).to_string(),
2837                days.iter().filter(|&&x| x == d).count() as u64,
2838            )
2839        })
2840        .collect())
2841}
2842
2843fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2844    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2845               s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2846               s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2847               s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2848               COUNT(e.id) FROM sessions s \
2849               LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2850               GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2851    let mut stmt = conn.prepare(sql)?;
2852    let out: Vec<(SessionRecord, u64)> = stmt
2853        .query_map(params![workspace], |r| {
2854            let st: String = r.get(6)?;
2855            Ok((
2856                SessionRecord {
2857                    id: r.get(0)?,
2858                    agent: r.get(1)?,
2859                    model: r.get(2)?,
2860                    workspace: r.get(3)?,
2861                    started_at_ms: r.get::<_, i64>(4)? as u64,
2862                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2863                    status: status_from_str(&st),
2864                    trace_path: r.get(7)?,
2865                    start_commit: r.get(8)?,
2866                    end_commit: r.get(9)?,
2867                    branch: r.get(10)?,
2868                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2869                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2870                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2871                    prompt_fingerprint: r.get(14)?,
2872                    parent_session_id: r.get(15)?,
2873                    agent_version: r.get(16)?,
2874                    os: r.get(17)?,
2875                    arch: r.get(18)?,
2876                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2877                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2878                },
2879                r.get::<_, i64>(21)? as u64,
2880            ))
2881        })?
2882        .filter_map(|r| r.ok())
2883        .collect();
2884    Ok(out)
2885}
2886
2887fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2888    let mut stmt = conn.prepare(
2889        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2890         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2891    )?;
2892    let out: Vec<(String, u64)> = stmt
2893        .query_map(params![workspace], |r| {
2894            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2895        })?
2896        .filter_map(|r| r.ok())
2897        .collect();
2898    Ok(out)
2899}
2900
2901fn status_from_str(s: &str) -> SessionStatus {
2902    match s {
2903        "Running" => SessionStatus::Running,
2904        "Waiting" => SessionStatus::Waiting,
2905        "Idle" => SessionStatus::Idle,
2906        _ => SessionStatus::Done,
2907    }
2908}
2909
/// True when the `KAIZEN_PROJECTOR` environment variable is exactly `legacy`.
fn projector_legacy_mode() -> bool {
    matches!(std::env::var("KAIZEN_PROJECTOR").as_deref(), Ok("legacy"))
}
2913
2914fn is_stop_event(e: &Event) -> bool {
2915    if !matches!(e.kind, EventKind::Hook) {
2916        return false;
2917    }
2918    e.payload
2919        .get("event")
2920        .and_then(|v| v.as_str())
2921        .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
2922        == Some("Stop")
2923}
2924
2925fn kind_from_str(s: &str) -> EventKind {
2926    match s {
2927        "ToolCall" => EventKind::ToolCall,
2928        "ToolResult" => EventKind::ToolResult,
2929        "Message" => EventKind::Message,
2930        "Error" => EventKind::Error,
2931        "Cost" => EventKind::Cost,
2932        "Hook" => EventKind::Hook,
2933        "Lifecycle" => EventKind::Lifecycle,
2934        _ => EventKind::Hook,
2935    }
2936}
2937
2938fn source_from_str(s: &str) -> EventSource {
2939    match s {
2940        "Tail" => EventSource::Tail,
2941        "Hook" => EventSource::Hook,
2942        _ => EventSource::Proxy,
2943    }
2944}
2945
2946fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2947    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2948    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2949    ensure_column(conn, "sessions", "branch", "TEXT")?;
2950    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2951    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2952    ensure_column(
2953        conn,
2954        "sessions",
2955        "repo_binding_source",
2956        "TEXT NOT NULL DEFAULT ''",
2957    )?;
2958    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2959    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2960    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2961    ensure_column(conn, "events", "stop_reason", "TEXT")?;
2962    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
2963    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
2964    ensure_column(conn, "events", "retry_count", "INTEGER")?;
2965    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
2966    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
2967    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
2968    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
2969    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
2970    ensure_column(
2971        conn,
2972        "sync_outbox",
2973        "kind",
2974        "TEXT NOT NULL DEFAULT 'events'",
2975    )?;
2976    ensure_column(
2977        conn,
2978        "experiments",
2979        "state",
2980        "TEXT NOT NULL DEFAULT 'Draft'",
2981    )?;
2982    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2983    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2984    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
2985    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
2986    ensure_column(conn, "sessions", "os", "TEXT")?;
2987    ensure_column(conn, "sessions", "arch", "TEXT")?;
2988    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
2989    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
2990    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
2991    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
2992    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
2993    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
2994    conn.execute_batch(
2995        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
2996         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
2997    )?;
2998    Ok(())
2999}
3000
3001fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3002    if has_column(conn, table, column)? {
3003        return Ok(());
3004    }
3005    conn.execute(
3006        &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3007        [],
3008    )?;
3009    Ok(())
3010}
3011
3012fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3013    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3014    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3015    Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3016}
3017
/// Encodes a boolean as a SQLite integer flag: `true` → 1, `false` → 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3021
/// Decodes a SQLite integer flag: zero is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
3025
/// Maps an empty string to `None` and passes any non-empty string through unchanged.
///
/// Whitespace-only strings are not considered empty.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3029
/// Unit tests for the SQLite store; every test opens a fresh database file in a temp dir.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Builds a minimal `Done` session for agent `cursor` in workspace `/ws`, started at
    /// 1000 ms, with every optional field left `None`.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Builds a `ToolCall` event for tool `read_file` with `ts_ms = 1000 + seq * 100`,
    /// `tool_call_id = "call_{seq}"`, no token/cost metrics, and an empty JSON payload.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// Opening a store leaves the database in WAL journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    /// Opening a store applies the expected connection pragmas, and the mmap helper
    /// converts a megabyte string into bytes (64 MB → 67_108_864).
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }

    /// A read-only open of an existing database turns on `PRAGMA query_only`.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        // Create the database first so the read-only open has a file to attach to.
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }

    /// Schema setup creates each of the listed indexes exactly once.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }

    /// A session written with `upsert_session` can be read back by id.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    /// After events are appended, the session is still listed exactly once for its workspace.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    /// A page reports the unpaged total and the next offset, and rows come back newest
    /// `started_at_ms` first; `list_sessions` uses the same order.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        // Insert in reverse so the result order cannot come from insertion order.
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        // NOTE(review): `a` and `b` tie on started_at_ms; the expected a-before-b order
        // suggests an id tiebreak in the query — confirm in `list_sessions_page`.
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    /// The agent-prefix, status and since filters combine. The `"cur"` prefix is expected to
    /// match agent `"Cursor"`, so the prefix match ignores case here.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }

    /// `list_sessions_started_after` returns only later sessions, and `session_statuses`
    /// reflects a subsequent status update.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }

    /// An empty workspace reports zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    /// Sessions are counted in total and grouped per agent as `(agent, count)` pairs.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    /// A session's events are returned in seq order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// The page cursor is an inclusive starting seq: the caller resumes from the previous
    /// page's last seq + 1.
    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }

    /// Appending an identical event twice succeeds and stores it only once.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Duplicate — should be silently ignored
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    /// `session_span_tree` fills the span-tree cache, even for an unknown or span-less
    /// session, and every `append_event` clears it.
    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        // A lone ToolCall does not yet produce a span in the tree.
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        // A ToolResult sharing the call's tool_call_id completes one span.
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }

    /// Window matching uses `started_at_ms` when it is set and falls back to `ended_at_ms`
    /// only when `started_at_ms` is NULL.
    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        // `started_wins`: started_at (500) lies outside the window, so its in-window
        // ended_at (200) must not make it match.
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        // NOTE(review): "fallback" (250) precedes "started" (200), so the expected order is
        // not ascending time; presumably newest-first or by span_id — confirm in the query.
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }

    /// Upserting an existing id overwrites the stored row (here, its status).
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// A path at `payload.input.path` is indexed as a file touched by the session.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` persists the new status.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// Pruning removes sessions started before the cutoff together with their events,
    /// reports how many of each were removed, and keeps later sessions.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    /// A `.cursor/rules/<name>.mdc` path in the payload is indexed as rule `<name>`.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// A skill or rule mentioned in free-text payload is counted once per session. Both are
    /// reported `on_disk` here because their slugs are in the supplied slug sets.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    /// Pruning a session also deletes its `rules_used` index rows.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}