1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8 FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
/// Millisecond timestamp ceiling (1e12 ms, i.e. September 2001).
///
/// NOTE(review): not referenced in this chunk; presumably timestamps below this
/// value are treated as synthetic/relative rather than wall-clock — confirm at
/// the use site.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size in MiB.
///
/// NOTE(review): not referenced in this chunk; presumably consumed by
/// `apply_pragmas` to set SQLite's `mmap_size` — confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// Base `SELECT` over `sessions` listing every session column in a fixed order.
///
/// Callers append their own `WHERE` / `ORDER BY` clauses. NOTE(review): row
/// mappers presumably read these columns positionally, so keep this order in
/// sync with them when adding columns.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspot" files of one repo snapshot.
///
/// Parameters:
/// - `?1`: snapshot id (`file_facts.snapshot_id`).
/// - `?2`: workspace (`sessions.workspace`).
/// - `?3` / `?4`: inclusive millisecond window bounds.
///
/// Windowing:
/// - A tool span counts when its `started_at_ms` is in the window.
/// - A span with no start time counts when its `ended_at_ms` is in the window.
///
/// Output columns:
/// - `(path, value, complexity_total, churn_30d)`.
/// - `value` is the number of in-window spans (from sessions of the workspace)
///   that touched the path, multiplied by the file's `complexity_total`.
///
/// Ordering:
/// - Every join is a `LEFT JOIN`, so untouched files still appear with `value = 0`.
/// - Rows are ordered by `value` descending, then by path.
///
/// NOTE(review): `COUNT(s.id)` counts span hits, not distinct sessions —
/// confirm that is the intended weighting.
const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
        AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
          OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool usage rows for one workspace over a time window.
///
/// Parameters:
/// - `?1`: workspace.
/// - `?2` / `?3`: inclusive millisecond window bounds. A span counts by
///   `started_at_ms`, or by `ended_at_ms` when it has no start time.
///
/// Output columns:
/// - `(tool, calls, p50_ms, p95_ms, total_tokens, total_reasoning_tokens)`.
/// - Spans without a tool name are grouped under `'unknown'`.
/// - `total_tokens` is input + output + reasoning, with NULLs counted as 0.
///
/// Percentiles:
/// - Lower nearest-rank over non-NULL `lead_time_ms`: the value at sorted row
///   `floor((n - 1) * p / 100) + 1`. Integer division already floors, so the
///   `CAST` is a no-op.
/// - Both percentiles are NULL for a tool with no latency samples, via the
///   final `LEFT JOIN`.
///
/// No `ORDER BY`: row order is unspecified.
const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
        SELECT COALESCE(ts.tool, 'unknown') AS tool,
               ts.lead_time_ms,
               COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                   + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
               COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
        FROM tool_spans ts
        JOIN sessions s ON s.id = ts.session_id
        WHERE s.workspace = ?1
          AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
            OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
        SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
               SUM(reasoning_tokens) AS total_reasoning_tokens
        FROM scoped GROUP BY tool
    ),
    lat AS (
        SELECT tool, lead_time_ms,
               ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
               COUNT(*) OVER (PARTITION BY tool) AS n
        FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
        SELECT tool,
               MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
               MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
        FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
85
/// Idempotent schema statements, run in order through `Connection::execute_batch`
/// by `Store::open_with_mode` on every read-write open.
///
/// Idempotency:
/// - Every statement uses `IF NOT EXISTS` / `OR IGNORE`, so re-running the list
///   is harmless.
/// - `CREATE TABLE IF NOT EXISTS` never alters an existing table. Columns that
///   later code writes but that are missing here are presumably added by
///   `ensure_schema_columns` — TODO confirm. Examples: `events.ts_exact`,
///   `events.reasoning_tokens`, `sync_outbox.kind`, and the repo-binding columns
///   on `sessions`.
/// - NOTE(review): append new statements at the end rather than editing
///   existing ones.
const MIGRATIONS: &[&str] = &[
    // --- Core session / event tables ---
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    // NOTE(review): redundant with events_session_seq_idx below (same leading column).
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Legacy SQLite sync outbox; the `kind` column read by the outbox methods is
    // not declared here.
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Conflict target for the events upsert `ON CONFLICT(session_id, seq)`.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    // Uniqueness makes the projector's `INSERT OR IGNORE` writes idempotent.
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- Derived tool / trace spans ---
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS trace_spans (
        span_id TEXT PRIMARY KEY,
        trace_id TEXT NOT NULL,
        parent_span_id TEXT,
        session_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        name TEXT NOT NULL,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        duration_ms INTEGER,
        model TEXT,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        context_used_tokens INTEGER,
        context_max_tokens INTEGER,
        payload TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE INDEX IF NOT EXISTS trace_spans_session_idx ON trace_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_trace_idx ON trace_spans(trace_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_started_idx ON trace_spans(started_at_ms)",
    // --- Repository binding and code-analysis snapshots ---
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // --- Session listing indexes ---
    // NOTE(review): sessions_workspace_idx is a prefix of the composite indexes below.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    // Expression index: only usable by queries that filter on `lower(agent)`.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // --- Remote (pulled team) data ---
    // Single-row table: the CHECK pins id to 1 and the INSERT below seeds that row.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- Evaluation, feedback and outcome data ---
    // Multi-statement entry (execute_batch runs all of it).
    // NOTE(review): ON DELETE CASCADE only fires if `PRAGMA foreign_keys = ON` is
    // set on the connection — confirm in apply_pragmas.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // --- Later query-support indexes ---
    // NOTE(review): session_samples_session_idx duplicates the primary key's prefix.
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS events_tool_ts_session_seq_idx ON events(tool, ts_ms DESC, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    // Supports path -> span lookups (e.g. the LEFT JOIN in PAIN_HOTSPOTS_SQL).
    "CREATE INDEX IF NOT EXISTS tool_span_paths_path_idx ON tool_span_paths(path, span_id)",
    // NOTE(review): duplicates session_feedback_session created above.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
358
/// Aggregate counters for an insights/overview view.
///
/// NOTE(review): filled outside this chunk; field meanings below are inferred
/// from names — confirm against the producing query.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions considered.
    pub total_sessions: u64,
    /// Number of those sessions presumably still in the running state.
    pub running_sessions: u64,
    /// Number of events across the considered sessions.
    pub total_events: u64,
    /// `(day label, session count)` pairs; day label format not shown here.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions, each paired with a count (presumably its event count).
    pub recent: Vec<(SessionRecord, u64)>,
    /// `(tool name, count)` pairs, presumably most-used first.
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost, presumably in USD × 1e6 (the `_usd_e6` convention).
    pub total_cost_usd_e6: i64,
    /// Number of sessions that carried any cost data.
    pub sessions_with_cost: u64,
}
374
/// Point-in-time view of the outbound sync pipeline.
pub struct SyncStatusSnapshot {
    /// Unsent outbox rows (see `Store::outbox_pending_count`).
    pub pending_outbox: u64,
    /// Millisecond timestamp of the last successful sync, if any.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync failure, if recorded.
    pub last_error: Option<String>,
    /// Failures since the last success; reset to 0 by `set_sync_state_ok`.
    pub consecutive_failures: u32,
}
382
/// Serializable session summary: counts and breakdowns by agent, model and tool.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions summarized.
    pub session_count: u64,
    /// Summed cost, presumably in USD × 1e6.
    pub total_cost_usd_e6: i64,
    /// `(agent, session count)` pairs.
    pub by_agent: Vec<(String, u64)>,
    /// `(model, session count)` pairs.
    pub by_model: Vec<(String, u64)>,
    /// `(tool, count)` pairs.
    pub top_tools: Vec<(String, u64)>,
}
392
/// Kind of guidance artifact a session used; serialized as `"skill"` / `"rule"`.
///
/// The derived `Ord` sorts `Skill` before `Rule` (declaration order).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill (tracked in the `skills_used` table).
    Skill,
    /// A rule (tracked in the `rules_used` table).
    Rule,
}
400
/// Usage and cost statistics for one skill or rule within a guidance report.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Skill or rule identifier.
    pub id: String,
    /// Sessions in the window that used this item.
    pub sessions: u64,
    /// Share of in-window sessions that used it; scale (0–1 vs 0–100) not shown here.
    pub sessions_pct: f64,
    /// Summed cost of those sessions, presumably in USD × 1e6.
    pub total_cost_usd_e6: i64,
    /// Average cost per using session in USD; `None` when not computable.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Difference (or ratio — confirm) versus the workspace-wide average, in USD.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the skill/rule still exists on disk.
    pub on_disk: bool,
}
413
/// Skill/rule performance report for one workspace over a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Window start, in milliseconds.
    pub window_start_ms: u64,
    /// Window end, in milliseconds.
    pub window_end_ms: u64,
    /// Number of sessions inside the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD, if computable.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One row per skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
424
/// Counts of rows deleted by a prune pass; `Default` is "nothing removed".
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Sessions deleted.
    pub sessions_removed: u64,
    /// Events deleted.
    pub events_removed: u64,
}
431
/// One row of the `session_outcomes` table.
///
/// Every measurement is optional: `None` means it was not measured. The booleans
/// are stored as SQLite INTEGERs.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session the outcome belongs to (the table's primary key).
    pub session_id: String,
    /// Passing test count.
    pub test_passed: Option<i64>,
    /// Failing test count.
    pub test_failed: Option<i64>,
    /// Skipped test count.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded.
    pub build_ok: Option<bool>,
    /// Lint error count.
    pub lint_errors: Option<i64>,
    /// Lines reverted within 14 days (per the column name).
    pub revert_lines_14d: Option<i64>,
    /// PR-open indicator/count; exact semantics not shown here.
    pub pr_open: Option<i64>,
    /// Whether CI passed.
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, in milliseconds.
    pub measured_at_ms: u64,
    /// Error recorded while measuring, if any.
    pub measure_error: Option<String>,
}
447
/// Per-session aggregate over `session_samples` (resource usage samples).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Peak sampled CPU percentage.
    pub max_cpu_percent: f64,
    /// Peak sampled resident set size, in bytes.
    pub max_rss_bytes: u64,
}
456
/// `sync_state` key: millisecond timestamp of the last agent scan.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key: millisecond timestamp of the last automatic prune.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key: set to the current time when a search-index update fails,
/// marking the search index as needing a rebuild.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
461
/// A `tool_spans` row shaped for outbound sync, with its paths expanded.
pub struct ToolSpanSyncRow {
    /// Span identifier (`tool_spans.span_id`).
    pub span_id: String,
    /// Owning session.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Agent-side tool call id, if known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Start time in milliseconds, if known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, if known.
    pub lead_time_ms: Option<u64>,
    /// Input token count.
    pub tokens_in: Option<u32>,
    /// Output token count.
    pub tokens_out: Option<u32>,
    /// Reasoning token count.
    pub reasoning_tokens: Option<u32>,
    /// Cost, presumably in USD × 1e6.
    pub cost_usd_e6: Option<i64>,
    /// Paths the span touched.
    pub paths: Vec<String>,
}
477
/// Per-source flags describing which telemetry an event capture included.
pub(crate) struct CaptureQualityRow {
    /// Event source label.
    pub source: String,
    /// Whether token counts were present.
    pub has_tokens: bool,
    /// Whether latency data was present.
    pub has_latency: bool,
    /// Whether context-window data was present.
    pub has_context: bool,
}
484
/// Per-span quality flags for a trace span.
pub(crate) struct TraceSpanQualityRow {
    /// Trace span kind label.
    pub kind: String,
    /// Whether the span is an orphan (presumably its parent is missing — confirm).
    pub is_orphan: bool,
}
489
/// How `Store::open_with_mode` opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Default rusqlite flags. Runs migrations and `ensure_schema_columns`,
    /// then warms the projector.
    ReadWrite,
    /// `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX`. Skips migrations and
    /// projector warm-up.
    ReadOnlyQuery,
}
495
/// Minimal session projection: id, status and end time.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session id.
    pub id: String,
    /// Current status.
    pub status: SessionStatus,
    /// End time in milliseconds, if the session has ended.
    pub ended_at_ms: Option<u64>,
}
502
/// Optional filters for session listings.
///
/// Each `None` field presumably leaves that dimension unfiltered; `Default` is
/// "no filters".
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Agent-name prefix to match.
    pub agent_prefix: Option<String>,
    /// Exact status to match.
    pub status: Option<SessionStatus>,
    /// Lower bound on the session start time, in milliseconds.
    pub since_ms: Option<u64>,
}
509
/// One page of a paginated session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total number of matching sessions across all pages.
    pub total: usize,
    /// Offset of the next page; presumably `None` on the last page.
    pub next_offset: Option<usize>,
}
516
/// Cached span tree for a single session.
///
/// Cleared by `Store::invalidate_span_tree_cache` whenever tool spans may have
/// changed.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached tree belongs to.
    session_id: String,
    /// Last event seq at build time, presumably used to detect staleness.
    last_event_seq: Option<u64>,
    /// Cached tree nodes.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
523
/// SQLite-backed event/session store plus its side stores: hot log, search
/// index and sync outbox.
///
/// Lazily opened side handles and the projector live in `RefCell`s, so methods
/// take `&self` and the type is not `Sync`.
pub struct Store {
    /// Primary SQLite connection.
    conn: Connection,
    /// Directory containing the database file (or `.`). The hot log, search
    /// writer and outbox are opened relative to it.
    root: PathBuf,
    /// Hot log, opened on first append.
    hot_log: RefCell<Option<HotLog>>,
    /// Search index writer, opened on first indexed event.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// Single-entry span-tree cache.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// Incremental projector turning events into span/usage rows.
    projector: RefCell<Projector>,
}
532
533impl Store {
/// Borrows the underlying SQLite connection for crate-internal helpers.
pub(crate) fn conn(&self) -> &Connection {
    &self.conn
}
537
/// Opens the store at `path` read-write.
///
/// Shorthand for `open_with_mode(path, StoreOpenMode::ReadWrite)`: runs
/// migrations and warms the projector.
pub fn open(path: &Path) -> Result<Self> {
    Self::open_with_mode(path, StoreOpenMode::ReadWrite)
}
541
/// Opens the store at `path` read-only (`StoreOpenMode::ReadOnlyQuery`).
///
/// Identical to `open_query`.
pub fn open_read_only(path: &Path) -> Result<Self> {
    Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
}
545
/// Opens the store at `path` for queries only (`StoreOpenMode::ReadOnlyQuery`).
///
/// No migrations are run, so the schema must already exist.
pub fn open_query(path: &Path) -> Result<Self> {
    Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
}
549
550 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
551 if let Some(parent) = path.parent() {
552 std::fs::create_dir_all(parent)?;
553 }
554 let conn = match mode {
555 StoreOpenMode::ReadWrite => Connection::open(path),
556 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
557 path,
558 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
559 ),
560 }
561 .with_context(|| format!("open db: {}", path.display()))?;
562 apply_pragmas(&conn, mode)?;
563 if mode == StoreOpenMode::ReadWrite {
564 for sql in MIGRATIONS {
565 conn.execute_batch(sql)?;
566 }
567 ensure_schema_columns(&conn)?;
568 }
569 let store = Self {
570 conn,
571 root: path
572 .parent()
573 .unwrap_or_else(|| Path::new("."))
574 .to_path_buf(),
575 hot_log: RefCell::new(None),
576 search_writer: RefCell::new(None),
577 span_tree_cache: RefCell::new(None),
578 projector: RefCell::new(Projector::default()),
579 };
580 if mode == StoreOpenMode::ReadWrite {
581 store.warm_projector()?;
582 }
583 Ok(store)
584 }
585
/// Drops the cached span tree, if any.
///
/// Called after every write path that can change a session's tool spans.
fn invalidate_span_tree_cache(&self) {
    *self.span_tree_cache.borrow_mut() = None;
}
589
590 fn warm_projector(&self) -> Result<()> {
591 let ids = self.running_session_ids()?;
592 let mut projector = self.projector.borrow_mut();
593 for id in ids {
594 for event in self.list_events_for_session(&id)? {
595 let _ = projector.apply(&event);
596 }
597 }
598 Ok(())
599 }
600
601 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
602 self.conn.execute(
603 "INSERT INTO sessions (
604 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
605 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
606 prompt_fingerprint, parent_session_id, agent_version, os, arch,
607 repo_file_count, repo_total_loc
608 )
609 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
610 ?16, ?17, ?18, ?19, ?20, ?21)
611 ON CONFLICT(id) DO UPDATE SET
612 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
613 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
614 status=excluded.status, trace_path=excluded.trace_path,
615 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
616 branch=excluded.branch, dirty_start=excluded.dirty_start,
617 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
618 prompt_fingerprint=excluded.prompt_fingerprint,
619 parent_session_id=excluded.parent_session_id,
620 agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
621 repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
622 params![
623 s.id,
624 s.agent,
625 s.model,
626 s.workspace,
627 s.started_at_ms as i64,
628 s.ended_at_ms.map(|v| v as i64),
629 format!("{:?}", s.status),
630 s.trace_path,
631 s.start_commit,
632 s.end_commit,
633 s.branch,
634 s.dirty_start.map(bool_to_i64),
635 s.dirty_end.map(bool_to_i64),
636 s.repo_binding_source.clone().unwrap_or_default(),
637 s.prompt_fingerprint.as_deref(),
638 s.parent_session_id.as_deref(),
639 s.agent_version.as_deref(),
640 s.os.as_deref(),
641 s.arch.as_deref(),
642 s.repo_file_count.map(|v| v as i64),
643 s.repo_total_loc.map(|v| v as i64),
644 ],
645 )?;
646 self.conn.execute(
647 "INSERT INTO session_repo_binding (
648 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
649 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
650 ON CONFLICT(session_id) DO UPDATE SET
651 start_commit=excluded.start_commit,
652 end_commit=excluded.end_commit,
653 branch=excluded.branch,
654 dirty_start=excluded.dirty_start,
655 dirty_end=excluded.dirty_end,
656 repo_binding_source=excluded.repo_binding_source",
657 params![
658 s.id,
659 s.start_commit,
660 s.end_commit,
661 s.branch,
662 s.dirty_start.map(bool_to_i64),
663 s.dirty_end.map(bool_to_i64),
664 s.repo_binding_source.clone().unwrap_or_default(),
665 ],
666 )?;
667 Ok(())
668 }
669
670 pub fn ensure_session_stub(
673 &self,
674 id: &str,
675 agent: &str,
676 workspace: &str,
677 started_at_ms: u64,
678 ) -> Result<()> {
679 self.conn.execute(
680 "INSERT OR IGNORE INTO sessions (
681 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
682 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
683 prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
684 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
685 NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
686 params![id, agent, workspace, started_at_ms as i64],
687 )?;
688 Ok(())
689 }
690
691 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
693 let n: i64 = self.conn.query_row(
694 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
695 [session_id],
696 |r| r.get(0),
697 )?;
698 Ok(n as u64)
699 }
700
/// Appends `e` without any sync enqueueing.
///
/// Same as `append_event_with_sync(e, None)`.
pub fn append_event(&self, e: &Event) -> Result<()> {
    self.append_event_with_sync(e, None)
}
704
705 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
707 let last_before = if projector_legacy_mode() {
708 None
709 } else {
710 self.last_event_seq_for_session(&e.session_id)?
711 };
712 let payload = serde_json::to_string(&e.payload)?;
713 self.conn.execute(
714 "INSERT INTO events (
715 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
716 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
717 stop_reason, latency_ms, ttft_ms, retry_count,
718 context_used_tokens, context_max_tokens,
719 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
720 ) VALUES (
721 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
722 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
723 )
724 ON CONFLICT(session_id, seq) DO UPDATE SET
725 ts_ms = excluded.ts_ms,
726 ts_exact = excluded.ts_exact,
727 kind = excluded.kind,
728 source = excluded.source,
729 tool = excluded.tool,
730 tool_call_id = excluded.tool_call_id,
731 tokens_in = excluded.tokens_in,
732 tokens_out = excluded.tokens_out,
733 reasoning_tokens = excluded.reasoning_tokens,
734 cost_usd_e6 = excluded.cost_usd_e6,
735 payload = excluded.payload,
736 stop_reason = excluded.stop_reason,
737 latency_ms = excluded.latency_ms,
738 ttft_ms = excluded.ttft_ms,
739 retry_count = excluded.retry_count,
740 context_used_tokens = excluded.context_used_tokens,
741 context_max_tokens = excluded.context_max_tokens,
742 cache_creation_tokens = excluded.cache_creation_tokens,
743 cache_read_tokens = excluded.cache_read_tokens,
744 system_prompt_tokens = excluded.system_prompt_tokens",
745 params![
746 e.session_id,
747 e.seq as i64,
748 e.ts_ms as i64,
749 bool_to_i64(e.ts_exact),
750 format!("{:?}", e.kind),
751 format!("{:?}", e.source),
752 e.tool,
753 e.tool_call_id,
754 e.tokens_in.map(|v| v as i64),
755 e.tokens_out.map(|v| v as i64),
756 e.reasoning_tokens.map(|v| v as i64),
757 e.cost_usd_e6,
758 payload,
759 e.stop_reason,
760 e.latency_ms.map(|v| v as i64),
761 e.ttft_ms.map(|v| v as i64),
762 e.retry_count.map(|v| v as i64),
763 e.context_used_tokens.map(|v| v as i64),
764 e.context_max_tokens.map(|v| v as i64),
765 e.cache_creation_tokens.map(|v| v as i64),
766 e.cache_read_tokens.map(|v| v as i64),
767 e.system_prompt_tokens.map(|v| v as i64),
768 ],
769 )?;
770 if self.conn.changes() == 0 {
771 return Ok(());
772 }
773 self.append_hot_event(e)?;
774 if projector_legacy_mode() {
775 index_event_derived(&self.conn, e)?;
776 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
777 self.invalidate_span_tree_cache();
778 } else if last_before.is_some_and(|last| e.seq <= last) {
779 self.replay_projector_session(&e.session_id)?;
780 } else {
781 let deltas = self.projector.borrow_mut().apply(e);
782 self.apply_projector_events(&deltas)?;
783 let expired = self
784 .projector
785 .borrow_mut()
786 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
787 self.apply_projector_events(&expired)?;
788 if is_stop_event(e) {
789 let flushed = self
790 .projector
791 .borrow_mut()
792 .flush_session(&e.session_id, e.ts_ms);
793 self.apply_projector_events(&flushed)?;
794 }
795 self.invalidate_span_tree_cache();
796 }
797 self.append_search_event(e);
798 let Some(ctx) = ctx else {
799 return Ok(());
800 };
801 let sync = &ctx.sync;
802 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
803 return Ok(());
804 }
805 let Some(salt) = try_team_salt(sync) else {
806 tracing::warn!(
807 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
808 );
809 return Ok(());
810 };
811 if sync.sample_rate < 1.0 {
812 let u: f64 = rand::random();
813 if u > sync.sample_rate {
814 return Ok(());
815 }
816 }
817 let Some(session) = self.get_session(&e.session_id)? else {
818 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
819 return Ok(());
820 };
821 let mut outbound = outbound_event_from_row(e, &session, &salt);
822 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
823 let row = serde_json::to_string(&outbound)?;
824 self.outbox()?.append(&e.session_id, "events", &row)?;
825 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
826 Ok(())
827 }
828
829 fn append_hot_event(&self, e: &Event) -> Result<()> {
830 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
831 return Ok(());
832 }
833 let mut slot = self.hot_log.borrow_mut();
834 if slot.is_none() {
835 *slot = Some(HotLog::open(&self.root)?);
836 }
837 if let Some(log) = slot.as_mut() {
838 log.append(e)?;
839 }
840 Ok(())
841 }
842
843 fn append_search_event(&self, e: &Event) {
844 if let Err(err) = self.try_append_search_event(e) {
845 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
846 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
847 }
848 }
849
850 fn try_append_search_event(&self, e: &Event) -> Result<()> {
851 let Some(session) = self.get_session(&e.session_id)? else {
852 return Ok(());
853 };
854 let workspace = PathBuf::from(&session.workspace);
855 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
856 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
857 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
858 return Ok(());
859 };
860 let mut slot = self.search_writer.borrow_mut();
861 if slot.is_none() {
862 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
863 }
864 slot.as_mut().expect("writer").add(&doc)
865 }
866
867 pub fn flush_search(&self) -> Result<()> {
868 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
869 writer.commit()?;
870 }
871 Ok(())
872 }
873
/// Opens the redb sync outbox under `root`.
///
/// A fresh handle is opened on every call.
fn outbox(&self) -> Result<Outbox> {
    Outbox::open(&self.root)
}
877
878 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
879 if projector_legacy_mode() {
880 rebuild_tool_spans_for_session(&self.conn, session_id)?;
881 self.invalidate_span_tree_cache();
882 return Ok(());
883 }
884 let deltas = self
885 .projector
886 .borrow_mut()
887 .flush_session(session_id, now_ms);
888 if self.apply_projector_events(&deltas)? {
889 self.invalidate_span_tree_cache();
890 }
891 Ok(())
892 }
893
894 fn replay_projector_session(&self, session_id: &str) -> Result<()> {
895 clear_session_spans(&self.conn, session_id)?;
896 self.projector.borrow_mut().reset_session(session_id);
897 let events = self.list_events_for_session(session_id)?;
898 let mut changed = false;
899 for event in &events {
900 let deltas = self.projector.borrow_mut().apply(event);
901 changed |= self.apply_projector_events(&deltas)?;
902 }
903 if self
904 .get_session(session_id)?
905 .is_some_and(|session| session.status == SessionStatus::Done)
906 {
907 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
908 let deltas = self
909 .projector
910 .borrow_mut()
911 .flush_session(session_id, now_ms);
912 changed |= self.apply_projector_events(&deltas)?;
913 }
914 if changed {
915 self.invalidate_span_tree_cache();
916 }
917 Ok(())
918 }
919
920 fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
921 let mut changed = false;
922 for delta in deltas {
923 match delta {
924 ProjectorEvent::SpanClosed(span, sample) => {
925 upsert_tool_span_record(&self.conn, span)?;
926 tracing::debug!(
927 session_id = %sample.session_id,
928 span_id = %sample.span_id,
929 tool = ?sample.tool,
930 lead_time_ms = ?sample.lead_time_ms,
931 tokens_in = ?sample.tokens_in,
932 tokens_out = ?sample.tokens_out,
933 reasoning_tokens = ?sample.reasoning_tokens,
934 cost_usd_e6 = ?sample.cost_usd_e6,
935 paths = ?sample.paths,
936 "tool span closed"
937 );
938 changed = true;
939 }
940 ProjectorEvent::SpanPatched(span) => {
941 upsert_tool_span_record(&self.conn, span)?;
942 changed = true;
943 }
944 ProjectorEvent::FileTouched { session, path } => {
945 self.conn.execute(
946 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
947 params![session, path],
948 )?;
949 changed = true;
950 }
951 ProjectorEvent::SkillUsed { session, skill } => {
952 self.conn.execute(
953 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
954 params![session, skill],
955 )?;
956 changed = true;
957 }
958 ProjectorEvent::RuleUsed { session, rule } => {
959 self.conn.execute(
960 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
961 params![session, rule],
962 )?;
963 changed = true;
964 }
965 }
966 }
967 Ok(changed)
968 }
969
970 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
971 let rows = self.outbox()?.list_pending(limit)?;
972 if !rows.is_empty() {
973 return Ok(rows);
974 }
975 let mut stmt = self.conn.prepare(
976 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
977 )?;
978 let rows = stmt.query_map(params![limit as i64], |row| {
979 Ok((
980 row.get::<_, i64>(0)?,
981 row.get::<_, String>(1)?,
982 row.get::<_, String>(2)?,
983 ))
984 })?;
985 let mut out = Vec::new();
986 for r in rows {
987 out.push(r?);
988 }
989 Ok(out)
990 }
991
992 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
993 self.outbox()?.delete_ids(ids)?;
994 for id in ids {
995 self.conn
996 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
997 }
998 Ok(())
999 }
1000
1001 pub fn replace_outbox_rows(
1002 &self,
1003 owner_id: &str,
1004 kind: &str,
1005 payloads: &[String],
1006 ) -> Result<()> {
1007 self.outbox()?.replace(owner_id, kind, payloads)?;
1008 self.conn.execute(
1009 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
1010 params![owner_id, kind],
1011 )?;
1012 for payload in payloads {
1013 self.conn.execute(
1014 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
1015 params![owner_id, kind, payload],
1016 )?;
1017 }
1018 Ok(())
1019 }
1020
1021 pub fn outbox_pending_count(&self) -> Result<u64> {
1022 let redb = self.outbox()?.pending_count()?;
1023 if redb > 0 {
1024 return Ok(redb);
1025 }
1026 let c: i64 =
1027 self.conn
1028 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
1029 r.get(0)
1030 })?;
1031 Ok(c as u64)
1032 }
1033
1034 pub fn set_sync_state_ok(&self) -> Result<()> {
1035 let now = now_ms().to_string();
1036 self.conn.execute(
1037 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
1038 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1039 params![now],
1040 )?;
1041 self.conn.execute(
1042 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
1043 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1044 [],
1045 )?;
1046 self.conn
1047 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
1048 Ok(())
1049 }
1050
1051 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
1052 let prev: i64 = self
1053 .conn
1054 .query_row(
1055 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1056 [],
1057 |r| {
1058 let s: String = r.get(0)?;
1059 Ok(s.parse::<i64>().unwrap_or(0))
1060 },
1061 )
1062 .optional()?
1063 .unwrap_or(0);
1064 let next = prev.saturating_add(1);
1065 self.conn.execute(
1066 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
1067 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1068 params![msg],
1069 )?;
1070 self.conn.execute(
1071 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
1072 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1073 params![next.to_string()],
1074 )?;
1075 Ok(())
1076 }
1077
1078 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
1079 let pending_outbox = self.outbox_pending_count()?;
1080 let last_success_ms = self
1081 .conn
1082 .query_row(
1083 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
1084 [],
1085 |r| r.get::<_, String>(0),
1086 )
1087 .optional()?
1088 .and_then(|s| s.parse().ok());
1089 let last_error = self
1090 .conn
1091 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1092 r.get::<_, String>(0)
1093 })
1094 .optional()?;
1095 let consecutive_failures = self
1096 .conn
1097 .query_row(
1098 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1099 [],
1100 |r| r.get::<_, String>(0),
1101 )
1102 .optional()?
1103 .and_then(|s| s.parse().ok())
1104 .unwrap_or(0);
1105 Ok(SyncStatusSnapshot {
1106 pending_outbox,
1107 last_success_ms,
1108 last_error,
1109 consecutive_failures,
1110 })
1111 }
1112
1113 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1114 let row: Option<String> = self
1115 .conn
1116 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1117 r.get::<_, String>(0)
1118 })
1119 .optional()?;
1120 Ok(row.and_then(|s| s.parse().ok()))
1121 }
1122
1123 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1124 self.conn.execute(
1125 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1126 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1127 params![key, v.to_string()],
1128 )?;
1129 Ok(())
1130 }
1131
1132 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1134 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1135 let old_ids = old_session_ids(&tx, cutoff_ms)?;
1136 let sessions_to_remove: i64 = tx.query_row(
1137 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1138 params![cutoff_ms],
1139 |r| r.get(0),
1140 )?;
1141 let events_to_remove: i64 = tx.query_row(
1142 "SELECT COUNT(*) FROM events WHERE session_id IN \
1143 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1144 params![cutoff_ms],
1145 |r| r.get(0),
1146 )?;
1147
1148 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1149 tx.execute(
1150 &format!(
1151 "DELETE FROM tool_span_paths WHERE span_id IN \
1152 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1153 ),
1154 params![cutoff_ms],
1155 )?;
1156 tx.execute(
1157 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1158 params![cutoff_ms],
1159 )?;
1160 tx.execute(
1161 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1162 params![cutoff_ms],
1163 )?;
1164 tx.execute(
1165 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1166 params![cutoff_ms],
1167 )?;
1168 tx.execute(
1169 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1170 params![cutoff_ms],
1171 )?;
1172 tx.execute(
1173 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1174 params![cutoff_ms],
1175 )?;
1176 tx.execute(
1177 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1178 params![cutoff_ms],
1179 )?;
1180 tx.execute(
1181 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1182 params![cutoff_ms],
1183 )?;
1184 tx.execute(
1185 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1186 params![cutoff_ms],
1187 )?;
1188 tx.execute(
1189 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1190 params![cutoff_ms],
1191 )?;
1192 tx.execute(
1193 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1194 params![cutoff_ms],
1195 )?;
1196 tx.execute(
1197 "DELETE FROM sessions WHERE started_at_ms < ?1",
1198 params![cutoff_ms],
1199 )?;
1200 tx.commit()?;
1201 if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1202 let _ = writer.commit();
1203 }
1204 if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1205 tracing::warn!("search prune skipped: {err:#}");
1206 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1207 }
1208 self.invalidate_span_tree_cache();
1209 Ok(PruneStats {
1210 sessions_removed: sessions_to_remove as u64,
1211 events_removed: events_to_remove as u64,
1212 })
1213 }
1214
1215 pub fn vacuum(&self) -> Result<()> {
1217 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1218 Ok(())
1219 }
1220
1221 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1222 Ok(self
1223 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1224 .rows)
1225 }
1226
1227 pub fn list_sessions_page(
1228 &self,
1229 workspace: &str,
1230 offset: usize,
1231 limit: usize,
1232 filter: SessionFilter,
1233 ) -> Result<SessionPage> {
1234 let (where_sql, args) = session_filter_sql(workspace, &filter);
1235 let total = self.query_session_page_count(&where_sql, &args)?;
1236 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1237 let next = offset.saturating_add(rows.len());
1238 Ok(SessionPage {
1239 rows,
1240 total,
1241 next_offset: (next < total).then_some(next),
1242 })
1243 }
1244
1245 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1246 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1247 let total: i64 = self
1248 .conn
1249 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1250 Ok(total as usize)
1251 }
1252
1253 fn query_session_page_rows(
1254 &self,
1255 where_sql: &str,
1256 args: &[Value],
1257 offset: usize,
1258 limit: usize,
1259 ) -> Result<Vec<SessionRecord>> {
1260 let sql = format!(
1261 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1262 );
1263 let mut values = args.to_vec();
1264 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1265 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1266 let mut stmt = self.conn.prepare(&sql)?;
1267 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1268 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1269 }
1270
1271 pub fn list_sessions_started_after(
1272 &self,
1273 workspace: &str,
1274 after_started_at_ms: u64,
1275 ) -> Result<Vec<SessionRecord>> {
1276 let mut stmt = self.conn.prepare(
1277 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1278 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1279 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1280 repo_file_count, repo_total_loc
1281 FROM sessions
1282 WHERE workspace = ?1 AND started_at_ms > ?2
1283 ORDER BY started_at_ms DESC, id ASC",
1284 )?;
1285 let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1286 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1287 }
1288
1289 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1290 if ids.is_empty() {
1291 return Ok(Vec::new());
1292 }
1293 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1294 let sql =
1295 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1296 let mut stmt = self.conn.prepare(&sql)?;
1297 let params: Vec<&dyn rusqlite::ToSql> =
1298 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1299 let rows = stmt.query_map(params.as_slice(), |r| {
1300 let status: String = r.get(1)?;
1301 Ok(SessionStatusRow {
1302 id: r.get(0)?,
1303 status: status_from_str(&status),
1304 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1305 })
1306 })?;
1307 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1308 }
1309
1310 fn running_session_ids(&self) -> Result<Vec<String>> {
1311 let mut stmt = self
1312 .conn
1313 .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1314 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1315 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1316 }
1317
1318 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1319 let session_count: i64 = self.conn.query_row(
1320 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1321 params![workspace],
1322 |r| r.get(0),
1323 )?;
1324
1325 let total_cost: i64 = self.conn.query_row(
1326 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1327 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1328 params![workspace],
1329 |r| r.get(0),
1330 )?;
1331
1332 let mut stmt = self.conn.prepare(
1333 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1334 )?;
1335 let by_agent: Vec<(String, u64)> = stmt
1336 .query_map(params![workspace], |r| {
1337 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1338 })?
1339 .filter_map(|r| r.ok())
1340 .collect();
1341
1342 let mut stmt = self.conn.prepare(
1343 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1344 )?;
1345 let by_model: Vec<(String, u64)> = stmt
1346 .query_map(params![workspace], |r| {
1347 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1348 })?
1349 .filter_map(|r| r.ok())
1350 .collect();
1351
1352 let mut stmt = self.conn.prepare(
1353 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1354 WHERE s.workspace = ?1 AND tool IS NOT NULL
1355 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1356 )?;
1357 let top_tools: Vec<(String, u64)> = stmt
1358 .query_map(params![workspace], |r| {
1359 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1360 })?
1361 .filter_map(|r| r.ok())
1362 .collect();
1363
1364 Ok(SummaryStats {
1365 session_count: session_count as u64,
1366 total_cost_usd_e6: total_cost,
1367 by_agent,
1368 by_model,
1369 top_tools,
1370 })
1371 }
1372
1373 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1374 self.list_events_page(session_id, 0, i64::MAX as usize)
1375 }
1376
1377 pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1378 let mut stmt = self.conn.prepare(
1379 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1380 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1381 stop_reason, latency_ms, ttft_ms, retry_count,
1382 context_used_tokens, context_max_tokens,
1383 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1384 FROM events WHERE session_id = ?1 AND seq = ?2",
1385 )?;
1386 stmt.query_row(params![session_id, seq as i64], event_row)
1387 .optional()
1388 .map_err(Into::into)
1389 }
1390
1391 pub fn search_tool_events(
1392 &self,
1393 workspace: &str,
1394 tool: &str,
1395 since_ms: Option<u64>,
1396 agent: Option<&str>,
1397 limit: usize,
1398 ) -> Result<Vec<(String, Event)>> {
1399 let mut stmt = self.conn.prepare(
1400 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1401 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1402 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1403 e.context_used_tokens, e.context_max_tokens,
1404 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
1405 s.agent
1406 FROM events e JOIN sessions s ON s.id = e.session_id
1407 WHERE e.tool = ?2
1408 AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
1409 AND (?3 IS NULL OR e.ts_ms >= ?3)
1410 AND (?4 IS NULL OR s.agent = ?4)
1411 ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
1412 LIMIT ?5",
1413 )?;
1414 let since = since_ms.map(|v| v as i64);
1415 let rows = stmt.query_map(
1416 params![workspace, tool, since, agent, limit as i64],
1417 search_tool_event_row,
1418 )?;
1419 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1420 }
1421
1422 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1423 let mut out = Vec::new();
1424 for session in self.list_sessions(workspace)? {
1425 for event in self.list_events_for_session(&session.id)? {
1426 out.push((session.clone(), event));
1427 }
1428 }
1429 out.sort_by(|a, b| {
1430 (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1431 });
1432 Ok(out)
1433 }
1434
1435 pub fn list_events_page(
1436 &self,
1437 session_id: &str,
1438 after_seq: u64,
1439 limit: usize,
1440 ) -> Result<Vec<Event>> {
1441 let mut stmt = self.conn.prepare(
1442 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1443 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1444 stop_reason, latency_ms, ttft_ms, retry_count,
1445 context_used_tokens, context_max_tokens,
1446 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1447 FROM events
1448 WHERE session_id = ?1 AND seq >= ?2
1449 ORDER BY seq ASC LIMIT ?3",
1450 )?;
1451 let rows = stmt.query_map(
1452 params![
1453 session_id,
1454 after_seq as i64,
1455 limit.min(i64::MAX as usize) as i64
1456 ],
1457 event_row,
1458 )?;
1459 let mut events = Vec::new();
1460 for row in rows {
1461 events.push(row?);
1462 }
1463 Ok(events)
1464 }
1465
1466 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1468 self.conn.execute(
1469 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1470 params![format!("{:?}", status), id],
1471 )?;
1472 Ok(())
1473 }
1474
1475 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1477 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1478 Ok(InsightsStats {
1479 total_sessions: count_q(
1480 &self.conn,
1481 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1482 workspace,
1483 )?,
1484 running_sessions: count_q(
1485 &self.conn,
1486 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1487 workspace,
1488 )?,
1489 total_events: count_q(
1490 &self.conn,
1491 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1492 workspace,
1493 )?,
1494 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1495 recent: recent_sessions_3(&self.conn, workspace)?,
1496 top_tools: top_tools_5(&self.conn, workspace)?,
1497 total_cost_usd_e6,
1498 sessions_with_cost,
1499 })
1500 }
1501
1502 pub fn retro_events_in_window(
1504 &self,
1505 workspace: &str,
1506 start_ms: u64,
1507 end_ms: u64,
1508 ) -> Result<Vec<(SessionRecord, Event)>> {
1509 let mut stmt = self.conn.prepare(
1510 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1511 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1512 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1513 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1514 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1515 s.repo_file_count, s.repo_total_loc,
1516 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1517 e.context_used_tokens, e.context_max_tokens,
1518 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1519 FROM events e
1520 JOIN sessions s ON s.id = e.session_id
1521 WHERE s.workspace = ?1
1522 AND (
1523 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1524 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1525 )
1526 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1527 )?;
1528 let rows = stmt.query_map(
1529 params![
1530 workspace,
1531 start_ms as i64,
1532 end_ms as i64,
1533 SYNTHETIC_TS_CEILING_MS,
1534 ],
1535 |row| {
1536 let payload_str: String = row.get(12)?;
1537 let status_str: String = row.get(19)?;
1538 Ok((
1539 SessionRecord {
1540 id: row.get(13)?,
1541 agent: row.get(14)?,
1542 model: row.get(15)?,
1543 workspace: row.get(16)?,
1544 started_at_ms: row.get::<_, i64>(17)? as u64,
1545 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1546 status: status_from_str(&status_str),
1547 trace_path: row.get(20)?,
1548 start_commit: row.get(21)?,
1549 end_commit: row.get(22)?,
1550 branch: row.get(23)?,
1551 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1552 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1553 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1554 prompt_fingerprint: row.get(27)?,
1555 parent_session_id: row.get(28)?,
1556 agent_version: row.get(29)?,
1557 os: row.get(30)?,
1558 arch: row.get(31)?,
1559 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1560 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1561 },
1562 Event {
1563 session_id: row.get(0)?,
1564 seq: row.get::<_, i64>(1)? as u64,
1565 ts_ms: row.get::<_, i64>(2)? as u64,
1566 ts_exact: row.get::<_, i64>(3)? != 0,
1567 kind: kind_from_str(&row.get::<_, String>(4)?),
1568 source: source_from_str(&row.get::<_, String>(5)?),
1569 tool: row.get(6)?,
1570 tool_call_id: row.get(7)?,
1571 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1572 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1573 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1574 cost_usd_e6: row.get(11)?,
1575 payload: serde_json::from_str(&payload_str)
1576 .unwrap_or(serde_json::Value::Null),
1577 stop_reason: row.get(34)?,
1578 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1579 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1580 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1581 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1582 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1583 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1584 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1585 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1586 },
1587 ))
1588 },
1589 )?;
1590
1591 let mut out = Vec::new();
1592 for r in rows {
1593 out.push(r?);
1594 }
1595 Ok(out)
1596 }
1597
1598 pub fn experiment_metric_values_in_window(
1599 &self,
1600 workspace: &str,
1601 start_ms: u64,
1602 end_ms: u64,
1603 metric: crate::experiment::types::Metric,
1604 ) -> Result<Vec<(SessionRecord, f64)>> {
1605 use crate::experiment::types::Metric;
1606 let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1607 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
1608 s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
1609 s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
1610 let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1611 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
1612 let sql = match metric {
1613 Metric::TokensPerSession => format!(
1614 "SELECT {session_cols},
1615 SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
1616 FROM sessions s JOIN events e ON e.session_id = s.id
1617 WHERE {window}
1618 GROUP BY s.id"
1619 ),
1620 Metric::CostPerSession => format!(
1621 "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
1622 FROM sessions s JOIN events e ON e.session_id = s.id
1623 WHERE {window}
1624 GROUP BY s.id"
1625 ),
1626 Metric::SuccessRate => format!(
1627 "SELECT {session_cols},
1628 CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
1629 FROM sessions s JOIN events e ON e.session_id = s.id
1630 WHERE {window}
1631 GROUP BY s.id"
1632 ),
1633 Metric::DurationMinutes => format!(
1634 "SELECT {session_cols},
1635 (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
1636 FROM sessions s
1637 WHERE s.workspace = ?1
1638 AND s.ended_at_ms IS NOT NULL
1639 AND EXISTS (
1640 SELECT 1 FROM events e
1641 WHERE e.session_id = s.id
1642 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1643 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
1644 )"
1645 ),
1646 Metric::FilesPerSession => format!(
1647 "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
1648 FROM sessions s
1649 JOIN events e ON e.session_id = s.id
1650 LEFT JOIN files_touched ft ON ft.session_id = s.id
1651 WHERE {window}
1652 GROUP BY s.id"
1653 ),
1654 Metric::SuccessRateByPrompt => format!(
1655 "SELECT {session_cols},
1656 1.0 - (MIN(
1657 SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
1658 SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
1659 ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
1660 FROM sessions s JOIN events e ON e.session_id = s.id
1661 WHERE {window}
1662 GROUP BY s.id
1663 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1664 ),
1665 Metric::CostByPrompt => format!(
1666 "SELECT {session_cols},
1667 SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
1668 SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
1669 FROM sessions s JOIN events e ON e.session_id = s.id
1670 WHERE {window}
1671 GROUP BY s.id
1672 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1673 ),
1674 Metric::ToolLoops => format!(
1675 "WITH calls AS (
1676 SELECT e.session_id, e.tool,
1677 LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
1678 FROM events e JOIN sessions s ON s.id = e.session_id
1679 WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
1680 )
1681 SELECT {session_cols},
1682 SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
1683 FROM sessions s JOIN calls ON calls.session_id = s.id
1684 GROUP BY s.id"
1685 ),
1686 };
1687 let mut stmt = self.conn.prepare(&sql)?;
1688 let rows = stmt.query_map(
1689 params![
1690 workspace,
1691 start_ms as i64,
1692 end_ms as i64,
1693 SYNTHETIC_TS_CEILING_MS,
1694 ],
1695 |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
1696 )?;
1697 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1698 }
1699
1700 pub fn files_touched_in_window(
1702 &self,
1703 workspace: &str,
1704 start_ms: u64,
1705 end_ms: u64,
1706 ) -> Result<Vec<(String, String)>> {
1707 let mut stmt = self.conn.prepare(
1708 "SELECT DISTINCT ft.session_id, ft.path
1709 FROM files_touched ft
1710 JOIN sessions s ON s.id = ft.session_id
1711 WHERE s.workspace = ?1
1712 AND EXISTS (
1713 SELECT 1 FROM events e
1714 JOIN sessions ss ON ss.id = e.session_id
1715 WHERE e.session_id = ft.session_id
1716 AND (
1717 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1718 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1719 )
1720 )
1721 ORDER BY ft.session_id, ft.path",
1722 )?;
1723 let out: Vec<(String, String)> = stmt
1724 .query_map(
1725 params![
1726 workspace,
1727 start_ms as i64,
1728 end_ms as i64,
1729 SYNTHETIC_TS_CEILING_MS,
1730 ],
1731 |r| Ok((r.get(0)?, r.get(1)?)),
1732 )?
1733 .filter_map(|r| r.ok())
1734 .collect();
1735 Ok(out)
1736 }
1737
1738 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1741 let mut stmt = self.conn.prepare(
1742 "SELECT DISTINCT su.skill
1743 FROM skills_used su
1744 JOIN sessions s ON s.id = su.session_id
1745 WHERE s.workspace = ?1
1746 AND EXISTS (
1747 SELECT 1 FROM events e
1748 JOIN sessions ss ON ss.id = e.session_id
1749 WHERE e.session_id = su.session_id
1750 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1751 )
1752 ORDER BY su.skill",
1753 )?;
1754 let out: Vec<String> = stmt
1755 .query_map(
1756 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1757 |r| r.get::<_, String>(0),
1758 )?
1759 .filter_map(|r| r.ok())
1760 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1761 .collect();
1762 Ok(out)
1763 }
1764
1765 pub fn skills_used_in_window(
1767 &self,
1768 workspace: &str,
1769 start_ms: u64,
1770 end_ms: u64,
1771 ) -> Result<Vec<(String, String)>> {
1772 let mut stmt = self.conn.prepare(
1773 "SELECT DISTINCT su.session_id, su.skill
1774 FROM skills_used su
1775 JOIN sessions s ON s.id = su.session_id
1776 WHERE s.workspace = ?1
1777 AND EXISTS (
1778 SELECT 1 FROM events e
1779 JOIN sessions ss ON ss.id = e.session_id
1780 WHERE e.session_id = su.session_id
1781 AND (
1782 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1783 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1784 )
1785 )
1786 ORDER BY su.session_id, su.skill",
1787 )?;
1788 let out: Vec<(String, String)> = stmt
1789 .query_map(
1790 params![
1791 workspace,
1792 start_ms as i64,
1793 end_ms as i64,
1794 SYNTHETIC_TS_CEILING_MS,
1795 ],
1796 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1797 )?
1798 .filter_map(|r| r.ok())
1799 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1800 .collect();
1801 Ok(out)
1802 }
1803
1804 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1806 let mut stmt = self.conn.prepare(
1807 "SELECT DISTINCT ru.rule
1808 FROM rules_used ru
1809 JOIN sessions s ON s.id = ru.session_id
1810 WHERE s.workspace = ?1
1811 AND EXISTS (
1812 SELECT 1 FROM events e
1813 JOIN sessions ss ON ss.id = e.session_id
1814 WHERE e.session_id = ru.session_id
1815 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1816 )
1817 ORDER BY ru.rule",
1818 )?;
1819 let out: Vec<String> = stmt
1820 .query_map(
1821 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1822 |r| r.get::<_, String>(0),
1823 )?
1824 .filter_map(|r| r.ok())
1825 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1826 .collect();
1827 Ok(out)
1828 }
1829
1830 pub fn rules_used_in_window(
1832 &self,
1833 workspace: &str,
1834 start_ms: u64,
1835 end_ms: u64,
1836 ) -> Result<Vec<(String, String)>> {
1837 let mut stmt = self.conn.prepare(
1838 "SELECT DISTINCT ru.session_id, ru.rule
1839 FROM rules_used ru
1840 JOIN sessions s ON s.id = ru.session_id
1841 WHERE s.workspace = ?1
1842 AND EXISTS (
1843 SELECT 1 FROM events e
1844 JOIN sessions ss ON ss.id = e.session_id
1845 WHERE e.session_id = ru.session_id
1846 AND (
1847 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1848 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1849 )
1850 )
1851 ORDER BY ru.session_id, ru.rule",
1852 )?;
1853 let out: Vec<(String, String)> = stmt
1854 .query_map(
1855 params![
1856 workspace,
1857 start_ms as i64,
1858 end_ms as i64,
1859 SYNTHETIC_TS_CEILING_MS,
1860 ],
1861 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1862 )?
1863 .filter_map(|r| r.ok())
1864 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1865 .collect();
1866 Ok(out)
1867 }
1868
1869 pub fn sessions_active_in_window(
1871 &self,
1872 workspace: &str,
1873 start_ms: u64,
1874 end_ms: u64,
1875 ) -> Result<HashSet<String>> {
1876 let mut stmt = self.conn.prepare(
1877 "SELECT DISTINCT s.id
1878 FROM sessions s
1879 WHERE s.workspace = ?1
1880 AND EXISTS (
1881 SELECT 1 FROM events e
1882 WHERE e.session_id = s.id
1883 AND (
1884 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1885 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1886 )
1887 )",
1888 )?;
1889 let out: HashSet<String> = stmt
1890 .query_map(
1891 params![
1892 workspace,
1893 start_ms as i64,
1894 end_ms as i64,
1895 SYNTHETIC_TS_CEILING_MS,
1896 ],
1897 |r| r.get(0),
1898 )?
1899 .filter_map(|r| r.ok())
1900 .collect();
1901 Ok(out)
1902 }
1903
1904 pub fn session_costs_usd_e6_in_window(
1906 &self,
1907 workspace: &str,
1908 start_ms: u64,
1909 end_ms: u64,
1910 ) -> Result<HashMap<String, i64>> {
1911 let mut stmt = self.conn.prepare(
1912 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1913 FROM events e
1914 JOIN sessions s ON s.id = e.session_id
1915 WHERE s.workspace = ?1
1916 AND (
1917 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1918 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1919 )
1920 GROUP BY e.session_id",
1921 )?;
1922 let rows: Vec<(String, i64)> = stmt
1923 .query_map(
1924 params![
1925 workspace,
1926 start_ms as i64,
1927 end_ms as i64,
1928 SYNTHETIC_TS_CEILING_MS,
1929 ],
1930 |r| Ok((r.get(0)?, r.get(1)?)),
1931 )?
1932 .filter_map(|r| r.ok())
1933 .collect();
1934 Ok(rows.into_iter().collect())
1935 }
1936
1937 pub fn guidance_report(
1939 &self,
1940 workspace: &str,
1941 window_start_ms: u64,
1942 window_end_ms: u64,
1943 skill_slugs_on_disk: &HashSet<String>,
1944 rule_slugs_on_disk: &HashSet<String>,
1945 ) -> Result<GuidanceReport> {
1946 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1947 let denom = active.len() as u64;
1948 let costs =
1949 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1950
1951 let workspace_avg_cost_per_session_usd = if denom > 0 {
1952 let total_e6: i64 = active
1953 .iter()
1954 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1955 .sum();
1956 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1957 } else {
1958 None
1959 };
1960
1961 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1962 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1963 skill_sessions.entry(skill).or_default().insert(sid);
1964 }
1965 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1966 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1967 rule_sessions.entry(rule).or_default().insert(sid);
1968 }
1969
1970 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1971
1972 let mut push_row =
1973 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1974 let sessions = sids.len() as u64;
1975 let sessions_pct = if denom > 0 {
1976 sessions as f64 * 100.0 / denom as f64
1977 } else {
1978 0.0
1979 };
1980 let total_cost_usd_e6: i64 = sids
1981 .iter()
1982 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1983 .sum();
1984 let avg_cost_per_session_usd = if sessions > 0 {
1985 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1986 } else {
1987 None
1988 };
1989 let vs_workspace_avg_cost_per_session_usd =
1990 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1991 (Some(avg), Some(w)) => Some(avg - w),
1992 _ => None,
1993 };
1994 rows.push(GuidancePerfRow {
1995 kind,
1996 id,
1997 sessions,
1998 sessions_pct,
1999 total_cost_usd_e6,
2000 avg_cost_per_session_usd,
2001 vs_workspace_avg_cost_per_session_usd,
2002 on_disk,
2003 });
2004 };
2005
2006 let mut seen_skills: HashSet<String> = HashSet::new();
2007 for (id, sids) in &skill_sessions {
2008 seen_skills.insert(id.clone());
2009 push_row(
2010 GuidanceKind::Skill,
2011 id.clone(),
2012 sids,
2013 skill_slugs_on_disk.contains(id),
2014 );
2015 }
2016 for slug in skill_slugs_on_disk {
2017 if seen_skills.contains(slug) {
2018 continue;
2019 }
2020 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
2021 }
2022
2023 let mut seen_rules: HashSet<String> = HashSet::new();
2024 for (id, sids) in &rule_sessions {
2025 seen_rules.insert(id.clone());
2026 push_row(
2027 GuidanceKind::Rule,
2028 id.clone(),
2029 sids,
2030 rule_slugs_on_disk.contains(id),
2031 );
2032 }
2033 for slug in rule_slugs_on_disk {
2034 if seen_rules.contains(slug) {
2035 continue;
2036 }
2037 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
2038 }
2039
2040 rows.sort_by(|a, b| {
2041 b.sessions
2042 .cmp(&a.sessions)
2043 .then_with(|| a.kind.cmp(&b.kind))
2044 .then_with(|| a.id.cmp(&b.id))
2045 });
2046
2047 Ok(GuidanceReport {
2048 workspace: workspace.to_string(),
2049 window_start_ms,
2050 window_end_ms,
2051 sessions_in_window: denom,
2052 workspace_avg_cost_per_session_usd,
2053 rows,
2054 })
2055 }
2056
2057 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
2058 let mut stmt = self.conn.prepare(
2059 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
2060 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
2061 prompt_fingerprint, parent_session_id, agent_version, os, arch,
2062 repo_file_count, repo_total_loc
2063 FROM sessions WHERE id = ?1",
2064 )?;
2065 let mut rows = stmt.query_map(params![id], |row| {
2066 Ok((
2067 row.get::<_, String>(0)?,
2068 row.get::<_, String>(1)?,
2069 row.get::<_, Option<String>>(2)?,
2070 row.get::<_, String>(3)?,
2071 row.get::<_, i64>(4)?,
2072 row.get::<_, Option<i64>>(5)?,
2073 row.get::<_, String>(6)?,
2074 row.get::<_, String>(7)?,
2075 row.get::<_, Option<String>>(8)?,
2076 row.get::<_, Option<String>>(9)?,
2077 row.get::<_, Option<String>>(10)?,
2078 row.get::<_, Option<i64>>(11)?,
2079 row.get::<_, Option<i64>>(12)?,
2080 row.get::<_, String>(13)?,
2081 row.get::<_, Option<String>>(14)?,
2082 row.get::<_, Option<String>>(15)?,
2083 row.get::<_, Option<String>>(16)?,
2084 row.get::<_, Option<String>>(17)?,
2085 row.get::<_, Option<String>>(18)?,
2086 row.get::<_, Option<i64>>(19)?,
2087 row.get::<_, Option<i64>>(20)?,
2088 ))
2089 })?;
2090
2091 if let Some(row) = rows.next() {
2092 let (
2093 id,
2094 agent,
2095 model,
2096 workspace,
2097 started,
2098 ended,
2099 status_str,
2100 trace,
2101 start_commit,
2102 end_commit,
2103 branch,
2104 dirty_start,
2105 dirty_end,
2106 source,
2107 prompt_fingerprint,
2108 parent_session_id,
2109 agent_version,
2110 os,
2111 arch,
2112 repo_file_count,
2113 repo_total_loc,
2114 ) = row?;
2115 Ok(Some(SessionRecord {
2116 id,
2117 agent,
2118 model,
2119 workspace,
2120 started_at_ms: started as u64,
2121 ended_at_ms: ended.map(|v| v as u64),
2122 status: status_from_str(&status_str),
2123 trace_path: trace,
2124 start_commit,
2125 end_commit,
2126 branch,
2127 dirty_start: dirty_start.map(i64_to_bool),
2128 dirty_end: dirty_end.map(i64_to_bool),
2129 repo_binding_source: empty_to_none(source),
2130 prompt_fingerprint,
2131 parent_session_id,
2132 agent_version,
2133 os,
2134 arch,
2135 repo_file_count: repo_file_count.map(|v| v as u32),
2136 repo_total_loc: repo_total_loc.map(|v| v as u64),
2137 }))
2138 } else {
2139 Ok(None)
2140 }
2141 }
2142
2143 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2144 let mut stmt = self.conn.prepare(
2145 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2146 indexed_at_ms, dirty, graph_path
2147 FROM repo_snapshots WHERE workspace = ?1
2148 ORDER BY indexed_at_ms DESC LIMIT 1",
2149 )?;
2150 let mut rows = stmt.query_map(params![workspace], |row| {
2151 Ok(RepoSnapshotRecord {
2152 id: row.get(0)?,
2153 workspace: row.get(1)?,
2154 head_commit: row.get(2)?,
2155 dirty_fingerprint: row.get(3)?,
2156 analyzer_version: row.get(4)?,
2157 indexed_at_ms: row.get::<_, i64>(5)? as u64,
2158 dirty: row.get::<_, i64>(6)? != 0,
2159 graph_path: row.get(7)?,
2160 })
2161 })?;
2162 Ok(rows.next().transpose()?)
2163 }
2164
2165 pub fn save_repo_snapshot(
2166 &self,
2167 snapshot: &RepoSnapshotRecord,
2168 facts: &[FileFact],
2169 edges: &[RepoEdge],
2170 ) -> Result<()> {
2171 self.conn.execute(
2172 "INSERT INTO repo_snapshots (
2173 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2174 indexed_at_ms, dirty, graph_path
2175 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2176 ON CONFLICT(id) DO UPDATE SET
2177 workspace=excluded.workspace,
2178 head_commit=excluded.head_commit,
2179 dirty_fingerprint=excluded.dirty_fingerprint,
2180 analyzer_version=excluded.analyzer_version,
2181 indexed_at_ms=excluded.indexed_at_ms,
2182 dirty=excluded.dirty,
2183 graph_path=excluded.graph_path",
2184 params![
2185 snapshot.id,
2186 snapshot.workspace,
2187 snapshot.head_commit,
2188 snapshot.dirty_fingerprint,
2189 snapshot.analyzer_version,
2190 snapshot.indexed_at_ms as i64,
2191 bool_to_i64(snapshot.dirty),
2192 snapshot.graph_path,
2193 ],
2194 )?;
2195 self.conn.execute(
2196 "DELETE FROM file_facts WHERE snapshot_id = ?1",
2197 params![snapshot.id],
2198 )?;
2199 self.conn.execute(
2200 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2201 params![snapshot.id],
2202 )?;
2203 for fact in facts {
2204 self.conn.execute(
2205 "INSERT INTO file_facts (
2206 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2207 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2208 churn_30d, churn_90d, authors_90d, last_changed_ms
2209 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2210 params![
2211 fact.snapshot_id,
2212 fact.path,
2213 fact.language,
2214 fact.bytes as i64,
2215 fact.loc as i64,
2216 fact.sloc as i64,
2217 fact.complexity_total as i64,
2218 fact.max_fn_complexity as i64,
2219 fact.symbol_count as i64,
2220 fact.import_count as i64,
2221 fact.fan_in as i64,
2222 fact.fan_out as i64,
2223 fact.churn_30d as i64,
2224 fact.churn_90d as i64,
2225 fact.authors_90d as i64,
2226 fact.last_changed_ms.map(|v| v as i64),
2227 ],
2228 )?;
2229 }
2230 for edge in edges {
2231 self.conn.execute(
2232 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2233 VALUES (?1, ?2, ?3, ?4, ?5)
2234 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2235 DO UPDATE SET weight = weight + excluded.weight",
2236 params![
2237 snapshot.id,
2238 edge.from_path,
2239 edge.to_path,
2240 edge.kind,
2241 edge.weight as i64,
2242 ],
2243 )?;
2244 }
2245 Ok(())
2246 }
2247
2248 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2249 let mut stmt = self.conn.prepare(
2250 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2251 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2252 churn_30d, churn_90d, authors_90d, last_changed_ms
2253 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2254 )?;
2255 let rows = stmt.query_map(params![snapshot_id], |row| {
2256 Ok(FileFact {
2257 snapshot_id: row.get(0)?,
2258 path: row.get(1)?,
2259 language: row.get(2)?,
2260 bytes: row.get::<_, i64>(3)? as u64,
2261 loc: row.get::<_, i64>(4)? as u32,
2262 sloc: row.get::<_, i64>(5)? as u32,
2263 complexity_total: row.get::<_, i64>(6)? as u32,
2264 max_fn_complexity: row.get::<_, i64>(7)? as u32,
2265 symbol_count: row.get::<_, i64>(8)? as u32,
2266 import_count: row.get::<_, i64>(9)? as u32,
2267 fan_in: row.get::<_, i64>(10)? as u32,
2268 fan_out: row.get::<_, i64>(11)? as u32,
2269 churn_30d: row.get::<_, i64>(12)? as u32,
2270 churn_90d: row.get::<_, i64>(13)? as u32,
2271 authors_90d: row.get::<_, i64>(14)? as u32,
2272 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2273 })
2274 })?;
2275 Ok(rows.filter_map(|row| row.ok()).collect())
2276 }
2277
2278 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2279 let mut stmt = self.conn.prepare(
2280 "SELECT from_id, to_id, kind, weight
2281 FROM repo_edges WHERE snapshot_id = ?1
2282 ORDER BY kind, from_id, to_id",
2283 )?;
2284 let rows = stmt.query_map(params![snapshot_id], |row| {
2285 Ok(RepoEdge {
2286 from_path: row.get(0)?,
2287 to_path: row.get(1)?,
2288 kind: row.get(2)?,
2289 weight: row.get::<_, i64>(3)? as u32,
2290 })
2291 })?;
2292 Ok(rows.filter_map(|row| row.ok()).collect())
2293 }
2294
2295 pub fn hottest_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2296 self.ranked_files_for_snapshot(snapshot_id, "churn_30d * complexity_total")
2297 }
2298
2299 pub fn most_changed_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2300 self.ranked_files_for_snapshot(snapshot_id, "churn_30d")
2301 }
2302
2303 pub fn most_complex_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2304 self.ranked_files_for_snapshot(snapshot_id, "complexity_total")
2305 }
2306
2307 pub fn highest_risk_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2308 self.ranked_files_for_snapshot(snapshot_id, "churn_30d * authors_90d * complexity_total")
2309 }
2310
2311 fn ranked_files_for_snapshot(
2312 &self,
2313 snapshot_id: &str,
2314 value_sql: &str,
2315 ) -> Result<Vec<RankedFile>> {
2316 let sql = format!(
2317 "SELECT path, {value_sql}, complexity_total, churn_30d
2318 FROM file_facts WHERE snapshot_id = ?1
2319 ORDER BY {value_sql} DESC, path ASC LIMIT 10"
2320 );
2321 let mut stmt = self.conn.prepare(&sql)?;
2322 let rows = stmt.query_map(params![snapshot_id], ranked_file_row)?;
2323 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2324 }
2325
2326 pub fn pain_hotspots_for_snapshot(
2327 &self,
2328 snapshot_id: &str,
2329 workspace: &str,
2330 start_ms: u64,
2331 end_ms: u64,
2332 ) -> Result<Vec<RankedFile>> {
2333 let mut stmt = self.conn.prepare(PAIN_HOTSPOTS_SQL)?;
2334 let rows = stmt.query_map(
2335 params![snapshot_id, workspace, start_ms as i64, end_ms as i64],
2336 ranked_file_row,
2337 )?;
2338 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2339 }
2340
2341 pub fn tool_rank_rows_in_window(
2342 &self,
2343 workspace: &str,
2344 start_ms: u64,
2345 end_ms: u64,
2346 ) -> Result<Vec<RankedTool>> {
2347 let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
2348 let rows = stmt.query_map(
2349 params![workspace, start_ms as i64, end_ms as i64],
2350 ranked_tool_row,
2351 )?;
2352 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2353 }
2354
2355 pub fn tool_spans_in_window(
2356 &self,
2357 workspace: &str,
2358 start_ms: u64,
2359 end_ms: u64,
2360 ) -> Result<Vec<ToolSpanView>> {
2361 let mut stmt = self.conn.prepare(
2362 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2363 reasoning_tokens, cost_usd_e6, paths_json,
2364 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2365 FROM (
2366 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2367 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2368 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2369 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2370 ts.started_at_ms AS sort_ms
2371 FROM tool_spans ts
2372 JOIN sessions s ON s.id = ts.session_id
2373 WHERE s.workspace = ?1
2374 AND ts.started_at_ms >= ?2
2375 AND ts.started_at_ms <= ?3
2376 UNION ALL
2377 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2378 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2379 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2380 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2381 ts.ended_at_ms AS sort_ms
2382 FROM tool_spans ts
2383 JOIN sessions s ON s.id = ts.session_id
2384 WHERE s.workspace = ?1
2385 AND ts.started_at_ms IS NULL
2386 AND ts.ended_at_ms >= ?2
2387 AND ts.ended_at_ms <= ?3
2388 )
2389 ORDER BY sort_ms DESC",
2390 )?;
2391 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2392 let paths_json: String = row.get(8)?;
2393 Ok(ToolSpanView {
2394 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2395 tool: row
2396 .get::<_, Option<String>>(1)?
2397 .unwrap_or_else(|| "unknown".into()),
2398 status: row.get(2)?,
2399 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2400 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2401 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2402 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2403 cost_usd_e6: row.get(7)?,
2404 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2405 parent_span_id: row.get(9)?,
2406 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2407 subtree_cost_usd_e6: row.get(11)?,
2408 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2409 })
2410 })?;
2411 Ok(rows.filter_map(|row| row.ok()).collect())
2412 }
2413
2414 pub fn session_span_tree(
2415 &self,
2416 session_id: &str,
2417 ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2418 let last_event_seq = self.last_event_seq_for_session(session_id)?;
2419 if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2420 && entry.session_id == session_id
2421 && entry.last_event_seq == last_event_seq
2422 {
2423 return Ok(entry.nodes.clone());
2424 }
2425 let mut stmt = self.conn.prepare(
2426 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2427 reasoning_tokens, cost_usd_e6, paths_json,
2428 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2429 FROM tool_spans
2430 WHERE session_id = ?1
2431 ORDER BY depth ASC, started_at_ms ASC",
2432 )?;
2433 let rows = stmt.query_map(params![session_id], |row| {
2434 let paths_json: String = row.get(8)?;
2435 Ok(crate::metrics::types::ToolSpanView {
2436 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2437 tool: row
2438 .get::<_, Option<String>>(1)?
2439 .unwrap_or_else(|| "unknown".into()),
2440 status: row.get(2)?,
2441 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2442 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2443 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2444 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2445 cost_usd_e6: row.get(7)?,
2446 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2447 parent_span_id: row.get(9)?,
2448 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2449 subtree_cost_usd_e6: row.get(11)?,
2450 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2451 })
2452 })?;
2453 let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2454 let nodes = crate::store::span_tree::build_tree(spans);
2455 *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2456 session_id: session_id.to_string(),
2457 last_event_seq,
2458 nodes: nodes.clone(),
2459 });
2460 Ok(nodes)
2461 }
2462
2463 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2464 let seq = self
2465 .conn
2466 .query_row(
2467 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2468 params![session_id],
2469 |r| r.get::<_, Option<i64>>(0),
2470 )?
2471 .map(|v| v as u64);
2472 Ok(seq)
2473 }
2474
2475 pub fn tool_spans_sync_rows_in_window(
2481 &self,
2482 workspace: &str,
2483 start_ms: u64,
2484 end_ms: u64,
2485 ) -> Result<Vec<ToolSpanSyncRow>> {
2486 let mut stmt = self.conn.prepare(
2487 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
2488 lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2489 FROM (
2490 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2491 ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2492 ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2493 ts.started_at_ms AS sort_ms
2494 FROM tool_spans ts
2495 JOIN sessions s ON s.id = ts.session_id
2496 WHERE s.workspace = ?1
2497 AND ts.started_at_ms IS NOT NULL
2498 AND ts.started_at_ms >= ?2
2499 AND ts.started_at_ms <= ?3
2500 UNION ALL
2501 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2502 ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2503 ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2504 ts.ended_at_ms AS sort_ms
2505 FROM tool_spans ts
2506 JOIN sessions s ON s.id = ts.session_id
2507 WHERE s.workspace = ?1
2508 AND ts.started_at_ms IS NULL
2509 AND ts.ended_at_ms IS NOT NULL
2510 AND ts.ended_at_ms >= ?2
2511 AND ts.ended_at_ms <= ?3
2512 )
2513 ORDER BY sort_ms ASC, span_id ASC",
2514 )?;
2515 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2516 let paths_json: String = row.get(12)?;
2517 Ok(ToolSpanSyncRow {
2518 span_id: row.get(0)?,
2519 session_id: row.get(1)?,
2520 tool: row.get(2)?,
2521 tool_call_id: row.get(3)?,
2522 status: row.get(4)?,
2523 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2524 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2525 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2526 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2527 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2528 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2529 cost_usd_e6: row.get(11)?,
2530 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2531 })
2532 })?;
2533 Ok(rows.filter_map(|row| row.ok()).collect())
2534 }
2535
2536 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2537 let mut stmt = self.conn.prepare(
2538 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2539 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2540 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2541 )?;
2542 let rows = stmt.query_map(params![session_id], |row| {
2543 let paths_json: String = row.get(12)?;
2544 Ok(ToolSpanSyncRow {
2545 span_id: row.get(0)?,
2546 session_id: row.get(1)?,
2547 tool: row.get(2)?,
2548 tool_call_id: row.get(3)?,
2549 status: row.get(4)?,
2550 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2551 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2552 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2553 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2554 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2555 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2556 cost_usd_e6: row.get(11)?,
2557 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2558 })
2559 })?;
2560 Ok(rows.filter_map(|row| row.ok()).collect())
2561 }
2562
2563 pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
2564 self.conn.execute(
2565 "INSERT INTO trace_spans (
2566 span_id, trace_id, parent_span_id, session_id, kind, name, status,
2567 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2568 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2569 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
2570 ON CONFLICT(span_id) DO UPDATE SET
2571 trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
2572 session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
2573 status=excluded.status, started_at_ms=excluded.started_at_ms,
2574 ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
2575 model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
2576 tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
2577 cost_usd_e6=excluded.cost_usd_e6,
2578 context_used_tokens=excluded.context_used_tokens,
2579 context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
2580 params![
2581 span.span_id.as_str(),
2582 span.trace_id.as_str(),
2583 span.parent_span_id.as_deref(),
2584 span.session_id.as_str(),
2585 span.kind.as_str(),
2586 span.name.as_str(),
2587 span.status.as_str(),
2588 span.started_at_ms.map(|v| v as i64),
2589 span.ended_at_ms.map(|v| v as i64),
2590 span.duration_ms.map(|v| v as i64),
2591 span.model.as_deref(),
2592 span.tool.as_deref(),
2593 span.tokens_in.map(|v| v as i64),
2594 span.tokens_out.map(|v| v as i64),
2595 span.reasoning_tokens.map(|v| v as i64),
2596 span.cost_usd_e6,
2597 span.context_used_tokens.map(|v| v as i64),
2598 span.context_max_tokens.map(|v| v as i64),
2599 serde_json::to_string(&span.payload)?,
2600 ],
2601 )?;
2602 Ok(())
2603 }
2604
2605 pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
2606 let mut stmt = self.conn.prepare(
2607 "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
2608 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2609 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2610 FROM trace_spans WHERE session_id = ?1
2611 ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
2612 )?;
2613 let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
2614 Ok(rows.filter_map(|row| row.ok()).collect())
2615 }
2616
2617 pub(crate) fn capture_quality_rows(
2618 &self,
2619 workspace: &str,
2620 start_ms: u64,
2621 end_ms: u64,
2622 ) -> Result<Vec<CaptureQualityRow>> {
2623 let mut stmt = self.conn.prepare(
2624 "SELECT e.source,
2625 e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
2626 e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
2627 e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL
2628 FROM events e JOIN sessions s ON s.id = e.session_id
2629 WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
2630 )?;
2631 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2632 Ok(CaptureQualityRow {
2633 source: row.get(0)?,
2634 has_tokens: row.get::<_, i64>(1)? != 0,
2635 has_latency: row.get::<_, i64>(2)? != 0,
2636 has_context: row.get::<_, i64>(3)? != 0,
2637 })
2638 })?;
2639 Ok(rows.filter_map(|row| row.ok()).collect())
2640 }
2641
2642 pub(crate) fn trace_span_quality_rows(
2643 &self,
2644 workspace: &str,
2645 start_ms: u64,
2646 end_ms: u64,
2647 ) -> Result<Vec<TraceSpanQualityRow>> {
2648 let mut stmt = self.conn.prepare(
2649 "SELECT ts.kind,
2650 ts.parent_span_id IS NOT NULL
2651 AND parent.span_id IS NULL
2652 AND ts.kind NOT IN ('session', 'agent')
2653 FROM trace_spans ts
2654 JOIN sessions s ON s.id = ts.session_id
2655 LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
2656 WHERE s.workspace = ?1
2657 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
2658 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
2659 )?;
2660 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2661 Ok(TraceSpanQualityRow {
2662 kind: row.get(0)?,
2663 is_orphan: row.get::<_, i64>(1)? != 0,
2664 })
2665 })?;
2666 Ok(rows.filter_map(|row| row.ok()).collect())
2667 }
2668
2669 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2670 self.conn.execute(
2671 "INSERT OR REPLACE INTO session_evals
2672 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2673 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2674 rusqlite::params![
2675 eval.id,
2676 eval.session_id,
2677 eval.judge_model,
2678 eval.rubric_id,
2679 eval.score,
2680 eval.rationale,
2681 eval.flagged as i64,
2682 eval.created_at_ms as i64,
2683 ],
2684 )?;
2685 Ok(())
2686 }
2687
2688 pub fn list_evals_in_window(
2689 &self,
2690 start_ms: u64,
2691 end_ms: u64,
2692 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2693 let mut stmt = self.conn.prepare(
2694 "SELECT id, session_id, judge_model, rubric_id, score,
2695 rationale, flagged, created_at_ms
2696 FROM session_evals
2697 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2698 ORDER BY created_at_ms ASC",
2699 )?;
2700 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2701 Ok(crate::eval::types::EvalRow {
2702 id: r.get(0)?,
2703 session_id: r.get(1)?,
2704 judge_model: r.get(2)?,
2705 rubric_id: r.get(3)?,
2706 score: r.get(4)?,
2707 rationale: r.get(5)?,
2708 flagged: r.get::<_, i64>(6)? != 0,
2709 created_at_ms: r.get::<_, i64>(7)? as u64,
2710 })
2711 })?;
2712 rows.collect()
2713 }
2714
2715 pub fn list_evals_for_session(
2716 &self,
2717 session_id: &str,
2718 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2719 let mut stmt = self.conn.prepare(
2720 "SELECT id, session_id, judge_model, rubric_id, score,
2721 rationale, flagged, created_at_ms
2722 FROM session_evals
2723 WHERE session_id = ?1
2724 ORDER BY created_at_ms DESC",
2725 )?;
2726 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2727 Ok(crate::eval::types::EvalRow {
2728 id: r.get(0)?,
2729 session_id: r.get(1)?,
2730 judge_model: r.get(2)?,
2731 rubric_id: r.get(3)?,
2732 score: r.get(4)?,
2733 rationale: r.get(5)?,
2734 flagged: r.get::<_, i64>(6)? != 0,
2735 created_at_ms: r.get::<_, i64>(7)? as u64,
2736 })
2737 })?;
2738 rows.collect()
2739 }
2740
2741 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2742 use crate::feedback::types::FeedbackLabel;
2743 self.conn.execute(
2744 "INSERT OR REPLACE INTO session_feedback
2745 (id, session_id, score, label, note, created_at_ms)
2746 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2747 rusqlite::params![
2748 r.id,
2749 r.session_id,
2750 r.score.as_ref().map(|s| s.0 as i64),
2751 r.label.as_ref().map(FeedbackLabel::to_db_str),
2752 r.note,
2753 r.created_at_ms as i64,
2754 ],
2755 )?;
2756 let payload = serde_json::to_string(r).unwrap_or_default();
2757 self.conn.execute(
2758 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2759 VALUES (?1, 'session_feedback', ?2, 0)",
2760 rusqlite::params![r.session_id, payload],
2761 )?;
2762 Ok(())
2763 }
2764
2765 pub fn list_feedback_in_window(
2766 &self,
2767 start_ms: u64,
2768 end_ms: u64,
2769 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2770 let mut stmt = self.conn.prepare(
2771 "SELECT id, session_id, score, label, note, created_at_ms
2772 FROM session_feedback
2773 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2774 ORDER BY created_at_ms ASC",
2775 )?;
2776 let rows = stmt.query_map(
2777 rusqlite::params![start_ms as i64, end_ms as i64],
2778 feedback_row,
2779 )?;
2780 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2781 }
2782
2783 pub fn feedback_for_sessions(
2784 &self,
2785 ids: &[String],
2786 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2787 if ids.is_empty() {
2788 return Ok(std::collections::HashMap::new());
2789 }
2790 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2791 let sql = format!(
2792 "SELECT id, session_id, score, label, note, created_at_ms
2793 FROM session_feedback WHERE session_id IN ({placeholders})
2794 ORDER BY created_at_ms DESC"
2795 );
2796 let mut stmt = self.conn.prepare(&sql)?;
2797 let params: Vec<&dyn rusqlite::ToSql> =
2798 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2799 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2800 let mut map = std::collections::HashMap::new();
2801 for row in rows {
2802 let r = row?;
2803 map.entry(r.session_id.clone()).or_insert(r);
2804 }
2805 Ok(map)
2806 }
2807
2808 pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2809 self.conn.execute(
2810 "INSERT INTO session_outcomes (
2811 session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2812 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2813 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2814 ON CONFLICT(session_id) DO UPDATE SET
2815 test_passed=excluded.test_passed,
2816 test_failed=excluded.test_failed,
2817 test_skipped=excluded.test_skipped,
2818 build_ok=excluded.build_ok,
2819 lint_errors=excluded.lint_errors,
2820 revert_lines_14d=excluded.revert_lines_14d,
2821 pr_open=excluded.pr_open,
2822 ci_ok=excluded.ci_ok,
2823 measured_at_ms=excluded.measured_at_ms,
2824 measure_error=excluded.measure_error",
2825 params![
2826 row.session_id,
2827 row.test_passed,
2828 row.test_failed,
2829 row.test_skipped,
2830 row.build_ok.map(bool_to_i64),
2831 row.lint_errors,
2832 row.revert_lines_14d,
2833 row.pr_open,
2834 row.ci_ok.map(bool_to_i64),
2835 row.measured_at_ms as i64,
2836 row.measure_error.as_deref(),
2837 ],
2838 )?;
2839 Ok(())
2840 }
2841
2842 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2843 let mut stmt = self.conn.prepare(
2844 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2845 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2846 FROM session_outcomes WHERE session_id = ?1",
2847 )?;
2848 let row = stmt
2849 .query_row(params![session_id], outcome_row)
2850 .optional()?;
2851 Ok(row)
2852 }
2853
2854 pub fn list_session_outcomes_in_window(
2856 &self,
2857 workspace: &str,
2858 start_ms: u64,
2859 end_ms: u64,
2860 ) -> Result<Vec<SessionOutcomeRow>> {
2861 let mut stmt = self.conn.prepare(
2862 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2863 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2864 FROM session_outcomes o
2865 JOIN sessions s ON s.id = o.session_id
2866 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2867 ORDER BY o.measured_at_ms ASC",
2868 )?;
2869 let rows = stmt.query_map(
2870 params![workspace, start_ms as i64, end_ms as i64],
2871 outcome_row,
2872 )?;
2873 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2874 }
2875
2876 pub fn append_session_sample(
2877 &self,
2878 session_id: &str,
2879 ts_ms: u64,
2880 pid: u32,
2881 cpu_percent: Option<f64>,
2882 rss_bytes: Option<u64>,
2883 ) -> Result<()> {
2884 self.conn.execute(
2885 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2886 VALUES (?1, ?2, ?3, ?4, ?5)",
2887 params![
2888 session_id,
2889 ts_ms as i64,
2890 pid as i64,
2891 cpu_percent,
2892 rss_bytes.map(|b| b as i64)
2893 ],
2894 )?;
2895 Ok(())
2896 }
2897
2898 pub fn list_session_sample_aggs_in_window(
2900 &self,
2901 workspace: &str,
2902 start_ms: u64,
2903 end_ms: u64,
2904 ) -> Result<Vec<SessionSampleAgg>> {
2905 let mut stmt = self.conn.prepare(
2906 "SELECT ss.session_id, COUNT(*) AS n,
2907 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2908 FROM session_samples ss
2909 JOIN sessions s ON s.id = ss.session_id
2910 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2911 GROUP BY ss.session_id",
2912 )?;
2913 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2914 let sid: String = r.get(0)?;
2915 let n: i64 = r.get(1)?;
2916 let max_cpu: Option<f64> = r.get(2)?;
2917 let max_rss: Option<i64> = r.get(3)?;
2918 Ok(SessionSampleAgg {
2919 session_id: sid,
2920 sample_count: n as u64,
2921 max_cpu_percent: max_cpu.unwrap_or(0.0),
2922 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2923 })
2924 })?;
2925 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2926 }
2927
2928 pub fn list_sessions_for_eval(
2929 &self,
2930 since_ms: u64,
2931 min_cost_usd: f64,
2932 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2933 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2934 let mut stmt = self.conn.prepare(
2935 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2936 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2937 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2938 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2939 FROM sessions s
2940 WHERE s.started_at_ms >= ?1
2941 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2942 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2943 ORDER BY s.started_at_ms DESC",
2944 )?;
2945 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2946 Ok((
2947 r.get::<_, String>(0)?,
2948 r.get::<_, String>(1)?,
2949 r.get::<_, Option<String>>(2)?,
2950 r.get::<_, String>(3)?,
2951 r.get::<_, i64>(4)?,
2952 r.get::<_, Option<i64>>(5)?,
2953 r.get::<_, String>(6)?,
2954 r.get::<_, String>(7)?,
2955 r.get::<_, Option<String>>(8)?,
2956 r.get::<_, Option<String>>(9)?,
2957 r.get::<_, Option<String>>(10)?,
2958 r.get::<_, Option<i64>>(11)?,
2959 r.get::<_, Option<i64>>(12)?,
2960 r.get::<_, Option<String>>(13)?,
2961 r.get::<_, Option<String>>(14)?,
2962 r.get::<_, Option<String>>(15)?,
2963 r.get::<_, Option<String>>(16)?,
2964 r.get::<_, Option<String>>(17)?,
2965 r.get::<_, Option<String>>(18)?,
2966 r.get::<_, Option<i64>>(19)?,
2967 r.get::<_, Option<i64>>(20)?,
2968 ))
2969 })?;
2970 let mut out = Vec::new();
2971 for row in rows {
2972 let (
2973 id,
2974 agent,
2975 model,
2976 workspace,
2977 started,
2978 ended,
2979 status_str,
2980 trace,
2981 start_commit,
2982 end_commit,
2983 branch,
2984 dirty_start,
2985 dirty_end,
2986 source,
2987 prompt_fingerprint,
2988 parent_session_id,
2989 agent_version,
2990 os,
2991 arch,
2992 repo_file_count,
2993 repo_total_loc,
2994 ) = row?;
2995 out.push(crate::core::event::SessionRecord {
2996 id,
2997 agent,
2998 model,
2999 workspace,
3000 started_at_ms: started as u64,
3001 ended_at_ms: ended.map(|v| v as u64),
3002 status: status_from_str(&status_str),
3003 trace_path: trace,
3004 start_commit,
3005 end_commit,
3006 branch,
3007 dirty_start: dirty_start.map(i64_to_bool),
3008 dirty_end: dirty_end.map(i64_to_bool),
3009 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
3010 prompt_fingerprint,
3011 parent_session_id,
3012 agent_version,
3013 os,
3014 arch,
3015 repo_file_count: repo_file_count.map(|v| v as u32),
3016 repo_total_loc: repo_total_loc.map(|v| v as u64),
3017 });
3018 }
3019 Ok(out)
3020 }
3021
3022 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
3023 self.conn.execute(
3024 "INSERT OR IGNORE INTO prompt_snapshots
3025 (fingerprint, captured_at_ms, files_json, total_bytes)
3026 VALUES (?1, ?2, ?3, ?4)",
3027 params![
3028 snap.fingerprint,
3029 snap.captured_at_ms as i64,
3030 snap.files_json,
3031 snap.total_bytes as i64
3032 ],
3033 )?;
3034 Ok(())
3035 }
3036
3037 pub fn get_prompt_snapshot(
3038 &self,
3039 fingerprint: &str,
3040 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
3041 self.conn
3042 .query_row(
3043 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3044 FROM prompt_snapshots WHERE fingerprint = ?1",
3045 params![fingerprint],
3046 |r| {
3047 Ok(crate::prompt::PromptSnapshot {
3048 fingerprint: r.get(0)?,
3049 captured_at_ms: r.get::<_, i64>(1)? as u64,
3050 files_json: r.get(2)?,
3051 total_bytes: r.get::<_, i64>(3)? as u64,
3052 })
3053 },
3054 )
3055 .optional()
3056 .map_err(Into::into)
3057 }
3058
3059 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
3060 let mut stmt = self.conn.prepare(
3061 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3062 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
3063 )?;
3064 let rows = stmt.query_map([], |r| {
3065 Ok(crate::prompt::PromptSnapshot {
3066 fingerprint: r.get(0)?,
3067 captured_at_ms: r.get::<_, i64>(1)? as u64,
3068 files_json: r.get(2)?,
3069 total_bytes: r.get::<_, i64>(3)? as u64,
3070 })
3071 })?;
3072 Ok(rows.filter_map(|r| r.ok()).collect())
3073 }
3074
3075 pub fn sessions_with_prompt_fingerprint(
3077 &self,
3078 workspace: &str,
3079 start_ms: u64,
3080 end_ms: u64,
3081 ) -> Result<Vec<(String, String)>> {
3082 let mut stmt = self.conn.prepare(
3083 "SELECT id, prompt_fingerprint FROM sessions
3084 WHERE workspace = ?1
3085 AND started_at_ms >= ?2 AND started_at_ms < ?3
3086 AND prompt_fingerprint IS NOT NULL",
3087 )?;
3088 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
3089 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
3090 })?;
3091 Ok(rows.filter_map(|r| r.ok()).collect())
3092 }
3093}
3094
impl Drop for Store {
    /// Commits the search writer, if one has been created, when the store is dropped.
    ///
    /// `drop` cannot report failure, so the `commit` result is deliberately
    /// discarded (best effort).
    fn drop(&mut self) {
        // `get_mut` only needs `&mut self`, so no runtime borrow is taken here.
        // NOTE(review): assumes `search_writer` is a RefCell-like cell around an
        // `Option<writer>`; confirm against the struct definition.
        if let Some(writer) = self.search_writer.get_mut().as_mut() {
            let _ = writer.commit();
        }
    }
}
3102
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0 instead of an error.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
3109
3110fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
3111 let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
3112 let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
3113 Ok(rows.filter_map(|r| r.ok()).collect())
3114}
3115
3116fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
3117 raw.and_then(|s| s.trim().parse::<u64>().ok())
3118 .unwrap_or(DEFAULT_MMAP_MB)
3119 .saturating_mul(1024)
3120 .saturating_mul(1024)
3121 .min(i64::MAX as u64) as i64
3122}
3123
3124fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
3125 let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
3126 conn.execute_batch(&format!(
3127 "
3128 PRAGMA journal_mode=WAL;
3129 PRAGMA busy_timeout=5000;
3130 PRAGMA synchronous=NORMAL;
3131 PRAGMA cache_size=-65536;
3132 PRAGMA mmap_size={mmap_size};
3133 PRAGMA temp_store=MEMORY;
3134 PRAGMA wal_autocheckpoint=1000;
3135 "
3136 ))?;
3137 if mode == StoreOpenMode::ReadOnlyQuery {
3138 conn.execute_batch("PRAGMA query_only=ON;")?;
3139 }
3140 Ok(())
3141}
3142
3143fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
3144 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
3145}
3146
3147fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
3148 let status_str: String = row.get(6)?;
3149 Ok(SessionRecord {
3150 id: row.get(0)?,
3151 agent: row.get(1)?,
3152 model: row.get(2)?,
3153 workspace: row.get(3)?,
3154 started_at_ms: row.get::<_, i64>(4)? as u64,
3155 ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3156 status: status_from_str(&status_str),
3157 trace_path: row.get(7)?,
3158 start_commit: row.get(8)?,
3159 end_commit: row.get(9)?,
3160 branch: row.get(10)?,
3161 dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3162 dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3163 repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
3164 prompt_fingerprint: row.get(14)?,
3165 parent_session_id: row.get(15)?,
3166 agent_version: row.get(16)?,
3167 os: row.get(17)?,
3168 arch: row.get(18)?,
3169 repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3170 repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3171 })
3172}
3173
3174fn ranked_file_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedFile> {
3175 Ok(RankedFile {
3176 path: row.get(0)?,
3177 value: row.get::<_, i64>(1)? as u64,
3178 complexity_total: row.get::<_, i64>(2)? as u32,
3179 churn_30d: row.get::<_, i64>(3)? as u32,
3180 })
3181}
3182
3183fn ranked_tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedTool> {
3184 Ok(RankedTool {
3185 tool: row.get(0)?,
3186 calls: row.get::<_, i64>(1)? as u64,
3187 p50_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
3188 p95_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
3189 total_tokens: row.get::<_, i64>(4)? as u64,
3190 total_reasoning_tokens: row.get::<_, i64>(5)? as u64,
3191 })
3192}
3193
3194fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
3195 let payload_str: String = row.get(12)?;
3196 Ok(Event {
3197 session_id: row.get(0)?,
3198 seq: row.get::<_, i64>(1)? as u64,
3199 ts_ms: row.get::<_, i64>(2)? as u64,
3200 ts_exact: row.get::<_, i64>(3)? != 0,
3201 kind: kind_from_str(&row.get::<_, String>(4)?),
3202 source: source_from_str(&row.get::<_, String>(5)?),
3203 tool: row.get(6)?,
3204 tool_call_id: row.get(7)?,
3205 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
3206 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3207 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
3208 cost_usd_e6: row.get(11)?,
3209 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
3210 stop_reason: row.get(13)?,
3211 latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3212 ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
3213 retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
3214 context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3215 context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
3216 cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3217 cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
3218 system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
3219 })
3220}
3221
3222fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> {
3223 Ok((row.get(22)?, event_row(row)?))
3224}
3225
3226fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
3227 let mut clauses = vec!["workspace = ?".to_string()];
3228 let mut args = vec![Value::Text(workspace.to_string())];
3229 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
3230 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
3231 args.push(Value::Text(format!("{}%", escape_like(prefix))));
3232 }
3233 if let Some(status) = &filter.status {
3234 clauses.push("status = ?".to_string());
3235 args.push(Value::Text(format!("{status:?}")));
3236 }
3237 if let Some(since_ms) = filter.since_ms {
3238 clauses.push("started_at_ms >= ?".to_string());
3239 args.push(Value::Integer(since_ms as i64));
3240 }
3241 (format!("WHERE {}", clauses.join(" AND ")), args)
3242}
3243
/// Lowercases `raw` and escapes the `LIKE` metacharacters `%` and `_` (and the
/// escape character `\` itself) with a leading backslash.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
3250
3251fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
3252 let cost: i64 = conn.query_row(
3253 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
3254 params![workspace], |r| r.get(0),
3255 )?;
3256 let with_cost: i64 = conn.query_row(
3257 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
3258 params![workspace], |r| r.get(0),
3259 )?;
3260 Ok((cost, with_cost as u64))
3261}
3262
3263fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
3264 let build_raw: Option<i64> = r.get(4)?;
3265 let ci_raw: Option<i64> = r.get(8)?;
3266 Ok(SessionOutcomeRow {
3267 session_id: r.get(0)?,
3268 test_passed: r.get(1)?,
3269 test_failed: r.get(2)?,
3270 test_skipped: r.get(3)?,
3271 build_ok: build_raw.map(|v| v != 0),
3272 lint_errors: r.get(5)?,
3273 revert_lines_14d: r.get(6)?,
3274 pr_open: r.get(7)?,
3275 ci_ok: ci_raw.map(|v| v != 0),
3276 measured_at_ms: r.get::<_, i64>(9)? as u64,
3277 measure_error: r.get(10)?,
3278 })
3279}
3280
3281fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
3282 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
3283 let score = r
3284 .get::<_, Option<i64>>(2)?
3285 .and_then(|v| FeedbackScore::new(v as u8));
3286 let label = r
3287 .get::<_, Option<String>>(3)?
3288 .and_then(|s| FeedbackLabel::from_str_opt(&s));
3289 Ok(FeedbackRecord {
3290 id: r.get(0)?,
3291 session_id: r.get(1)?,
3292 score,
3293 label,
3294 note: r.get(4)?,
3295 created_at_ms: r.get::<_, i64>(5)? as u64,
3296 })
3297}
3298
/// Three-letter weekday name for `day_idx` whole days since the Unix epoch.
///
/// 1970-01-01 (day 0) was a Thursday, hence the +4 offset into the Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce modulo 7 before adding the offset so inputs near `u64::MAX` cannot overflow.
    NAMES[((day_idx % 7 + 4) % 7) as usize]
}
3302
3303fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
3304 let week_ago = now.saturating_sub(7 * 86_400_000);
3305 let mut stmt = conn
3306 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
3307 let days: Vec<u64> = stmt
3308 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
3309 .filter_map(|r| r.ok())
3310 .map(|v| v as u64 / 86_400_000)
3311 .collect();
3312 let today = now / 86_400_000;
3313 Ok((0u64..7)
3314 .map(|i| {
3315 let d = today.saturating_sub(6 - i);
3316 (
3317 day_label(d).to_string(),
3318 days.iter().filter(|&&x| x == d).count() as u64,
3319 )
3320 })
3321 .collect())
3322}
3323
3324fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3325 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3326 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3327 s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3328 s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3329 COUNT(e.id) FROM sessions s \
3330 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3331 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3332 let mut stmt = conn.prepare(sql)?;
3333 let out: Vec<(SessionRecord, u64)> = stmt
3334 .query_map(params![workspace], |r| {
3335 let st: String = r.get(6)?;
3336 Ok((
3337 SessionRecord {
3338 id: r.get(0)?,
3339 agent: r.get(1)?,
3340 model: r.get(2)?,
3341 workspace: r.get(3)?,
3342 started_at_ms: r.get::<_, i64>(4)? as u64,
3343 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3344 status: status_from_str(&st),
3345 trace_path: r.get(7)?,
3346 start_commit: r.get(8)?,
3347 end_commit: r.get(9)?,
3348 branch: r.get(10)?,
3349 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3350 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3351 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3352 prompt_fingerprint: r.get(14)?,
3353 parent_session_id: r.get(15)?,
3354 agent_version: r.get(16)?,
3355 os: r.get(17)?,
3356 arch: r.get(18)?,
3357 repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3358 repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3359 },
3360 r.get::<_, i64>(21)? as u64,
3361 ))
3362 })?
3363 .filter_map(|r| r.ok())
3364 .collect();
3365 Ok(out)
3366}
3367
3368fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3369 let mut stmt = conn.prepare(
3370 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3371 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3372 )?;
3373 let out: Vec<(String, u64)> = stmt
3374 .query_map(params![workspace], |r| {
3375 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3376 })?
3377 .filter_map(|r| r.ok())
3378 .collect();
3379 Ok(out)
3380}
3381
3382fn trace_span_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TraceSpanRecord> {
3383 let kind: String = row.get(4)?;
3384 let payload: String = row.get(18)?;
3385 Ok(TraceSpanRecord {
3386 span_id: row.get(0)?,
3387 trace_id: row.get(1)?,
3388 parent_span_id: row.get(2)?,
3389 session_id: row.get(3)?,
3390 kind: TraceSpanKind::parse(&kind),
3391 name: row.get(5)?,
3392 status: row.get(6)?,
3393 started_at_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
3394 ended_at_ms: row.get::<_, Option<i64>>(8)?.map(|v| v as u64),
3395 duration_ms: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3396 model: row.get(10)?,
3397 tool: row.get(11)?,
3398 tokens_in: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
3399 tokens_out: row.get::<_, Option<i64>>(13)?.map(|v| v as u32),
3400 reasoning_tokens: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3401 cost_usd_e6: row.get(15)?,
3402 context_used_tokens: row.get::<_, Option<i64>>(16)?.map(|v| v as u32),
3403 context_max_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3404 payload: serde_json::from_str(&payload).unwrap_or_default(),
3405 })
3406}
3407
3408fn status_from_str(s: &str) -> SessionStatus {
3409 match s {
3410 "Running" => SessionStatus::Running,
3411 "Waiting" => SessionStatus::Waiting,
3412 "Idle" => SessionStatus::Idle,
3413 _ => SessionStatus::Done,
3414 }
3415}
3416
/// True when the `KAIZEN_PROJECTOR` environment variable is exactly `"legacy"`.
///
/// Unset or non-Unicode values count as not legacy.
fn projector_legacy_mode() -> bool {
    matches!(std::env::var("KAIZEN_PROJECTOR").as_deref(), Ok("legacy"))
}
3420
3421fn is_stop_event(e: &Event) -> bool {
3422 if !matches!(e.kind, EventKind::Hook) {
3423 return false;
3424 }
3425 e.payload
3426 .get("event")
3427 .and_then(|v| v.as_str())
3428 .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3429 == Some("Stop")
3430}
3431
3432fn kind_from_str(s: &str) -> EventKind {
3433 match s {
3434 "ToolCall" => EventKind::ToolCall,
3435 "ToolResult" => EventKind::ToolResult,
3436 "Message" => EventKind::Message,
3437 "Error" => EventKind::Error,
3438 "Cost" => EventKind::Cost,
3439 "Hook" => EventKind::Hook,
3440 "Lifecycle" => EventKind::Lifecycle,
3441 _ => EventKind::Hook,
3442 }
3443}
3444
3445fn source_from_str(s: &str) -> EventSource {
3446 match s {
3447 "Tail" => EventSource::Tail,
3448 "Hook" => EventSource::Hook,
3449 _ => EventSource::Proxy,
3450 }
3451}
3452
3453fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3454 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3455 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3456 ensure_column(conn, "sessions", "branch", "TEXT")?;
3457 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3458 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3459 ensure_column(
3460 conn,
3461 "sessions",
3462 "repo_binding_source",
3463 "TEXT NOT NULL DEFAULT ''",
3464 )?;
3465 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3466 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3467 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3468 ensure_column(conn, "events", "stop_reason", "TEXT")?;
3469 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3470 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3471 ensure_column(conn, "events", "retry_count", "INTEGER")?;
3472 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3473 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3474 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3475 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3476 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3477 ensure_column(
3478 conn,
3479 "sync_outbox",
3480 "kind",
3481 "TEXT NOT NULL DEFAULT 'events'",
3482 )?;
3483 ensure_column(
3484 conn,
3485 "experiments",
3486 "state",
3487 "TEXT NOT NULL DEFAULT 'Draft'",
3488 )?;
3489 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3490 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3491 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3492 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3493 ensure_column(conn, "sessions", "os", "TEXT")?;
3494 ensure_column(conn, "sessions", "arch", "TEXT")?;
3495 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3496 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3497 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3498 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3499 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3500 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3501 conn.execute_batch(
3502 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3503 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3504 )?;
3505 Ok(())
3506}
3507
3508fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3509 if has_column(conn, table, column)? {
3510 return Ok(());
3511 }
3512 let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}");
3513 match conn.execute(&sql, []) {
3514 Ok(_) => Ok(()),
3515 Err(err) if column_was_added_by_race(conn, table, column, &err)? => Ok(()),
3516 Err(err) => Err(err.into()),
3517 }
3518}
3519
3520fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3521 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3522 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3523 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3524}
3525
3526fn column_was_added_by_race(
3527 conn: &Connection,
3528 table: &str,
3529 column: &str,
3530 err: &rusqlite::Error,
3531) -> Result<bool> {
3532 if !is_duplicate_column_error(err) {
3533 return Ok(false);
3534 }
3535 has_column(conn, table, column)
3536}
3537
3538fn is_duplicate_column_error(err: &rusqlite::Error) -> bool {
3539 matches!(
3540 err,
3541 rusqlite::Error::SqliteFailure(_, Some(message))
3542 if message.contains("duplicate column name")
3543 )
3544}
3545
/// Encodes a boolean as SQLite's conventional integer flag (1 / 0).
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3549
/// Decodes an integer flag; any non-zero value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
3553
/// Treats an empty string as "absent".
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3557
3558#[cfg(test)]
3559mod tests {
3560 use super::*;
3561 use serde_json::json;
3562 use std::collections::HashSet;
3563 use tempfile::TempDir;
3564
3565 fn make_session(id: &str) -> SessionRecord {
3566 SessionRecord {
3567 id: id.to_string(),
3568 agent: "cursor".to_string(),
3569 model: None,
3570 workspace: "/ws".to_string(),
3571 started_at_ms: 1000,
3572 ended_at_ms: None,
3573 status: SessionStatus::Done,
3574 trace_path: "/trace".to_string(),
3575 start_commit: None,
3576 end_commit: None,
3577 branch: None,
3578 dirty_start: None,
3579 dirty_end: None,
3580 repo_binding_source: None,
3581 prompt_fingerprint: None,
3582 parent_session_id: None,
3583 agent_version: None,
3584 os: None,
3585 arch: None,
3586 repo_file_count: None,
3587 repo_total_loc: None,
3588 }
3589 }
3590
3591 fn make_event(session_id: &str, seq: u64) -> Event {
3592 Event {
3593 session_id: session_id.to_string(),
3594 seq,
3595 ts_ms: 1000 + seq * 100,
3596 ts_exact: false,
3597 kind: EventKind::ToolCall,
3598 source: EventSource::Tail,
3599 tool: Some("read_file".to_string()),
3600 tool_call_id: Some(format!("call_{seq}")),
3601 tokens_in: None,
3602 tokens_out: None,
3603 reasoning_tokens: None,
3604 cost_usd_e6: None,
3605 stop_reason: None,
3606 latency_ms: None,
3607 ttft_ms: None,
3608 retry_count: None,
3609 context_used_tokens: None,
3610 context_max_tokens: None,
3611 cache_creation_tokens: None,
3612 cache_read_tokens: None,
3613 system_prompt_tokens: None,
3614 payload: json!({}),
3615 }
3616 }
3617
3618 #[test]
3619 fn open_and_wal_mode() {
3620 let dir = TempDir::new().unwrap();
3621 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3622 let mode: String = store
3623 .conn
3624 .query_row("PRAGMA journal_mode", [], |r| r.get(0))
3625 .unwrap();
3626 assert_eq!(mode, "wal");
3627 }
3628
3629 #[test]
3630 fn open_applies_phase0_pragmas() {
3631 let dir = TempDir::new().unwrap();
3632 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3633 let synchronous: i64 = store
3634 .conn
3635 .query_row("PRAGMA synchronous", [], |r| r.get(0))
3636 .unwrap();
3637 let cache_size: i64 = store
3638 .conn
3639 .query_row("PRAGMA cache_size", [], |r| r.get(0))
3640 .unwrap();
3641 let temp_store: i64 = store
3642 .conn
3643 .query_row("PRAGMA temp_store", [], |r| r.get(0))
3644 .unwrap();
3645 let wal_autocheckpoint: i64 = store
3646 .conn
3647 .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
3648 .unwrap();
3649 assert_eq!(synchronous, 1);
3650 assert_eq!(cache_size, -65_536);
3651 assert_eq!(temp_store, 2);
3652 assert_eq!(wal_autocheckpoint, 1_000);
3653 assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
3654 }
3655
3656 #[test]
3657 fn ensure_column_tolerates_duplicate_from_race() {
3658 let conn = Connection::open_in_memory().unwrap();
3659 conn.execute_batch("CREATE TABLE sessions (id TEXT, start_commit TEXT)")
3660 .unwrap();
3661 let err = conn
3662 .execute("ALTER TABLE sessions ADD COLUMN start_commit TEXT", [])
3663 .unwrap_err();
3664 assert!(column_was_added_by_race(&conn, "sessions", "start_commit", &err).unwrap());
3665 }
3666
3667 #[test]
3668 fn read_only_open_sets_query_only() {
3669 let dir = TempDir::new().unwrap();
3670 let db = dir.path().join("kaizen.db");
3671 Store::open(&db).unwrap();
3672 let store = Store::open_read_only(&db).unwrap();
3673 let query_only: i64 = store
3674 .conn
3675 .query_row("PRAGMA query_only", [], |r| r.get(0))
3676 .unwrap();
3677 assert_eq!(query_only, 1);
3678 }
3679
3680 #[test]
3681 fn phase0_indexes_exist() {
3682 let dir = TempDir::new().unwrap();
3683 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3684 for name in [
3685 "tool_spans_session_idx",
3686 "tool_spans_started_idx",
3687 "session_samples_ts_idx",
3688 "events_ts_idx",
3689 "feedback_session_idx",
3690 ] {
3691 let found: i64 = store
3692 .conn
3693 .query_row(
3694 "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
3695 params![name],
3696 |r| r.get(0),
3697 )
3698 .unwrap();
3699 assert_eq!(found, 1, "{name}");
3700 }
3701 }
3702
3703 #[test]
3704 fn upsert_and_get_session() {
3705 let dir = TempDir::new().unwrap();
3706 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3707 let s = make_session("s1");
3708 store.upsert_session(&s).unwrap();
3709
3710 let got = store.get_session("s1").unwrap().unwrap();
3711 assert_eq!(got.id, "s1");
3712 assert_eq!(got.status, SessionStatus::Done);
3713 }
3714
3715 #[test]
3716 fn append_and_list_events_round_trip() {
3717 let dir = TempDir::new().unwrap();
3718 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3719 let s = make_session("s2");
3720 store.upsert_session(&s).unwrap();
3721 store.append_event(&make_event("s2", 0)).unwrap();
3722 store.append_event(&make_event("s2", 1)).unwrap();
3723
3724 let sessions = store.list_sessions("/ws").unwrap();
3725 assert_eq!(sessions.len(), 1);
3726 assert_eq!(sessions[0].id, "s2");
3727 }
3728
3729 #[test]
3730 fn list_sessions_page_orders_and_counts() {
3731 let dir = TempDir::new().unwrap();
3732 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3733 let mut a = make_session("a");
3734 a.started_at_ms = 2_000;
3735 let mut b = make_session("b");
3736 b.started_at_ms = 2_000;
3737 let mut c = make_session("c");
3738 c.started_at_ms = 1_000;
3739 store.upsert_session(&c).unwrap();
3740 store.upsert_session(&b).unwrap();
3741 store.upsert_session(&a).unwrap();
3742
3743 let page = store
3744 .list_sessions_page("/ws", 0, 2, SessionFilter::default())
3745 .unwrap();
3746 assert_eq!(page.total, 3);
3747 assert_eq!(page.next_offset, Some(2));
3748 assert_eq!(
3749 page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3750 vec!["a", "b"]
3751 );
3752
3753 let all = store.list_sessions("/ws").unwrap();
3754 assert_eq!(
3755 all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3756 vec!["a", "b", "c"]
3757 );
3758 }
3759
3760 #[test]
3761 fn list_sessions_page_filters_in_sql_shape() {
3762 let dir = TempDir::new().unwrap();
3763 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3764 let mut cursor = make_session("cursor");
3765 cursor.agent = "Cursor".into();
3766 cursor.started_at_ms = 2_000;
3767 cursor.status = SessionStatus::Running;
3768 let mut claude = make_session("claude");
3769 claude.agent = "claude".into();
3770 claude.started_at_ms = 3_000;
3771 store.upsert_session(&cursor).unwrap();
3772 store.upsert_session(&claude).unwrap();
3773
3774 let page = store
3775 .list_sessions_page(
3776 "/ws",
3777 0,
3778 10,
3779 SessionFilter {
3780 agent_prefix: Some("cur".into()),
3781 status: Some(SessionStatus::Running),
3782 since_ms: Some(1_500),
3783 },
3784 )
3785 .unwrap();
3786 assert_eq!(page.total, 1);
3787 assert_eq!(page.rows[0].id, "cursor");
3788 }
3789
3790 #[test]
3791 fn incremental_session_helpers_find_new_rows_and_statuses() {
3792 let dir = TempDir::new().unwrap();
3793 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3794 let mut old = make_session("old");
3795 old.started_at_ms = 1_000;
3796 let mut new = make_session("new");
3797 new.started_at_ms = 2_000;
3798 new.status = SessionStatus::Running;
3799 store.upsert_session(&old).unwrap();
3800 store.upsert_session(&new).unwrap();
3801
3802 let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
3803 assert_eq!(rows.len(), 1);
3804 assert_eq!(rows[0].id, "new");
3805
3806 store
3807 .update_session_status("new", SessionStatus::Done)
3808 .unwrap();
3809 let statuses = store.session_statuses(&["new".to_string()]).unwrap();
3810 assert_eq!(statuses.len(), 1);
3811 assert_eq!(statuses[0].status, SessionStatus::Done);
3812 }
3813
3814 #[test]
3815 fn summary_stats_empty() {
3816 let dir = TempDir::new().unwrap();
3817 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3818 let stats = store.summary_stats("/ws").unwrap();
3819 assert_eq!(stats.session_count, 0);
3820 assert_eq!(stats.total_cost_usd_e6, 0);
3821 }
3822
3823 #[test]
3824 fn summary_stats_counts_sessions() {
3825 let dir = TempDir::new().unwrap();
3826 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3827 store.upsert_session(&make_session("a")).unwrap();
3828 store.upsert_session(&make_session("b")).unwrap();
3829 let stats = store.summary_stats("/ws").unwrap();
3830 assert_eq!(stats.session_count, 2);
3831 assert_eq!(stats.by_agent.len(), 1);
3832 assert_eq!(stats.by_agent[0].0, "cursor");
3833 assert_eq!(stats.by_agent[0].1, 2);
3834 }
3835
3836 #[test]
3837 fn list_events_for_session_round_trip() {
3838 let dir = TempDir::new().unwrap();
3839 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3840 store.upsert_session(&make_session("s4")).unwrap();
3841 store.append_event(&make_event("s4", 0)).unwrap();
3842 store.append_event(&make_event("s4", 1)).unwrap();
3843 let events = store.list_events_for_session("s4").unwrap();
3844 assert_eq!(events.len(), 2);
3845 assert_eq!(events[0].seq, 0);
3846 assert_eq!(events[1].seq, 1);
3847 }
3848
#[test]
fn list_events_page_uses_inclusive_seq_cursor() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("paged")).unwrap();
    for seq in 0..5 {
        store.append_event(&make_event("paged", seq)).unwrap();
    }

    // The cursor names the first seq to return. Resuming at one past the
    // last seq already seen yields the next page with no overlap.
    let page_one = store.list_events_page("paged", 0, 2).unwrap();
    let page_one_seqs: Vec<_> = page_one.iter().map(|e| e.seq).collect();
    assert_eq!(page_one_seqs, vec![0, 1]);

    let cursor = page_one[1].seq + 1;
    let page_two = store.list_events_page("paged", cursor, 2).unwrap();
    let page_two_seqs: Vec<_> = page_two.iter().map(|e| e.seq).collect();
    assert_eq!(page_two_seqs, vec![2, 3]);
}
3864
#[test]
fn append_event_dedup() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("s5")).unwrap();

    // Appending the same (session, seq) event twice must leave a single row.
    for _ in 0..2 {
        store.append_event(&make_event("s5", 0)).unwrap();
    }

    let stored = store.list_events_for_session("s5").unwrap();
    assert_eq!(stored.len(), 1);
}
3876
#[test]
fn span_tree_cache_hits_empty_and_invalidates_on_append() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    // Reports whether the store currently holds a memoized span tree.
    let cache_filled = || store.span_tree_cache.borrow().is_some();

    // Even an empty result for an unknown session is memoized.
    assert!(store.session_span_tree("missing").unwrap().is_empty());
    assert!(cache_filled());

    store.upsert_session(&make_session("tree")).unwrap();
    let tool_call = make_event("tree", 0);
    store.append_event(&tool_call).unwrap();
    // Every append drops the memoized tree.
    assert!(!cache_filled());

    // Before a matching result arrives, the tree is still empty but cached.
    assert!(store.session_span_tree("tree").unwrap().is_empty());
    assert!(cache_filled());

    let mut tool_result = make_event("tree", 1);
    tool_result.kind = EventKind::ToolResult;
    tool_result.tool_call_id = tool_call.tool_call_id.clone();
    store.append_event(&tool_result).unwrap();
    assert!(!cache_filled());

    // With the call paired to its result, the tree has one entry.
    assert_eq!(store.session_span_tree("tree").unwrap().len(), 1);
    assert!(cache_filled());

    store.append_event(&make_event("tree", 2)).unwrap();
    assert!(!cache_filled());
}
3901
#[test]
fn tool_spans_in_window_uses_started_then_ended_fallback() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("spans")).unwrap();

    // (span_id, started_at_ms, ended_at_ms). The queried window is [100, 300].
    // started_at_ms decides membership when present; ended_at_ms is only the
    // fallback, so "started_wins" (started 500, ended 200) is excluded.
    let fixtures: [(&str, Option<i64>, Option<i64>); 5] = [
        ("started", Some(200), None),
        ("fallback", None, Some(250)),
        ("outside", Some(400), None),
        ("too_old", None, Some(50)),
        ("started_wins", Some(500), Some(200)),
    ];
    for (span_id, started, ended) in fixtures {
        store
            .conn
            .execute(
                "INSERT INTO tool_spans
        (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
        VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                params![span_id, started, ended],
            )
            .unwrap();
    }

    let in_window = store.tool_spans_in_window("/ws", 100, 300).unwrap();
    let span_ids: Vec<String> = in_window.into_iter().map(|row| row.span_id).collect();
    assert_eq!(span_ids, vec!["fallback".to_string(), "started".to_string()]);
}
3928
#[test]
fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("s1")).unwrap();

    // (span_id, started_at_ms, ended_at_ms). The queried window is [100, 300].
    let fixtures: [(&str, Option<i64>, Option<i64>); 4] = [
        ("inside_started", Some(150), None),
        ("inside_ended_only", None, Some(220)),
        ("after_window", Some(400), None),
        ("before_window", None, Some(50)),
    ];
    for (span_id, started, ended) in fixtures {
        store
            .conn
            .execute(
                "INSERT INTO tool_spans
        (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
        VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
                params![span_id, started, ended],
            )
            .unwrap();
    }

    let synced = store
        .tool_spans_sync_rows_in_window("/ws", 100, 300)
        .unwrap();
    let span_ids: Vec<&str> = synced.iter().map(|row| row.span_id.as_str()).collect();
    assert_eq!(span_ids, vec!["inside_started", "inside_ended_only"]);
    // Each sync row carries the owning session id.
    for row in &synced {
        assert!(row.session_id == "s1");
    }
}
3957
#[test]
fn upsert_idempotent() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();

    // A second upsert for the same id succeeds and replaces the stored status.
    let mut session = make_session("s3");
    store.upsert_session(&session).unwrap();
    session.status = SessionStatus::Running;
    store.upsert_session(&session).unwrap();

    let stored = store.get_session("s3").unwrap().unwrap();
    assert_eq!(stored.status, SessionStatus::Running);
}
3970
#[test]
fn append_event_indexes_path_from_payload() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("sx")).unwrap();

    // A nested `input.path` in the payload shows up in the files-touched index.
    let event = {
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        ev
    };
    store.append_event(&event).unwrap();

    let touched = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
    let expected = vec![("sx".to_string(), "src/lib.rs".to_string())];
    assert_eq!(touched, expected);
}
3982
#[test]
fn update_session_status_changes_status() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    let id = "s6";
    store.upsert_session(&make_session(id)).unwrap();

    store
        .update_session_status(id, SessionStatus::Running)
        .unwrap();

    // The new status is visible on a fresh read.
    let reloaded = store.get_session(id).unwrap().unwrap();
    assert_eq!(reloaded.status, SessionStatus::Running);
}
3994
#[test]
fn prune_sessions_removes_old_rows_and_keeps_recent() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();

    // One session starts before the 5_000 ms cutoff and one far after it.
    // Only the stale session gets an event.
    let mut stale = make_session("old");
    stale.started_at_ms = 1_000;
    let mut fresh = make_session("new");
    fresh.started_at_ms = 9_000_000_000_000;
    for session in [&stale, &fresh] {
        store.upsert_session(session).unwrap();
    }
    store.append_event(&make_event("old", 0)).unwrap();

    let removed = store.prune_sessions_started_before(5_000).unwrap();
    let expected = PruneStats {
        sessions_removed: 1,
        events_removed: 1,
    };
    assert_eq!(removed, expected);

    assert!(store.get_session("old").unwrap().is_none());
    assert!(store.get_session("new").unwrap().is_some());
    let remaining = store.list_sessions("/ws").unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].id, "new");
}
4021
#[test]
fn append_event_indexes_rules_from_payload() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    let session_id = "sr";
    store.upsert_session(&make_session(session_id)).unwrap();

    // A payload path under `.cursor/rules/` is recorded under the slug `my-rule`.
    let mut event = make_event(session_id, 0);
    event.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
    store.append_event(&event).unwrap();

    let used = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
    assert_eq!(used, vec![(session_id.to_string(), "my-rule".to_string())]);
}
4033
#[test]
fn guidance_report_counts_skill_and_rule_sessions() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("sx")).unwrap();

    // A single costed event whose free text mentions one skill file and one rule file.
    let mut event = make_event("sx", 0);
    event.payload =
        json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
    event.cost_usd_e6 = Some(500_000);
    store.append_event(&event).unwrap();

    // The slugs that exist on disk.
    let mut skills = HashSet::new();
    skills.insert("tdd".into());
    let mut rules = HashSet::new();
    rules.insert("style".into());

    let report = store
        .guidance_report("/ws", 0, 10_000, &skills, &rules)
        .unwrap();
    assert_eq!(report.sessions_in_window, 1);

    // Each referenced guidance item is attributed to the one session and
    // flagged as present on disk.
    for (slug, kind) in [("tdd", GuidanceKind::Skill), ("style", GuidanceKind::Rule)] {
        let row = report
            .rows
            .iter()
            .find(|r| r.id == slug && r.kind == kind)
            .unwrap();
        assert_eq!(row.sessions, 1);
        assert!(row.on_disk);
    }
}
4069
#[test]
fn prune_sessions_removes_rules_used_rows() {
    let dir = TempDir::new().unwrap();
    let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
    // Counts every row currently in the `rules_used` index table.
    let rules_used_rows = || -> i64 {
        store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap()
    };

    // A session that starts before the 5_000 ms cutoff and references one rule file.
    let mut old = make_session("old_r");
    old.started_at_ms = 1_000;
    store.upsert_session(&old).unwrap();
    let mut ev = make_event("old_r", 0);
    ev.payload = json!({"path": ".cursor/rules/x.mdc"});
    store.append_event(&ev).unwrap();

    // Guard against a vacuous pass: the rule reference must actually have been
    // indexed before pruning, otherwise an empty table afterwards proves nothing.
    assert!(rules_used_rows() > 0);

    let stats = store.prune_sessions_started_before(5_000).unwrap();
    assert_eq!(stats.sessions_removed, 1);
    // Pruning the session also drops its derived `rules_used` rows.
    assert_eq!(rules_used_rows(), 0);
}
4088}