1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{
7 FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
8};
9use crate::store::event_index::index_event_derived;
10use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
11use crate::store::tool_span_index::{
12 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
13};
14use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
15use crate::sync::context::SyncIngestContext;
16use crate::sync::outbound::outbound_event_from_row;
17use crate::sync::redact::redact_payload;
18use crate::sync::smart::enqueue_tool_spans_for_session;
19use anyhow::{Context, Result};
20use rusqlite::types::Value;
21use rusqlite::{
22 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
23};
24use std::cell::RefCell;
25use std::collections::{HashMap, HashSet};
26use std::path::{Path, PathBuf};
27
/// Timestamps below this value (1e12 ms ≈ 2001-09-09 UTC) are treated as
/// synthetic rather than real wall-clock times.
/// NOTE(review): meaning inferred from the name — the consuming code is not
/// in this chunk; confirm at use sites.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default SQLite mmap size in MiB.
/// NOTE(review): presumably consumed by `apply_pragmas` — confirm, it is
/// outside this chunk.
const DEFAULT_MMAP_MB: u64 = 256;
/// Shared projection for loading full session rows; column order must match
/// the row-mapping code that consumes it.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspot" files for one repo snapshot.
///
/// Score = (number of sessions whose tool spans touched the file inside the
/// time window) * complexity_total. LEFT JOINs keep files with zero touches
/// (COUNT(s.id) is then 0, so their score is 0). Spans missing a start time
/// fall back to their end time for the window test.
///
/// Params: ?1 = snapshot_id, ?2 = workspace, ?3 = window start ms,
/// ?4 = window end ms (inclusive).
const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
        AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
            OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool usage ranking within a workspace and time window.
///
/// Pipeline: `scoped` selects spans in the window (NULL tool → 'unknown';
/// spans without a start time are windowed on their end time); `agg` counts
/// calls and sums tokens per tool; `lat`/`pct` compute p50/p95 lead time via
/// the nearest-rank method over ROW_NUMBER (rank = floor((n-1)*p/100) + 1).
/// The final LEFT JOIN keeps tools whose spans all lack lead_time_ms
/// (percentiles come back NULL).
///
/// Params: ?1 = workspace, ?2 = window start ms, ?3 = window end ms.
const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
        SELECT COALESCE(ts.tool, 'unknown') AS tool,
               ts.lead_time_ms,
               COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                   + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
               COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
        FROM tool_spans ts
        JOIN sessions s ON s.id = ts.session_id
        WHERE s.workspace = ?1
            AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
                OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
        SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
               SUM(reasoning_tokens) AS total_reasoning_tokens
        FROM scoped GROUP BY tool
    ),
    lat AS (
        SELECT tool, lead_time_ms,
               ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
               COUNT(*) OVER (PARTITION BY tool) AS n
        FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
        SELECT tool,
               MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
               MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
        FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
84
/// Idempotent schema statements, executed in order on every read-write open
/// via `execute_batch` (so one entry may contain several ';'-separated
/// statements). Every entry uses IF NOT EXISTS / OR IGNORE, so reruns are
/// safe. Columns referenced elsewhere but absent from these CREATEs (e.g.
/// `events.ts_exact`, `sync_outbox.kind`) are presumably backfilled by
/// `ensure_schema_columns` — that function is outside this chunk; confirm.
const MIGRATIONS: &[&str] = &[
    // --- core session/event tables ---------------------------------------
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    // --- per-session derived facts ----------------------------------------
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Legacy SQLite outbox; the redb outbox is preferred at runtime.
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // (session_id, seq) uniqueness backs the upsert in append_event_with_sync.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Generic key/value state bag (sync timestamps, dirty flags, ...).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- tool spans (projector output) -------------------------------------
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // --- repo analysis snapshots -------------------------------------------
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // --- session listing indexes -------------------------------------------
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // --- pulled team data (identifiers stored as hashes) --------------------
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Singleton row so later UPDATEs always have a target.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- evaluation / feedback / outcomes ----------------------------------
    // Multi-statement entry: table plus its two indexes in one batch.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    // --- process resource samples + query-path indexes ----------------------
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS events_tool_ts_session_seq_idx ON events(tool, ts_ms DESC, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_span_paths_path_idx ON tool_span_paths(path, span_id)",
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
333
/// Aggregate statistics for the insights/dashboard view.
#[derive(Clone)]
pub struct InsightsStats {
    pub total_sessions: u64,
    pub running_sessions: u64,
    pub total_events: u64,
    /// (day, session count) pairs.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions paired with a per-session count
    /// (presumably event count — producer not visible in this chunk).
    pub recent: Vec<(SessionRecord, u64)>,
    /// (tool name, usage count) pairs.
    pub top_tools: Vec<(String, u64)>,
    /// Total cost in fixed-point micro-USD (1e-6 USD units).
    pub total_cost_usd_e6: i64,
    pub sessions_with_cost: u64,
}

/// Point-in-time view of sync health (outbox backlog + last outcome).
pub struct SyncStatusSnapshot {
    pub pending_outbox: u64,
    pub last_success_ms: Option<u64>,
    pub last_error: Option<String>,
    pub consecutive_failures: u32,
}

/// Serializable rollup of session counts, cost, and tool usage.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    pub session_count: u64,
    /// Total cost in fixed-point micro-USD.
    pub total_cost_usd_e6: i64,
    pub by_agent: Vec<(String, u64)>,
    pub by_model: Vec<(String, u64)>,
    pub top_tools: Vec<(String, u64)>,
}
367
/// Kind of guidance artifact being evaluated (serialized lowercase).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    Skill,
    Rule,
}

/// Per-skill/rule performance row inside a [`GuidanceReport`].
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    pub kind: GuidanceKind,
    pub id: String,
    /// Sessions in the window that used this guidance item.
    pub sessions: u64,
    pub sessions_pct: f64,
    /// Summed cost in micro-USD for those sessions.
    pub total_cost_usd_e6: i64,
    pub avg_cost_per_session_usd: Option<f64>,
    /// Delta vs the workspace-wide average cost per session.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the guidance file still exists on disk.
    pub on_disk: bool,
}

/// Workspace-scoped report over a [window_start_ms, window_end_ms] window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    pub workspace: String,
    pub window_start_ms: u64,
    pub window_end_ms: u64,
    pub sessions_in_window: u64,
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    pub rows: Vec<GuidancePerfRow>,
}
399
/// Counters returned by prune operations.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    pub sessions_removed: u64,
    pub events_removed: u64,
}

/// One row of `session_outcomes`; all measurement fields are optional
/// because measurement can partially fail (see `measure_error`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    pub measured_at_ms: u64,
    /// Set when the outcome measurement itself errored.
    pub measure_error: Option<String>,
}

/// Aggregated resource usage over a session's `session_samples` rows.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    pub sample_count: u64,
    pub max_cpu_percent: f64,
    pub max_rss_bytes: u64,
}
431
/// `sync_state` key: last time agent traces were scanned.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key: last automatic prune run.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key: set when the search index may be stale and needs a
/// rebuild (written by `append_search_event` on indexing failure).
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";

/// Tool-span row shape handed to the sync/outbound layer.
/// Mirrors the `tool_spans` table columns plus the expanded path list.
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    /// Cost in fixed-point micro-USD.
    pub cost_usd_e6: Option<i64>,
    /// File paths touched by this span.
    pub paths: Vec<String>,
}
452
/// How to open the SQLite database: read-write (runs migrations, warms the
/// projector) or read-only (no mutation, safe alongside a live writer).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    ReadWrite,
    ReadOnlyQuery,
}

/// Minimal session status projection (id + status + end time).
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    pub id: String,
    pub status: SessionStatus,
    pub ended_at_ms: Option<u64>,
}

/// Optional filters for session listing; `None` fields match everything.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    pub agent_prefix: Option<String>,
    pub status: Option<SessionStatus>,
    pub since_ms: Option<u64>,
}

/// One page of session results plus pagination cursor.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    pub rows: Vec<SessionRecord>,
    /// Total matching rows across all pages.
    pub total: usize,
    /// Offset of the next page, or `None` when this is the last page.
    pub next_offset: Option<usize>,
}
479
/// Single-entry cache of a computed span tree, keyed by session and the
/// last event seq it was built from (used to detect staleness).
#[derive(Clone)]
struct SpanTreeCacheEntry {
    session_id: String,
    last_event_seq: Option<u64>,
    nodes: Vec<crate::store::span_tree::SpanNode>,
}

/// SQLite-backed event/session store plus lazily-opened sidecar resources.
/// Not `Sync`: interior mutability is `RefCell`, single-threaded use only.
pub struct Store {
    conn: Connection,
    /// Directory containing the DB file; sidecars (hot log, outbox,
    /// search index) live under it.
    root: PathBuf,
    /// Append-only hot event log, opened lazily on first append.
    hot_log: RefCell<Option<HotLog>>,
    /// Search index writer, opened lazily on first indexed event.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// See [`SpanTreeCacheEntry`]; invalidated whenever spans change.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// Incremental tool-span projector state for live sessions.
    projector: RefCell<Projector>,
}
495
496impl Store {
    /// Borrow the underlying SQLite connection (crate-internal escape hatch
    /// for modules that run their own queries against this store).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
500
    /// Open the store read-write (runs migrations; see `open_with_mode`).
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }

    /// Open the store read-only; no migrations or writes are performed.
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }

    /// Alias of `open_read_only`, kept for query-side callers.
    pub fn open_query(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
512
513 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
514 if let Some(parent) = path.parent() {
515 std::fs::create_dir_all(parent)?;
516 }
517 let conn = match mode {
518 StoreOpenMode::ReadWrite => Connection::open(path),
519 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
520 path,
521 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
522 ),
523 }
524 .with_context(|| format!("open db: {}", path.display()))?;
525 apply_pragmas(&conn, mode)?;
526 if mode == StoreOpenMode::ReadWrite {
527 for sql in MIGRATIONS {
528 conn.execute_batch(sql)?;
529 }
530 ensure_schema_columns(&conn)?;
531 }
532 let store = Self {
533 conn,
534 root: path
535 .parent()
536 .unwrap_or_else(|| Path::new("."))
537 .to_path_buf(),
538 hot_log: RefCell::new(None),
539 search_writer: RefCell::new(None),
540 span_tree_cache: RefCell::new(None),
541 projector: RefCell::new(Projector::default()),
542 };
543 if mode == StoreOpenMode::ReadWrite {
544 store.warm_projector()?;
545 }
546 Ok(store)
547 }
548
    /// Drop the cached span tree; called whenever span-affecting rows change.
    fn invalidate_span_tree_cache(&self) {
        *self.span_tree_cache.borrow_mut() = None;
    }
552
553 fn warm_projector(&self) -> Result<()> {
554 let ids = self.running_session_ids()?;
555 let mut projector = self.projector.borrow_mut();
556 for id in ids {
557 for event in self.list_events_for_session(&id)? {
558 let _ = projector.apply(&event);
559 }
560 }
561 Ok(())
562 }
563
    /// Insert or fully overwrite the `sessions` row for `s`, and mirror the
    /// repo-binding columns into `session_repo_binding`.
    ///
    /// Last-write-wins: every column comes from the incoming record, so
    /// callers must pass a complete `SessionRecord`, not a partial update.
    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch,
                repo_file_count, repo_total_loc
            )
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
                    ?16, ?17, ?18, ?19, ?20, ?21)
            ON CONFLICT(id) DO UPDATE SET
                agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
                started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
                status=excluded.status, trace_path=excluded.trace_path,
                start_commit=excluded.start_commit, end_commit=excluded.end_commit,
                branch=excluded.branch, dirty_start=excluded.dirty_start,
                dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
                prompt_fingerprint=excluded.prompt_fingerprint,
                parent_session_id=excluded.parent_session_id,
                agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
                repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
            params![
                s.id,
                s.agent,
                s.model,
                s.workspace,
                s.started_at_ms as i64,
                s.ended_at_ms.map(|v| v as i64),
                // Status is stored as its Debug rendering (e.g. "Running").
                format!("{:?}", s.status),
                s.trace_path,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                s.repo_binding_source.clone().unwrap_or_default(),
                s.prompt_fingerprint.as_deref(),
                s.parent_session_id.as_deref(),
                s.agent_version.as_deref(),
                s.os.as_deref(),
                s.arch.as_deref(),
                s.repo_file_count.map(|v| v as i64),
                s.repo_total_loc.map(|v| v as i64),
            ],
        )?;
        // Denormalized copy of the repo-binding columns, keyed by session id.
        self.conn.execute(
            "INSERT INTO session_repo_binding (
                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
            ON CONFLICT(session_id) DO UPDATE SET
                start_commit=excluded.start_commit,
                end_commit=excluded.end_commit,
                branch=excluded.branch,
                dirty_start=excluded.dirty_start,
                dirty_end=excluded.dirty_end,
                repo_binding_source=excluded.repo_binding_source",
            params![
                s.id,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                s.repo_binding_source.clone().unwrap_or_default(),
            ],
        )?;
        Ok(())
    }
632
    /// Create a minimal 'Running' session row if none exists yet, so events
    /// arriving before full session metadata have a parent to attach to.
    /// No-op (INSERT OR IGNORE) when the session is already present.
    pub fn ensure_session_stub(
        &self,
        id: &str,
        agent: &str,
        workspace: &str,
        started_at_ms: u64,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT OR IGNORE INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
            ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
            params![id, agent, workspace, started_at_ms as i64],
        )?;
        Ok(())
    }
653
654 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
656 let n: i64 = self.conn.query_row(
657 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
658 [session_id],
659 |r| r.get(0),
660 )?;
661 Ok(n as u64)
662 }
663
    /// Append `e` without enqueueing anything for team sync.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
667
    /// Upsert event `e`, update all derived state (hot log, tool spans via
    /// the projector, search index), and — when `ctx` is provided and sync is
    /// fully configured — enqueue a redacted copy for team sync.
    ///
    /// Idempotent on (session_id, seq): a byte-identical replay updates the
    /// same row; when SQLite reports zero changed rows the whole derived
    /// pipeline is skipped.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Captured before the insert so we can detect out-of-order arrivals
        // (seq <= previously seen max), which force a full session replay.
        let last_before = if projector_legacy_mode() {
            None
        } else {
            self.last_event_seq_for_session(&e.session_id)?
        };
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
            )
            ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // kind/source stored as their Debug renderings.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // Unchanged row (exact duplicate): skip every derived side effect.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        self.append_hot_event(e)?;
        if projector_legacy_mode() {
            // Legacy path: re-derive indexes and spans from scratch.
            index_event_derived(&self.conn, e)?;
            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
            self.invalidate_span_tree_cache();
        } else if last_before.is_some_and(|last| e.seq <= last) {
            // Out-of-order or rewritten event: incremental state is invalid,
            // replay the whole session through the projector.
            self.replay_projector_session(&e.session_id)?;
        } else {
            // Normal incremental path.
            let deltas = self.projector.borrow_mut().apply(e);
            self.apply_projector_events(&deltas)?;
            // Close spans whose owner never responded within the TTL.
            let expired = self
                .projector
                .borrow_mut()
                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
            self.apply_projector_events(&expired)?;
            if is_stop_event(e) {
                let flushed = self
                    .projector
                    .borrow_mut()
                    .flush_session(&e.session_id, e.ts_ms);
                self.apply_projector_events(&flushed)?;
            }
            self.invalidate_span_tree_cache();
        }
        // Best-effort; failures are logged inside and never fail the append.
        self.append_search_event(e);
        // --- team sync enqueue (optional) --------------------------------
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Probabilistic sampling: drop this event with prob 1 - sample_rate.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Hash identifiers with the team salt and strip sensitive payload
        // content before the row ever leaves the machine.
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.outbox()?.append(&e.session_id, "events", &row)?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
791
792 fn append_hot_event(&self, e: &Event) -> Result<()> {
793 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
794 return Ok(());
795 }
796 let mut slot = self.hot_log.borrow_mut();
797 if slot.is_none() {
798 *slot = Some(HotLog::open(&self.root)?);
799 }
800 if let Some(log) = slot.as_mut() {
801 log.append(e)?;
802 }
803 Ok(())
804 }
805
806 fn append_search_event(&self, e: &Event) {
807 if let Err(err) = self.try_append_search_event(e) {
808 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
809 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
810 }
811 }
812
813 fn try_append_search_event(&self, e: &Event) -> Result<()> {
814 let Some(session) = self.get_session(&e.session_id)? else {
815 return Ok(());
816 };
817 let workspace = PathBuf::from(&session.workspace);
818 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
819 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
820 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
821 return Ok(());
822 };
823 let mut slot = self.search_writer.borrow_mut();
824 if slot.is_none() {
825 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
826 }
827 slot.as_mut().expect("writer").add(&doc)
828 }
829
830 pub fn flush_search(&self) -> Result<()> {
831 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
832 writer.commit()?;
833 }
834 Ok(())
835 }
836
    /// Open the redb-backed sync outbox living next to the SQLite file.
    fn outbox(&self) -> Result<Outbox> {
        Outbox::open(&self.root)
    }
840
    /// Force-close any open projector spans for `session_id` at `now_ms` and
    /// persist the resulting rows. In legacy mode the span table is rebuilt
    /// from scratch instead of flushing incremental state.
    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
        if projector_legacy_mode() {
            rebuild_tool_spans_for_session(&self.conn, session_id)?;
            self.invalidate_span_tree_cache();
            return Ok(());
        }
        let deltas = self
            .projector
            .borrow_mut()
            .flush_session(session_id, now_ms);
        // Only invalidate the cached span tree when a row actually changed.
        if self.apply_projector_events(&deltas)? {
            self.invalidate_span_tree_cache();
        }
        Ok(())
    }
856
    /// Rebuild all projector-derived rows for one session from its event
    /// history: clear persisted spans, reset in-memory state, re-apply every
    /// event in order, and — if the session is already Done — flush any
    /// still-open spans using the last event's timestamp as "now".
    fn replay_projector_session(&self, session_id: &str) -> Result<()> {
        clear_session_spans(&self.conn, session_id)?;
        self.projector.borrow_mut().reset_session(session_id);
        let events = self.list_events_for_session(session_id)?;
        let mut changed = false;
        for event in &events {
            let deltas = self.projector.borrow_mut().apply(event);
            changed |= self.apply_projector_events(&deltas)?;
        }
        if self
            .get_session(session_id)?
            .is_some_and(|session| session.status == SessionStatus::Done)
        {
            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
            let deltas = self
                .projector
                .borrow_mut()
                .flush_session(session_id, now_ms);
            changed |= self.apply_projector_events(&deltas)?;
        }
        if changed {
            self.invalidate_span_tree_cache();
        }
        Ok(())
    }
882
    /// Persist a batch of projector deltas; returns true when at least one
    /// row was written (callers use this to decide on cache invalidation).
    /// All inserts are idempotent (upserts / INSERT OR IGNORE).
    fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
        let mut changed = false;
        for delta in deltas {
            match delta {
                // Finished span: persist and emit a debug trace of the sample.
                ProjectorEvent::SpanClosed(span, sample) => {
                    upsert_tool_span_record(&self.conn, span)?;
                    tracing::debug!(
                        session_id = %sample.session_id,
                        span_id = %sample.span_id,
                        tool = ?sample.tool,
                        lead_time_ms = ?sample.lead_time_ms,
                        tokens_in = ?sample.tokens_in,
                        tokens_out = ?sample.tokens_out,
                        reasoning_tokens = ?sample.reasoning_tokens,
                        cost_usd_e6 = ?sample.cost_usd_e6,
                        paths = ?sample.paths,
                        "tool span closed"
                    );
                    changed = true;
                }
                // Late update to an already-persisted span.
                ProjectorEvent::SpanPatched(span) => {
                    upsert_tool_span_record(&self.conn, span)?;
                    changed = true;
                }
                ProjectorEvent::FileTouched { session, path } => {
                    self.conn.execute(
                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
                        params![session, path],
                    )?;
                    changed = true;
                }
                ProjectorEvent::SkillUsed { session, skill } => {
                    self.conn.execute(
                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
                        params![session, skill],
                    )?;
                    changed = true;
                }
                ProjectorEvent::RuleUsed { session, rule } => {
                    self.conn.execute(
                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
                        params![session, rule],
                    )?;
                    changed = true;
                }
            }
        }
        Ok(changed)
    }
932
933 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
934 let rows = self.outbox()?.list_pending(limit)?;
935 if !rows.is_empty() {
936 return Ok(rows);
937 }
938 let mut stmt = self.conn.prepare(
939 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
940 )?;
941 let rows = stmt.query_map(params![limit as i64], |row| {
942 Ok((
943 row.get::<_, i64>(0)?,
944 row.get::<_, String>(1)?,
945 row.get::<_, String>(2)?,
946 ))
947 })?;
948 let mut out = Vec::new();
949 for r in rows {
950 out.push(r?);
951 }
952 Ok(out)
953 }
954
955 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
956 self.outbox()?.delete_ids(ids)?;
957 for id in ids {
958 self.conn
959 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
960 }
961 Ok(())
962 }
963
964 pub fn replace_outbox_rows(
965 &self,
966 owner_id: &str,
967 kind: &str,
968 payloads: &[String],
969 ) -> Result<()> {
970 self.outbox()?.replace(owner_id, kind, payloads)?;
971 self.conn.execute(
972 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
973 params![owner_id, kind],
974 )?;
975 for payload in payloads {
976 self.conn.execute(
977 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
978 params![owner_id, kind, payload],
979 )?;
980 }
981 Ok(())
982 }
983
984 pub fn outbox_pending_count(&self) -> Result<u64> {
985 let redb = self.outbox()?.pending_count()?;
986 if redb > 0 {
987 return Ok(redb);
988 }
989 let c: i64 =
990 self.conn
991 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
992 r.get(0)
993 })?;
994 Ok(c as u64)
995 }
996
    /// Records a successful sync pass in the `sync_state` key/value table.
    ///
    /// Stamps `last_success_ms` with the current wall clock, resets the
    /// `consecutive_failures` counter to zero, and removes any stored
    /// `last_error` message.
    pub fn set_sync_state_ok(&self) -> Result<()> {
        let now = now_ms().to_string();
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![now],
        )?;
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            [],
        )?;
        self.conn
            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
        Ok(())
    }
1013
    /// Records a failed sync pass: stores `msg` under `last_error` and
    /// increments the `consecutive_failures` counter.
    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
        // Read the current failure count; a missing row or an unparsable
        // value both degrade to 0 rather than erroring.
        let prev: i64 = self
            .conn
            .query_row(
                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
                [],
                |r| {
                    let s: String = r.get(0)?;
                    Ok(s.parse::<i64>().unwrap_or(0))
                },
            )
            .optional()?
            .unwrap_or(0);
        // saturating_add: the counter can never wrap.
        let next = prev.saturating_add(1);
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![msg],
        )?;
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![next.to_string()],
        )?;
        Ok(())
    }
1040
1041 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
1042 let pending_outbox = self.outbox_pending_count()?;
1043 let last_success_ms = self
1044 .conn
1045 .query_row(
1046 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
1047 [],
1048 |r| r.get::<_, String>(0),
1049 )
1050 .optional()?
1051 .and_then(|s| s.parse().ok());
1052 let last_error = self
1053 .conn
1054 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1055 r.get::<_, String>(0)
1056 })
1057 .optional()?;
1058 let consecutive_failures = self
1059 .conn
1060 .query_row(
1061 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1062 [],
1063 |r| r.get::<_, String>(0),
1064 )
1065 .optional()?
1066 .and_then(|s| s.parse().ok())
1067 .unwrap_or(0);
1068 Ok(SyncStatusSnapshot {
1069 pending_outbox,
1070 last_success_ms,
1071 last_error,
1072 consecutive_failures,
1073 })
1074 }
1075
1076 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1077 let row: Option<String> = self
1078 .conn
1079 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1080 r.get::<_, String>(0)
1081 })
1082 .optional()?;
1083 Ok(row.and_then(|s| s.parse().ok()))
1084 }
1085
    /// Upserts `key` in the `sync_state` table, storing `v` as its decimal
    /// string form (all `sync_state` values are strings).
    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![key, v.to_string()],
        )?;
        Ok(())
    }
1094
    /// Deletes every session that started before `cutoff_ms`, together with
    /// all dependent rows (tool spans and their paths, events, touched files,
    /// skills, rules, outbox entries, repo bindings, experiment tags,
    /// outcomes, samples) inside a single transaction.
    ///
    /// After commit, the pruned sessions are also evicted from the search
    /// index (best effort) and the span-tree cache is invalidated. Returns
    /// how many sessions and events were removed.
    pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
        // `new_unchecked` because we only hold `&self`, not `&mut Connection`.
        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
        // Snapshot the doomed ids up front: search-index pruning happens
        // after commit, when the rows are already gone from SQLite.
        let old_ids = old_session_ids(&tx, cutoff_ms)?;
        let sessions_to_remove: i64 = tx.query_row(
            "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
            params![cutoff_ms],
            |r| r.get(0),
        )?;
        let events_to_remove: i64 = tx.query_row(
            "SELECT COUNT(*) FROM events WHERE session_id IN \
             (SELECT id FROM sessions WHERE started_at_ms < ?1)",
            params![cutoff_ms],
            |r| r.get(0),
        )?;

        // Every child table is cleared via the same correlated subquery.
        // Order matters only for tool_span_paths, which must be deleted
        // before the tool_spans rows it references.
        let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
        tx.execute(
            &format!(
                "DELETE FROM tool_span_paths WHERE span_id IN \
                 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
            ),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
            params![cutoff_ms],
        )?;
        tx.execute(
            "DELETE FROM sessions WHERE started_at_ms < ?1",
            params![cutoff_ms],
        )?;
        tx.commit()?;
        // Flush any buffered search writer before touching the index.
        if let Some(mut writer) = self.search_writer.borrow_mut().take() {
            let _ = writer.commit();
        }
        // Best effort: if index pruning fails, mark the search index dirty so
        // a later rebuild reconciles it instead of failing the prune.
        if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
            tracing::warn!("search prune skipped: {err:#}");
            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
        }
        self.invalidate_span_tree_cache();
        Ok(PruneStats {
            sessions_removed: sessions_to_remove as u64,
            events_removed: events_to_remove as u64,
        })
    }
1177
1178 pub fn vacuum(&self) -> Result<()> {
1180 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1181 Ok(())
1182 }
1183
1184 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1185 Ok(self
1186 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1187 .rows)
1188 }
1189
1190 pub fn list_sessions_page(
1191 &self,
1192 workspace: &str,
1193 offset: usize,
1194 limit: usize,
1195 filter: SessionFilter,
1196 ) -> Result<SessionPage> {
1197 let (where_sql, args) = session_filter_sql(workspace, &filter);
1198 let total = self.query_session_page_count(&where_sql, &args)?;
1199 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1200 let next = offset.saturating_add(rows.len());
1201 Ok(SessionPage {
1202 rows,
1203 total,
1204 next_offset: (next < total).then_some(next),
1205 })
1206 }
1207
1208 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1209 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1210 let total: i64 = self
1211 .conn
1212 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1213 Ok(total as usize)
1214 }
1215
1216 fn query_session_page_rows(
1217 &self,
1218 where_sql: &str,
1219 args: &[Value],
1220 offset: usize,
1221 limit: usize,
1222 ) -> Result<Vec<SessionRecord>> {
1223 let sql = format!(
1224 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1225 );
1226 let mut values = args.to_vec();
1227 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1228 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1229 let mut stmt = self.conn.prepare(&sql)?;
1230 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1231 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1232 }
1233
    /// Sessions in `workspace` whose `started_at_ms` is strictly greater
    /// than `after_started_at_ms`, newest first (ties broken by id).
    pub fn list_sessions_started_after(
        &self,
        workspace: &str,
        after_started_at_ms: u64,
    ) -> Result<Vec<SessionRecord>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch,
                repo_file_count, repo_total_loc
             FROM sessions
             WHERE workspace = ?1 AND started_at_ms > ?2
             ORDER BY started_at_ms DESC, id ASC",
        )?;
        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
1251
1252 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1253 if ids.is_empty() {
1254 return Ok(Vec::new());
1255 }
1256 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1257 let sql =
1258 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1259 let mut stmt = self.conn.prepare(&sql)?;
1260 let params: Vec<&dyn rusqlite::ToSql> =
1261 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1262 let rows = stmt.query_map(params.as_slice(), |r| {
1263 let status: String = r.get(1)?;
1264 Ok(SessionStatusRow {
1265 id: r.get(0)?,
1266 status: status_from_str(&status),
1267 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1268 })
1269 })?;
1270 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1271 }
1272
1273 fn running_session_ids(&self) -> Result<Vec<String>> {
1274 let mut stmt = self
1275 .conn
1276 .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1277 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1278 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1279 }
1280
    /// Aggregates headline statistics for `workspace`: session count, total
    /// cost, sessions by agent and model, and the 10 most-used tools.
    ///
    /// NOTE(review): the grouped queries drop individual row errors via
    /// `filter_map(|r| r.ok())` rather than propagating them.
    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
        let session_count: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
            params![workspace],
            |r| r.get(0),
        )?;

        // Cost is stored per event in micro-dollars (cost_usd_e6).
        let total_cost: i64 = self.conn.query_row(
            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
            params![workspace],
            |r| r.get(0),
        )?;

        let mut stmt = self.conn.prepare(
            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
        )?;
        let by_agent: Vec<(String, u64)> = stmt
            .query_map(params![workspace], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
            })?
            .filter_map(|r| r.ok())
            .collect();

        // NULL models are reported under the label 'unknown'.
        let mut stmt = self.conn.prepare(
            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
        )?;
        let by_model: Vec<(String, u64)> = stmt
            .query_map(params![workspace], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
            })?
            .filter_map(|r| r.ok())
            .collect();

        let mut stmt = self.conn.prepare(
            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
             WHERE s.workspace = ?1 AND tool IS NOT NULL
             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
        )?;
        let top_tools: Vec<(String, u64)> = stmt
            .query_map(params![workspace], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
            })?
            .filter_map(|r| r.ok())
            .collect();

        Ok(SummaryStats {
            session_count: session_count as u64,
            total_cost_usd_e6: total_cost,
            by_agent,
            by_model,
            top_tools,
        })
    }
1335
    /// Loads every event for `session_id` in ascending sequence order:
    /// a single unbounded page starting at seq 0.
    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
        self.list_events_page(session_id, 0, i64::MAX as usize)
    }
1339
    /// Fetches a single event by `(session_id, seq)`; `Ok(None)` when no
    /// such row exists.
    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
        // Column order must match what `event_row` expects.
        let mut stmt = self.conn.prepare(
            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             FROM events WHERE session_id = ?1 AND seq = ?2",
        )?;
        stmt.query_row(params![session_id, seq as i64], event_row)
            .optional()
            .map_err(Into::into)
    }
1353
    /// Finds up to `limit` events that invoked `tool`, newest first,
    /// optionally filtered by a minimum timestamp and by agent.
    ///
    /// The workspace clause deliberately falls back to *all* workspaces when
    /// `workspace` matches no session at all, so lookups still work from an
    /// unregistered directory.
    pub fn search_tool_events(
        &self,
        workspace: &str,
        tool: &str,
        since_ms: Option<u64>,
        agent: Option<&str>,
        limit: usize,
    ) -> Result<Vec<(String, Event)>> {
        let mut stmt = self.conn.prepare(
            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
                e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
                e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
                e.context_used_tokens, e.context_max_tokens,
                e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
                s.agent
             FROM events e JOIN sessions s ON s.id = e.session_id
             WHERE e.tool = ?2
               AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
               AND (?3 IS NULL OR e.ts_ms >= ?3)
               AND (?4 IS NULL OR s.agent = ?4)
             ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
             LIMIT ?5",
        )?;
        let since = since_ms.map(|v| v as i64);
        let rows = stmt.query_map(
            params![workspace, tool, since, agent, limit as i64],
            search_tool_event_row,
        )?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
1384
1385 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1386 let mut out = Vec::new();
1387 for session in self.list_sessions(workspace)? {
1388 for event in self.list_events_for_session(&session.id)? {
1389 out.push((session.clone(), event));
1390 }
1391 }
1392 out.sort_by(|a, b| {
1393 (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1394 });
1395 Ok(out)
1396 }
1397
    /// Returns up to `limit` events for `session_id` in ascending sequence
    /// order.
    ///
    /// NOTE(review): despite its name, `after_seq` is an *inclusive* lower
    /// bound (`seq >= ?2`) — `list_events_for_session` relies on this by
    /// passing 0 to fetch everything.
    pub fn list_events_page(
        &self,
        session_id: &str,
        after_seq: u64,
        limit: usize,
    ) -> Result<Vec<Event>> {
        // Column order must match what `event_row` expects.
        let mut stmt = self.conn.prepare(
            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
             FROM events
             WHERE session_id = ?1 AND seq >= ?2
             ORDER BY seq ASC LIMIT ?3",
        )?;
        let rows = stmt.query_map(
            params![
                session_id,
                after_seq as i64,
                limit.min(i64::MAX as usize) as i64
            ],
            event_row,
        )?;
        let mut events = Vec::new();
        for row in rows {
            events.push(row?);
        }
        Ok(events)
    }
1428
1429 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1431 self.conn.execute(
1432 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1433 params![format!("{:?}", status), id],
1434 )?;
1435 Ok(())
1436 }
1437
1438 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1440 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1441 Ok(InsightsStats {
1442 total_sessions: count_q(
1443 &self.conn,
1444 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1445 workspace,
1446 )?,
1447 running_sessions: count_q(
1448 &self.conn,
1449 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1450 workspace,
1451 )?,
1452 total_events: count_q(
1453 &self.conn,
1454 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1455 workspace,
1456 )?,
1457 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1458 recent: recent_sessions_3(&self.conn, workspace)?,
1459 top_tools: top_tools_5(&self.conn, workspace)?,
1460 total_cost_usd_e6,
1461 sessions_with_cost,
1462 })
1463 }
1464
    /// Loads `(session, event)` pairs overlapping the retro window
    /// `[start_ms, end_ms]`.
    ///
    /// An event qualifies either because its own `ts_ms` lies inside the
    /// window, or because its timestamp is synthetic (below
    /// `SYNTHETIC_TS_CEILING_MS`, i.e. not a plausible epoch-ms wall clock)
    /// and the owning session *started* inside the window. Output is
    /// ordered by event time, then session id, then sequence.
    pub fn retro_events_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(SessionRecord, Event)>> {
        // Columns 0-12 and 34-42 feed the Event; columns 13-33 feed the
        // SessionRecord. The mapping closure below indexes by position.
        let mut stmt = self.conn.prepare(
            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
                e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
                s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
                s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
                s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
                s.repo_file_count, s.repo_total_loc,
                e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
                e.context_used_tokens, e.context_max_tokens,
                e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
             FROM events e
             JOIN sessions s ON s.id = e.session_id
             WHERE s.workspace = ?1
               AND (
                   (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                   OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
               )
             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
        )?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                SYNTHETIC_TS_CEILING_MS,
            ],
            |row| {
                let payload_str: String = row.get(12)?;
                let status_str: String = row.get(19)?;
                Ok((
                    SessionRecord {
                        id: row.get(13)?,
                        agent: row.get(14)?,
                        model: row.get(15)?,
                        workspace: row.get(16)?,
                        started_at_ms: row.get::<_, i64>(17)? as u64,
                        ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
                        status: status_from_str(&status_str),
                        trace_path: row.get(20)?,
                        start_commit: row.get(21)?,
                        end_commit: row.get(22)?,
                        branch: row.get(23)?,
                        dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
                        dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
                        repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
                        prompt_fingerprint: row.get(27)?,
                        parent_session_id: row.get(28)?,
                        agent_version: row.get(29)?,
                        os: row.get(30)?,
                        arch: row.get(31)?,
                        repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
                        repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
                    },
                    Event {
                        session_id: row.get(0)?,
                        seq: row.get::<_, i64>(1)? as u64,
                        ts_ms: row.get::<_, i64>(2)? as u64,
                        ts_exact: row.get::<_, i64>(3)? != 0,
                        kind: kind_from_str(&row.get::<_, String>(4)?),
                        source: source_from_str(&row.get::<_, String>(5)?),
                        tool: row.get(6)?,
                        tool_call_id: row.get(7)?,
                        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
                        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
                        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
                        cost_usd_e6: row.get(11)?,
                        // A malformed stored payload degrades to JSON null
                        // rather than failing the whole query.
                        payload: serde_json::from_str(&payload_str)
                            .unwrap_or(serde_json::Value::Null),
                        stop_reason: row.get(34)?,
                        latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
                        ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
                        retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
                        context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
                        context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
                        cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
                        cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
                        system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
                    },
                ))
            },
        )?;

        let mut out = Vec::new();
        for r in rows {
            out.push(r?);
        }
        Ok(out)
    }
1560
    /// Computes one scalar `value` per session in the window for the chosen
    /// experiment metric.
    ///
    /// Every SQL variant selects the full 21-column session set
    /// (`session_cols`) followed by the metric value, which is read back
    /// from column index 21. The `window` predicate matches
    /// `retro_events_in_window`: an event counts when its timestamp falls
    /// in `[start_ms, end_ms]`, or when it is synthetic (below
    /// `SYNTHETIC_TS_CEILING_MS`) and the session started in the window.
    pub fn experiment_metric_values_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
        metric: crate::experiment::types::Metric,
    ) -> Result<Vec<(SessionRecord, f64)>> {
        use crate::experiment::types::Metric;
        let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
            s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
            s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
            s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
        let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
            OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
        let sql = match metric {
            // Total tokens (in + out + reasoning) per session.
            Metric::TokensPerSession => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // cost_usd_e6 is micro-dollars; divide to plain USD.
            Metric::CostPerSession => format!(
                "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Binary: 1.0 when the session had no Error events, else 0.0.
            Metric::SuccessRate => format!(
                "SELECT {session_cols},
                    CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Only finished sessions (ended_at_ms set) with at least one
            // in-window event contribute a duration.
            Metric::DurationMinutes => format!(
                "SELECT {session_cols},
                    (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
                 FROM sessions s
                 WHERE s.workspace = ?1
                   AND s.ended_at_ms IS NOT NULL
                   AND EXISTS (
                       SELECT 1 FROM events e
                       WHERE e.session_id = s.id
                         AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                              OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
                   )"
            ),
            Metric::FilesPerSession => format!(
                "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
                 FROM sessions s
                 JOIN events e ON e.session_id = s.id
                 LEFT JOIN files_touched ft ON ft.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id"
            ),
            // Errors per user message, capped at 1.0 via the scalar MIN.
            Metric::SuccessRateByPrompt => format!(
                "SELECT {session_cols},
                    1.0 - (MIN(
                        SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
                        SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
                    ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            Metric::CostByPrompt => format!(
                "SELECT {session_cols},
                    SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
                    SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN events e ON e.session_id = s.id
                 WHERE {window}
                 GROUP BY s.id
                 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
            ),
            // Counts consecutive repeats of the same tool via LAG().
            Metric::ToolLoops => format!(
                "WITH calls AS (
                     SELECT e.session_id, e.tool,
                            LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
                     FROM events e JOIN sessions s ON s.id = e.session_id
                     WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
                 )
                 SELECT {session_cols},
                        SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
                 FROM sessions s JOIN calls ON calls.session_id = s.id
                 GROUP BY s.id"
            ),
        };
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt.query_map(
            params![
                workspace,
                start_ms as i64,
                end_ms as i64,
                SYNTHETIC_TS_CEILING_MS,
            ],
            // Column 21 is the metric value appended after the session columns.
            |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
        )?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
1662
    /// Distinct `(session_id, path)` pairs for files touched by sessions
    /// active in the window (same in-window-or-synthetic-timestamp rule as
    /// `retro_events_in_window`). Row errors are silently dropped.
    pub fn files_touched_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(String, String)>> {
        let mut stmt = self.conn.prepare(
            "SELECT DISTINCT ft.session_id, ft.path
             FROM files_touched ft
             JOIN sessions s ON s.id = ft.session_id
             WHERE s.workspace = ?1
               AND EXISTS (
                   SELECT 1 FROM events e
                   JOIN sessions ss ON ss.id = e.session_id
                   WHERE e.session_id = ft.session_id
                     AND (
                         (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
                     )
               )
             ORDER BY ft.session_id, ft.path",
        )?;
        let out: Vec<(String, String)> = stmt
            .query_map(
                params![
                    workspace,
                    start_ms as i64,
                    end_ms as i64,
                    SYNTHETIC_TS_CEILING_MS,
                ],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )?
            .filter_map(|r| r.ok())
            .collect();
        Ok(out)
    }
1700
    /// Distinct skill slugs used in `workspace` since `since_ms`
    /// (open-ended window), filtered to syntactically valid slugs.
    ///
    /// Sessions qualify via an in-window event, or a synthetic-timestamp
    /// event on a session that started after `since_ms`.
    pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
        let mut stmt = self.conn.prepare(
            "SELECT DISTINCT su.skill
             FROM skills_used su
             JOIN sessions s ON s.id = su.session_id
             WHERE s.workspace = ?1
               AND EXISTS (
                   SELECT 1 FROM events e
                   JOIN sessions ss ON ss.id = e.session_id
                   WHERE e.session_id = su.session_id
                     AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
               )
             ORDER BY su.skill",
        )?;
        let out: Vec<String> = stmt
            .query_map(
                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
                |r| r.get::<_, String>(0),
            )?
            .filter_map(|r| r.ok())
            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
            .collect();
        Ok(out)
    }
1727
    /// Distinct `(session_id, skill)` pairs for sessions active inside
    /// `[start_ms, end_ms]`, filtered to syntactically valid slugs.
    pub fn skills_used_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(String, String)>> {
        let mut stmt = self.conn.prepare(
            "SELECT DISTINCT su.session_id, su.skill
             FROM skills_used su
             JOIN sessions s ON s.id = su.session_id
             WHERE s.workspace = ?1
               AND EXISTS (
                   SELECT 1 FROM events e
                   JOIN sessions ss ON ss.id = e.session_id
                   WHERE e.session_id = su.session_id
                     AND (
                         (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
                     )
               )
             ORDER BY su.session_id, su.skill",
        )?;
        let out: Vec<(String, String)> = stmt
            .query_map(
                params![
                    workspace,
                    start_ms as i64,
                    end_ms as i64,
                    SYNTHETIC_TS_CEILING_MS,
                ],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )?
            .filter_map(|r| r.ok())
            .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
            .collect();
        Ok(out)
    }
1766
    /// Distinct rule slugs used in `workspace` since `since_ms`
    /// (open-ended window); mirror of `skills_used_since` for rules.
    pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
        let mut stmt = self.conn.prepare(
            "SELECT DISTINCT ru.rule
             FROM rules_used ru
             JOIN sessions s ON s.id = ru.session_id
             WHERE s.workspace = ?1
               AND EXISTS (
                   SELECT 1 FROM events e
                   JOIN sessions ss ON ss.id = e.session_id
                   WHERE e.session_id = ru.session_id
                     AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
               )
             ORDER BY ru.rule",
        )?;
        let out: Vec<String> = stmt
            .query_map(
                params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
                |r| r.get::<_, String>(0),
            )?
            .filter_map(|r| r.ok())
            .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
            .collect();
        Ok(out)
    }
1792
    /// Distinct `(session_id, rule)` pairs for sessions active inside
    /// `[start_ms, end_ms]`; mirror of `skills_used_in_window` for rules.
    pub fn rules_used_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(String, String)>> {
        let mut stmt = self.conn.prepare(
            "SELECT DISTINCT ru.session_id, ru.rule
             FROM rules_used ru
             JOIN sessions s ON s.id = ru.session_id
             WHERE s.workspace = ?1
               AND EXISTS (
                   SELECT 1 FROM events e
                   JOIN sessions ss ON ss.id = e.session_id
                   WHERE e.session_id = ru.session_id
                     AND (
                         (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
                     )
               )
             ORDER BY ru.session_id, ru.rule",
        )?;
        let out: Vec<(String, String)> = stmt
            .query_map(
                params![
                    workspace,
                    start_ms as i64,
                    end_ms as i64,
                    SYNTHETIC_TS_CEILING_MS,
                ],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )?
            .filter_map(|r| r.ok())
            .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
            .collect();
        Ok(out)
    }
1831
    /// Ids of sessions in `workspace` with at least one event inside the
    /// window (or a synthetic-timestamp event on a session that started
    /// inside the window).
    pub fn sessions_active_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<HashSet<String>> {
        let mut stmt = self.conn.prepare(
            "SELECT DISTINCT s.id
             FROM sessions s
             WHERE s.workspace = ?1
               AND EXISTS (
                   SELECT 1 FROM events e
                   WHERE e.session_id = s.id
                     AND (
                         (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                         OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
                     )
               )",
        )?;
        let out: HashSet<String> = stmt
            .query_map(
                params![
                    workspace,
                    start_ms as i64,
                    end_ms as i64,
                    SYNTHETIC_TS_CEILING_MS,
                ],
                |r| r.get(0),
            )?
            .filter_map(|r| r.ok())
            .collect();
        Ok(out)
    }
1866
    /// Sums in-window event cost (micro-dollars) per session in `workspace`.
    ///
    /// Only events matching the window predicate contribute; sessions with
    /// no qualifying events are absent from the map.
    pub fn session_costs_usd_e6_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<HashMap<String, i64>> {
        let mut stmt = self.conn.prepare(
            "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
             FROM events e
             JOIN sessions s ON s.id = e.session_id
             WHERE s.workspace = ?1
               AND (
                   (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
                   OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
               )
             GROUP BY e.session_id",
        )?;
        let rows: Vec<(String, i64)> = stmt
            .query_map(
                params![
                    workspace,
                    start_ms as i64,
                    end_ms as i64,
                    SYNTHETIC_TS_CEILING_MS,
                ],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )?
            .filter_map(|r| r.ok())
            .collect();
        Ok(rows.into_iter().collect())
    }
1899
    /// Builds the guidance (skills/rules) performance report for a window.
    ///
    /// For each skill and rule observed in the window — plus every slug
    /// present on disk but unused — reports how many sessions used it, that
    /// count as a share of active sessions, total and average cost, and the
    /// delta versus the workspace-wide average cost per session. Rows are
    /// sorted by session count descending, then kind, then id.
    pub fn guidance_report(
        &self,
        workspace: &str,
        window_start_ms: u64,
        window_end_ms: u64,
        skill_slugs_on_disk: &HashSet<String>,
        rule_slugs_on_disk: &HashSet<String>,
    ) -> Result<GuidanceReport> {
        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
        let denom = active.len() as u64;
        let costs =
            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;

        // Workspace baseline: mean cost across all active sessions;
        // None when nothing was active in the window.
        let workspace_avg_cost_per_session_usd = if denom > 0 {
            let total_e6: i64 = active
                .iter()
                .map(|sid| costs.get(sid).copied().unwrap_or(0))
                .sum();
            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
        } else {
            None
        };

        // Invert (session, slug) pairs into slug -> set of sessions.
        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
            skill_sessions.entry(skill).or_default().insert(sid);
        }
        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
            rule_sessions.entry(rule).or_default().insert(sid);
        }

        let mut rows: Vec<GuidancePerfRow> = Vec::new();

        // Shared row builder: derives usage share and cost aggregates for
        // one slug, appending to `rows`.
        let mut push_row =
            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
                let sessions = sids.len() as u64;
                let sessions_pct = if denom > 0 {
                    sessions as f64 * 100.0 / denom as f64
                } else {
                    0.0
                };
                let total_cost_usd_e6: i64 = sids
                    .iter()
                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
                    .sum();
                let avg_cost_per_session_usd = if sessions > 0 {
                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
                } else {
                    None
                };
                let vs_workspace_avg_cost_per_session_usd =
                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
                        (Some(avg), Some(w)) => Some(avg - w),
                        _ => None,
                    };
                rows.push(GuidancePerfRow {
                    kind,
                    id,
                    sessions,
                    sessions_pct,
                    total_cost_usd_e6,
                    avg_cost_per_session_usd,
                    vs_workspace_avg_cost_per_session_usd,
                    on_disk,
                });
            };

        // Observed skills first, then on-disk skills that saw no use.
        let mut seen_skills: HashSet<String> = HashSet::new();
        for (id, sids) in &skill_sessions {
            seen_skills.insert(id.clone());
            push_row(
                GuidanceKind::Skill,
                id.clone(),
                sids,
                skill_slugs_on_disk.contains(id),
            );
        }
        for slug in skill_slugs_on_disk {
            if seen_skills.contains(slug) {
                continue;
            }
            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
        }

        // Same pass for rules.
        let mut seen_rules: HashSet<String> = HashSet::new();
        for (id, sids) in &rule_sessions {
            seen_rules.insert(id.clone());
            push_row(
                GuidanceKind::Rule,
                id.clone(),
                sids,
                rule_slugs_on_disk.contains(id),
            );
        }
        for slug in rule_slugs_on_disk {
            if seen_rules.contains(slug) {
                continue;
            }
            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
        }

        rows.sort_by(|a, b| {
            b.sessions
                .cmp(&a.sessions)
                .then_with(|| a.kind.cmp(&b.kind))
                .then_with(|| a.id.cmp(&b.id))
        });

        Ok(GuidanceReport {
            workspace: workspace.to_string(),
            window_start_ms,
            window_end_ms,
            sessions_in_window: denom,
            workspace_avg_cost_per_session_usd,
            rows,
        })
    }
2019
2020 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
2021 let mut stmt = self.conn.prepare(
2022 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
2023 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
2024 prompt_fingerprint, parent_session_id, agent_version, os, arch,
2025 repo_file_count, repo_total_loc
2026 FROM sessions WHERE id = ?1",
2027 )?;
2028 let mut rows = stmt.query_map(params![id], |row| {
2029 Ok((
2030 row.get::<_, String>(0)?,
2031 row.get::<_, String>(1)?,
2032 row.get::<_, Option<String>>(2)?,
2033 row.get::<_, String>(3)?,
2034 row.get::<_, i64>(4)?,
2035 row.get::<_, Option<i64>>(5)?,
2036 row.get::<_, String>(6)?,
2037 row.get::<_, String>(7)?,
2038 row.get::<_, Option<String>>(8)?,
2039 row.get::<_, Option<String>>(9)?,
2040 row.get::<_, Option<String>>(10)?,
2041 row.get::<_, Option<i64>>(11)?,
2042 row.get::<_, Option<i64>>(12)?,
2043 row.get::<_, String>(13)?,
2044 row.get::<_, Option<String>>(14)?,
2045 row.get::<_, Option<String>>(15)?,
2046 row.get::<_, Option<String>>(16)?,
2047 row.get::<_, Option<String>>(17)?,
2048 row.get::<_, Option<String>>(18)?,
2049 row.get::<_, Option<i64>>(19)?,
2050 row.get::<_, Option<i64>>(20)?,
2051 ))
2052 })?;
2053
2054 if let Some(row) = rows.next() {
2055 let (
2056 id,
2057 agent,
2058 model,
2059 workspace,
2060 started,
2061 ended,
2062 status_str,
2063 trace,
2064 start_commit,
2065 end_commit,
2066 branch,
2067 dirty_start,
2068 dirty_end,
2069 source,
2070 prompt_fingerprint,
2071 parent_session_id,
2072 agent_version,
2073 os,
2074 arch,
2075 repo_file_count,
2076 repo_total_loc,
2077 ) = row?;
2078 Ok(Some(SessionRecord {
2079 id,
2080 agent,
2081 model,
2082 workspace,
2083 started_at_ms: started as u64,
2084 ended_at_ms: ended.map(|v| v as u64),
2085 status: status_from_str(&status_str),
2086 trace_path: trace,
2087 start_commit,
2088 end_commit,
2089 branch,
2090 dirty_start: dirty_start.map(i64_to_bool),
2091 dirty_end: dirty_end.map(i64_to_bool),
2092 repo_binding_source: empty_to_none(source),
2093 prompt_fingerprint,
2094 parent_session_id,
2095 agent_version,
2096 os,
2097 arch,
2098 repo_file_count: repo_file_count.map(|v| v as u32),
2099 repo_total_loc: repo_total_loc.map(|v| v as u64),
2100 }))
2101 } else {
2102 Ok(None)
2103 }
2104 }
2105
2106 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2107 let mut stmt = self.conn.prepare(
2108 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2109 indexed_at_ms, dirty, graph_path
2110 FROM repo_snapshots WHERE workspace = ?1
2111 ORDER BY indexed_at_ms DESC LIMIT 1",
2112 )?;
2113 let mut rows = stmt.query_map(params![workspace], |row| {
2114 Ok(RepoSnapshotRecord {
2115 id: row.get(0)?,
2116 workspace: row.get(1)?,
2117 head_commit: row.get(2)?,
2118 dirty_fingerprint: row.get(3)?,
2119 analyzer_version: row.get(4)?,
2120 indexed_at_ms: row.get::<_, i64>(5)? as u64,
2121 dirty: row.get::<_, i64>(6)? != 0,
2122 graph_path: row.get(7)?,
2123 })
2124 })?;
2125 Ok(rows.next().transpose()?)
2126 }
2127
2128 pub fn save_repo_snapshot(
2129 &self,
2130 snapshot: &RepoSnapshotRecord,
2131 facts: &[FileFact],
2132 edges: &[RepoEdge],
2133 ) -> Result<()> {
2134 self.conn.execute(
2135 "INSERT INTO repo_snapshots (
2136 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2137 indexed_at_ms, dirty, graph_path
2138 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2139 ON CONFLICT(id) DO UPDATE SET
2140 workspace=excluded.workspace,
2141 head_commit=excluded.head_commit,
2142 dirty_fingerprint=excluded.dirty_fingerprint,
2143 analyzer_version=excluded.analyzer_version,
2144 indexed_at_ms=excluded.indexed_at_ms,
2145 dirty=excluded.dirty,
2146 graph_path=excluded.graph_path",
2147 params![
2148 snapshot.id,
2149 snapshot.workspace,
2150 snapshot.head_commit,
2151 snapshot.dirty_fingerprint,
2152 snapshot.analyzer_version,
2153 snapshot.indexed_at_ms as i64,
2154 bool_to_i64(snapshot.dirty),
2155 snapshot.graph_path,
2156 ],
2157 )?;
2158 self.conn.execute(
2159 "DELETE FROM file_facts WHERE snapshot_id = ?1",
2160 params![snapshot.id],
2161 )?;
2162 self.conn.execute(
2163 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2164 params![snapshot.id],
2165 )?;
2166 for fact in facts {
2167 self.conn.execute(
2168 "INSERT INTO file_facts (
2169 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2170 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2171 churn_30d, churn_90d, authors_90d, last_changed_ms
2172 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2173 params![
2174 fact.snapshot_id,
2175 fact.path,
2176 fact.language,
2177 fact.bytes as i64,
2178 fact.loc as i64,
2179 fact.sloc as i64,
2180 fact.complexity_total as i64,
2181 fact.max_fn_complexity as i64,
2182 fact.symbol_count as i64,
2183 fact.import_count as i64,
2184 fact.fan_in as i64,
2185 fact.fan_out as i64,
2186 fact.churn_30d as i64,
2187 fact.churn_90d as i64,
2188 fact.authors_90d as i64,
2189 fact.last_changed_ms.map(|v| v as i64),
2190 ],
2191 )?;
2192 }
2193 for edge in edges {
2194 self.conn.execute(
2195 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2196 VALUES (?1, ?2, ?3, ?4, ?5)
2197 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2198 DO UPDATE SET weight = weight + excluded.weight",
2199 params![
2200 snapshot.id,
2201 edge.from_path,
2202 edge.to_path,
2203 edge.kind,
2204 edge.weight as i64,
2205 ],
2206 )?;
2207 }
2208 Ok(())
2209 }
2210
2211 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2212 let mut stmt = self.conn.prepare(
2213 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2214 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2215 churn_30d, churn_90d, authors_90d, last_changed_ms
2216 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2217 )?;
2218 let rows = stmt.query_map(params![snapshot_id], |row| {
2219 Ok(FileFact {
2220 snapshot_id: row.get(0)?,
2221 path: row.get(1)?,
2222 language: row.get(2)?,
2223 bytes: row.get::<_, i64>(3)? as u64,
2224 loc: row.get::<_, i64>(4)? as u32,
2225 sloc: row.get::<_, i64>(5)? as u32,
2226 complexity_total: row.get::<_, i64>(6)? as u32,
2227 max_fn_complexity: row.get::<_, i64>(7)? as u32,
2228 symbol_count: row.get::<_, i64>(8)? as u32,
2229 import_count: row.get::<_, i64>(9)? as u32,
2230 fan_in: row.get::<_, i64>(10)? as u32,
2231 fan_out: row.get::<_, i64>(11)? as u32,
2232 churn_30d: row.get::<_, i64>(12)? as u32,
2233 churn_90d: row.get::<_, i64>(13)? as u32,
2234 authors_90d: row.get::<_, i64>(14)? as u32,
2235 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2236 })
2237 })?;
2238 Ok(rows.filter_map(|row| row.ok()).collect())
2239 }
2240
2241 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2242 let mut stmt = self.conn.prepare(
2243 "SELECT from_id, to_id, kind, weight
2244 FROM repo_edges WHERE snapshot_id = ?1
2245 ORDER BY kind, from_id, to_id",
2246 )?;
2247 let rows = stmt.query_map(params![snapshot_id], |row| {
2248 Ok(RepoEdge {
2249 from_path: row.get(0)?,
2250 to_path: row.get(1)?,
2251 kind: row.get(2)?,
2252 weight: row.get::<_, i64>(3)? as u32,
2253 })
2254 })?;
2255 Ok(rows.filter_map(|row| row.ok()).collect())
2256 }
2257
    /// Top files ranked by churn-weighted complexity (`churn_30d * complexity_total`).
    pub fn hottest_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * complexity_total")
    }
2261
    /// Top files ranked by 30-day churn alone.
    pub fn most_changed_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "churn_30d")
    }
2265
    /// Top files ranked by total complexity alone.
    pub fn most_complex_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "complexity_total")
    }
2269
    /// Top files ranked by a churn x authorship x complexity risk product.
    pub fn highest_risk_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
        self.ranked_files_for_snapshot(snapshot_id, "churn_30d * authors_90d * complexity_total")
    }
2273
    /// Top-10 files for a snapshot ranked by `value_sql` descending, with path
    /// as the tie-break.
    ///
    /// `value_sql` is interpolated directly into the query text, so it must be
    /// a trusted constant expression — every caller in this file passes a
    /// string literal, never user input.
    fn ranked_files_for_snapshot(
        &self,
        snapshot_id: &str,
        value_sql: &str,
    ) -> Result<Vec<RankedFile>> {
        let sql = format!(
            "SELECT path, {value_sql}, complexity_total, churn_30d
            FROM file_facts WHERE snapshot_id = ?1
            ORDER BY {value_sql} DESC, path ASC LIMIT 10"
        );
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt.query_map(params![snapshot_id], ranked_file_row)?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
2288
2289 pub fn pain_hotspots_for_snapshot(
2290 &self,
2291 snapshot_id: &str,
2292 workspace: &str,
2293 start_ms: u64,
2294 end_ms: u64,
2295 ) -> Result<Vec<RankedFile>> {
2296 let mut stmt = self.conn.prepare(PAIN_HOTSPOTS_SQL)?;
2297 let rows = stmt.query_map(
2298 params![snapshot_id, workspace, start_ms as i64, end_ms as i64],
2299 ranked_file_row,
2300 )?;
2301 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2302 }
2303
2304 pub fn tool_rank_rows_in_window(
2305 &self,
2306 workspace: &str,
2307 start_ms: u64,
2308 end_ms: u64,
2309 ) -> Result<Vec<RankedTool>> {
2310 let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
2311 let rows = stmt.query_map(
2312 params![workspace, start_ms as i64, end_ms as i64],
2313 ranked_tool_row,
2314 )?;
2315 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2316 }
2317
    /// Tool spans for `workspace` whose effective timestamp falls inside
    /// `[start_ms, end_ms]` (inclusive), newest first.
    ///
    /// A span's effective timestamp is `started_at_ms` when present; spans
    /// with a NULL start instead qualify for (and sort within) the window by
    /// `ended_at_ms` — the two UNION ALL arms below. Rows that fail to decode
    /// are silently dropped from the result.
    pub fn tool_spans_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<ToolSpanView>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                reasoning_tokens, cost_usd_e6, paths_json,
                parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
            FROM (
                SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                       ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                       ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                       ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                       ts.started_at_ms AS sort_ms
                FROM tool_spans ts
                JOIN sessions s ON s.id = ts.session_id
                WHERE s.workspace = ?1
                  AND ts.started_at_ms >= ?2
                  AND ts.started_at_ms <= ?3
                UNION ALL
                SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                       ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                       ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                       ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                       ts.ended_at_ms AS sort_ms
                FROM tool_spans ts
                JOIN sessions s ON s.id = ts.session_id
                WHERE s.workspace = ?1
                  AND ts.started_at_ms IS NULL
                  AND ts.ended_at_ms >= ?2
                  AND ts.ended_at_ms <= ?3
            )
            ORDER BY sort_ms DESC",
        )?;
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
            // Column 8 holds a JSON-encoded list of touched paths; a decode
            // failure degrades to an empty list rather than an error.
            let paths_json: String = row.get(8)?;
            Ok(ToolSpanView {
                // NULL span ids / tool names degrade to "" / "unknown".
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
2376
    /// Build (or return the cached) tool-span tree for one session.
    ///
    /// The cache entry is keyed by session id plus the session's latest event
    /// sequence number, so any newly indexed event for the session invalidates
    /// the cached tree on the next call.
    pub fn session_span_tree(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
        let last_event_seq = self.last_event_seq_for_session(session_id)?;
        // Cache hit: same session and no events indexed since the tree was built.
        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
            && entry.session_id == session_id
            && entry.last_event_seq == last_event_seq
        {
            return Ok(entry.nodes.clone());
        }
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                reasoning_tokens, cost_usd_e6, paths_json,
                parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
            FROM tool_spans
            WHERE session_id = ?1
            ORDER BY depth ASC, started_at_ms ASC",
        )?;
        let rows = stmt.query_map(params![session_id], |row| {
            // Column 8 holds a JSON-encoded list of touched paths.
            let paths_json: String = row.get(8)?;
            Ok(crate::metrics::types::ToolSpanView {
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        // Rows that fail to decode are skipped rather than failing the build.
        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
        let nodes = crate::store::span_tree::build_tree(spans);
        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
            session_id: session_id.to_string(),
            last_event_seq,
            nodes: nodes.clone(),
        });
        Ok(nodes)
    }
2425
2426 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2427 let seq = self
2428 .conn
2429 .query_row(
2430 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2431 params![session_id],
2432 |r| r.get::<_, Option<i64>>(0),
2433 )?
2434 .map(|v| v as u64);
2435 Ok(seq)
2436 }
2437
    /// Sync-oriented tool-span rows for a workspace window, oldest first.
    ///
    /// A span qualifies by `started_at_ms` when it is non-NULL; otherwise by a
    /// non-NULL `ended_at_ms` (the two UNION ALL arms). Results sort ascending
    /// by that effective timestamp with span id as tie-break. Rows that fail
    /// to decode are silently dropped.
    pub fn tool_spans_sync_rows_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<ToolSpanSyncRow>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
                lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
            FROM (
                SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
                       ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
                       ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
                       ts.started_at_ms AS sort_ms
                FROM tool_spans ts
                JOIN sessions s ON s.id = ts.session_id
                WHERE s.workspace = ?1
                  AND ts.started_at_ms IS NOT NULL
                  AND ts.started_at_ms >= ?2
                  AND ts.started_at_ms <= ?3
                UNION ALL
                SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
                       ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
                       ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
                       ts.ended_at_ms AS sort_ms
                FROM tool_spans ts
                JOIN sessions s ON s.id = ts.session_id
                WHERE s.workspace = ?1
                  AND ts.started_at_ms IS NULL
                  AND ts.ended_at_ms IS NOT NULL
                  AND ts.ended_at_ms >= ?2
                  AND ts.ended_at_ms <= ?3
            )
            ORDER BY sort_ms ASC, span_id ASC",
        )?;
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
            // Column 12 holds a JSON-encoded list of touched paths; decode
            // failures degrade to an empty list.
            let paths_json: String = row.get(12)?;
            Ok(ToolSpanSyncRow {
                span_id: row.get(0)?,
                session_id: row.get(1)?,
                tool: row.get(2)?,
                tool_call_id: row.get(3)?,
                status: row.get(4)?,
                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
                cost_usd_e6: row.get(11)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
            })
        })?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
2498
2499 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2500 let mut stmt = self.conn.prepare(
2501 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2502 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2503 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2504 )?;
2505 let rows = stmt.query_map(params![session_id], |row| {
2506 let paths_json: String = row.get(12)?;
2507 Ok(ToolSpanSyncRow {
2508 span_id: row.get(0)?,
2509 session_id: row.get(1)?,
2510 tool: row.get(2)?,
2511 tool_call_id: row.get(3)?,
2512 status: row.get(4)?,
2513 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2514 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2515 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2516 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2517 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2518 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2519 cost_usd_e6: row.get(11)?,
2520 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2521 })
2522 })?;
2523 Ok(rows.filter_map(|row| row.ok()).collect())
2524 }
2525
2526 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2527 self.conn.execute(
2528 "INSERT OR REPLACE INTO session_evals
2529 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2530 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2531 rusqlite::params![
2532 eval.id,
2533 eval.session_id,
2534 eval.judge_model,
2535 eval.rubric_id,
2536 eval.score,
2537 eval.rationale,
2538 eval.flagged as i64,
2539 eval.created_at_ms as i64,
2540 ],
2541 )?;
2542 Ok(())
2543 }
2544
2545 pub fn list_evals_in_window(
2546 &self,
2547 start_ms: u64,
2548 end_ms: u64,
2549 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2550 let mut stmt = self.conn.prepare(
2551 "SELECT id, session_id, judge_model, rubric_id, score,
2552 rationale, flagged, created_at_ms
2553 FROM session_evals
2554 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2555 ORDER BY created_at_ms ASC",
2556 )?;
2557 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2558 Ok(crate::eval::types::EvalRow {
2559 id: r.get(0)?,
2560 session_id: r.get(1)?,
2561 judge_model: r.get(2)?,
2562 rubric_id: r.get(3)?,
2563 score: r.get(4)?,
2564 rationale: r.get(5)?,
2565 flagged: r.get::<_, i64>(6)? != 0,
2566 created_at_ms: r.get::<_, i64>(7)? as u64,
2567 })
2568 })?;
2569 rows.collect()
2570 }
2571
2572 pub fn list_evals_for_session(
2573 &self,
2574 session_id: &str,
2575 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2576 let mut stmt = self.conn.prepare(
2577 "SELECT id, session_id, judge_model, rubric_id, score,
2578 rationale, flagged, created_at_ms
2579 FROM session_evals
2580 WHERE session_id = ?1
2581 ORDER BY created_at_ms DESC",
2582 )?;
2583 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2584 Ok(crate::eval::types::EvalRow {
2585 id: r.get(0)?,
2586 session_id: r.get(1)?,
2587 judge_model: r.get(2)?,
2588 rubric_id: r.get(3)?,
2589 score: r.get(4)?,
2590 rationale: r.get(5)?,
2591 flagged: r.get::<_, i64>(6)? != 0,
2592 created_at_ms: r.get::<_, i64>(7)? as u64,
2593 })
2594 })?;
2595 rows.collect()
2596 }
2597
    /// Insert or overwrite session feedback and enqueue it into the sync
    /// outbox.
    ///
    /// Two writes happen: the feedback row itself (REPLACE by id), then an
    /// unconditional `sync_outbox` insert carrying the record serialized as
    /// JSON. A serialization failure degrades to an empty payload string
    /// rather than an error — NOTE(review): confirm downstream sync tolerates
    /// an empty `payload`.
    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
        use crate::feedback::types::FeedbackLabel;
        self.conn.execute(
            "INSERT OR REPLACE INTO session_feedback
             (id, session_id, score, label, note, created_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            rusqlite::params![
                r.id,
                r.session_id,
                r.score.as_ref().map(|s| s.0 as i64),
                r.label.as_ref().map(FeedbackLabel::to_db_str),
                r.note,
                r.created_at_ms as i64,
            ],
        )?;
        let payload = serde_json::to_string(r).unwrap_or_default();
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
             VALUES (?1, 'session_feedback', ?2, 0)",
            rusqlite::params![r.session_id, payload],
        )?;
        Ok(())
    }
2621
2622 pub fn list_feedback_in_window(
2623 &self,
2624 start_ms: u64,
2625 end_ms: u64,
2626 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2627 let mut stmt = self.conn.prepare(
2628 "SELECT id, session_id, score, label, note, created_at_ms
2629 FROM session_feedback
2630 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2631 ORDER BY created_at_ms ASC",
2632 )?;
2633 let rows = stmt.query_map(
2634 rusqlite::params![start_ms as i64, end_ms as i64],
2635 feedback_row,
2636 )?;
2637 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2638 }
2639
2640 pub fn feedback_for_sessions(
2641 &self,
2642 ids: &[String],
2643 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2644 if ids.is_empty() {
2645 return Ok(std::collections::HashMap::new());
2646 }
2647 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2648 let sql = format!(
2649 "SELECT id, session_id, score, label, note, created_at_ms
2650 FROM session_feedback WHERE session_id IN ({placeholders})
2651 ORDER BY created_at_ms DESC"
2652 );
2653 let mut stmt = self.conn.prepare(&sql)?;
2654 let params: Vec<&dyn rusqlite::ToSql> =
2655 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2656 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2657 let mut map = std::collections::HashMap::new();
2658 for row in rows {
2659 let r = row?;
2660 map.entry(r.session_id.clone()).or_insert(r);
2661 }
2662 Ok(map)
2663 }
2664
    /// Insert or update the measured outcome metrics for a session.
    ///
    /// Upserts by `session_id`; every metric column is overwritten with the
    /// values from `row`, so a re-measure fully replaces the previous result.
    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
        self.conn.execute(
            "INSERT INTO session_outcomes (
                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
            ON CONFLICT(session_id) DO UPDATE SET
                test_passed=excluded.test_passed,
                test_failed=excluded.test_failed,
                test_skipped=excluded.test_skipped,
                build_ok=excluded.build_ok,
                lint_errors=excluded.lint_errors,
                revert_lines_14d=excluded.revert_lines_14d,
                pr_open=excluded.pr_open,
                ci_ok=excluded.ci_ok,
                measured_at_ms=excluded.measured_at_ms,
                measure_error=excluded.measure_error",
            params![
                row.session_id,
                row.test_passed,
                row.test_failed,
                row.test_skipped,
                // Optional booleans are stored as nullable 0/1 integers.
                row.build_ok.map(bool_to_i64),
                row.lint_errors,
                row.revert_lines_14d,
                row.pr_open,
                row.ci_ok.map(bool_to_i64),
                row.measured_at_ms as i64,
                row.measure_error.as_deref(),
            ],
        )?;
        Ok(())
    }
2698
2699 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2700 let mut stmt = self.conn.prepare(
2701 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2702 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2703 FROM session_outcomes WHERE session_id = ?1",
2704 )?;
2705 let row = stmt
2706 .query_row(params![session_id], outcome_row)
2707 .optional()?;
2708 Ok(row)
2709 }
2710
2711 pub fn list_session_outcomes_in_window(
2713 &self,
2714 workspace: &str,
2715 start_ms: u64,
2716 end_ms: u64,
2717 ) -> Result<Vec<SessionOutcomeRow>> {
2718 let mut stmt = self.conn.prepare(
2719 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2720 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2721 FROM session_outcomes o
2722 JOIN sessions s ON s.id = o.session_id
2723 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2724 ORDER BY o.measured_at_ms ASC",
2725 )?;
2726 let rows = stmt.query_map(
2727 params![workspace, start_ms as i64, end_ms as i64],
2728 outcome_row,
2729 )?;
2730 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2731 }
2732
2733 pub fn append_session_sample(
2734 &self,
2735 session_id: &str,
2736 ts_ms: u64,
2737 pid: u32,
2738 cpu_percent: Option<f64>,
2739 rss_bytes: Option<u64>,
2740 ) -> Result<()> {
2741 self.conn.execute(
2742 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2743 VALUES (?1, ?2, ?3, ?4, ?5)",
2744 params![
2745 session_id,
2746 ts_ms as i64,
2747 pid as i64,
2748 cpu_percent,
2749 rss_bytes.map(|b| b as i64)
2750 ],
2751 )?;
2752 Ok(())
2753 }
2754
2755 pub fn list_session_sample_aggs_in_window(
2757 &self,
2758 workspace: &str,
2759 start_ms: u64,
2760 end_ms: u64,
2761 ) -> Result<Vec<SessionSampleAgg>> {
2762 let mut stmt = self.conn.prepare(
2763 "SELECT ss.session_id, COUNT(*) AS n,
2764 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2765 FROM session_samples ss
2766 JOIN sessions s ON s.id = ss.session_id
2767 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2768 GROUP BY ss.session_id",
2769 )?;
2770 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2771 let sid: String = r.get(0)?;
2772 let n: i64 = r.get(1)?;
2773 let max_cpu: Option<f64> = r.get(2)?;
2774 let max_rss: Option<i64> = r.get(3)?;
2775 Ok(SessionSampleAgg {
2776 session_id: sid,
2777 sample_count: n as u64,
2778 max_cpu_percent: max_cpu.unwrap_or(0.0),
2779 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2780 })
2781 })?;
2782 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2783 }
2784
2785 pub fn list_sessions_for_eval(
2786 &self,
2787 since_ms: u64,
2788 min_cost_usd: f64,
2789 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2790 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2791 let mut stmt = self.conn.prepare(
2792 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2793 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2794 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2795 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2796 FROM sessions s
2797 WHERE s.started_at_ms >= ?1
2798 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2799 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2800 ORDER BY s.started_at_ms DESC",
2801 )?;
2802 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2803 Ok((
2804 r.get::<_, String>(0)?,
2805 r.get::<_, String>(1)?,
2806 r.get::<_, Option<String>>(2)?,
2807 r.get::<_, String>(3)?,
2808 r.get::<_, i64>(4)?,
2809 r.get::<_, Option<i64>>(5)?,
2810 r.get::<_, String>(6)?,
2811 r.get::<_, String>(7)?,
2812 r.get::<_, Option<String>>(8)?,
2813 r.get::<_, Option<String>>(9)?,
2814 r.get::<_, Option<String>>(10)?,
2815 r.get::<_, Option<i64>>(11)?,
2816 r.get::<_, Option<i64>>(12)?,
2817 r.get::<_, Option<String>>(13)?,
2818 r.get::<_, Option<String>>(14)?,
2819 r.get::<_, Option<String>>(15)?,
2820 r.get::<_, Option<String>>(16)?,
2821 r.get::<_, Option<String>>(17)?,
2822 r.get::<_, Option<String>>(18)?,
2823 r.get::<_, Option<i64>>(19)?,
2824 r.get::<_, Option<i64>>(20)?,
2825 ))
2826 })?;
2827 let mut out = Vec::new();
2828 for row in rows {
2829 let (
2830 id,
2831 agent,
2832 model,
2833 workspace,
2834 started,
2835 ended,
2836 status_str,
2837 trace,
2838 start_commit,
2839 end_commit,
2840 branch,
2841 dirty_start,
2842 dirty_end,
2843 source,
2844 prompt_fingerprint,
2845 parent_session_id,
2846 agent_version,
2847 os,
2848 arch,
2849 repo_file_count,
2850 repo_total_loc,
2851 ) = row?;
2852 out.push(crate::core::event::SessionRecord {
2853 id,
2854 agent,
2855 model,
2856 workspace,
2857 started_at_ms: started as u64,
2858 ended_at_ms: ended.map(|v| v as u64),
2859 status: status_from_str(&status_str),
2860 trace_path: trace,
2861 start_commit,
2862 end_commit,
2863 branch,
2864 dirty_start: dirty_start.map(i64_to_bool),
2865 dirty_end: dirty_end.map(i64_to_bool),
2866 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2867 prompt_fingerprint,
2868 parent_session_id,
2869 agent_version,
2870 os,
2871 arch,
2872 repo_file_count: repo_file_count.map(|v| v as u32),
2873 repo_total_loc: repo_total_loc.map(|v| v as u64),
2874 });
2875 }
2876 Ok(out)
2877 }
2878
2879 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2880 self.conn.execute(
2881 "INSERT OR IGNORE INTO prompt_snapshots
2882 (fingerprint, captured_at_ms, files_json, total_bytes)
2883 VALUES (?1, ?2, ?3, ?4)",
2884 params![
2885 snap.fingerprint,
2886 snap.captured_at_ms as i64,
2887 snap.files_json,
2888 snap.total_bytes as i64
2889 ],
2890 )?;
2891 Ok(())
2892 }
2893
2894 pub fn get_prompt_snapshot(
2895 &self,
2896 fingerprint: &str,
2897 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2898 self.conn
2899 .query_row(
2900 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2901 FROM prompt_snapshots WHERE fingerprint = ?1",
2902 params![fingerprint],
2903 |r| {
2904 Ok(crate::prompt::PromptSnapshot {
2905 fingerprint: r.get(0)?,
2906 captured_at_ms: r.get::<_, i64>(1)? as u64,
2907 files_json: r.get(2)?,
2908 total_bytes: r.get::<_, i64>(3)? as u64,
2909 })
2910 },
2911 )
2912 .optional()
2913 .map_err(Into::into)
2914 }
2915
2916 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2917 let mut stmt = self.conn.prepare(
2918 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2919 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2920 )?;
2921 let rows = stmt.query_map([], |r| {
2922 Ok(crate::prompt::PromptSnapshot {
2923 fingerprint: r.get(0)?,
2924 captured_at_ms: r.get::<_, i64>(1)? as u64,
2925 files_json: r.get(2)?,
2926 total_bytes: r.get::<_, i64>(3)? as u64,
2927 })
2928 })?;
2929 Ok(rows.filter_map(|r| r.ok()).collect())
2930 }
2931
2932 pub fn sessions_with_prompt_fingerprint(
2934 &self,
2935 workspace: &str,
2936 start_ms: u64,
2937 end_ms: u64,
2938 ) -> Result<Vec<(String, String)>> {
2939 let mut stmt = self.conn.prepare(
2940 "SELECT id, prompt_fingerprint FROM sessions
2941 WHERE workspace = ?1
2942 AND started_at_ms >= ?2 AND started_at_ms < ?3
2943 AND prompt_fingerprint IS NOT NULL",
2944 )?;
2945 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2946 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2947 })?;
2948 Ok(rows.filter_map(|r| r.ok()).collect())
2949 }
2950}
2951
2952impl Drop for Store {
2953 fn drop(&mut self) {
2954 if let Some(writer) = self.search_writer.get_mut().as_mut() {
2955 let _ = writer.commit();
2956 }
2957 }
2958}
2959
/// Current wall-clock time as milliseconds since the Unix epoch.
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}
2966
2967fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2968 let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2969 let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2970 Ok(rows.filter_map(|r| r.ok()).collect())
2971}
2972
2973fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2974 raw.and_then(|s| s.trim().parse::<u64>().ok())
2975 .unwrap_or(DEFAULT_MMAP_MB)
2976 .saturating_mul(1024)
2977 .saturating_mul(1024)
2978 .min(i64::MAX as u64) as i64
2979}
2980
2981fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2982 let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2983 conn.execute_batch(&format!(
2984 "
2985 PRAGMA journal_mode=WAL;
2986 PRAGMA busy_timeout=5000;
2987 PRAGMA synchronous=NORMAL;
2988 PRAGMA cache_size=-65536;
2989 PRAGMA mmap_size={mmap_size};
2990 PRAGMA temp_store=MEMORY;
2991 PRAGMA wal_autocheckpoint=1000;
2992 "
2993 ))?;
2994 if mode == StoreOpenMode::ReadOnlyQuery {
2995 conn.execute_batch("PRAGMA query_only=ON;")?;
2996 }
2997 Ok(())
2998}
2999
3000fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
3001 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
3002}
3003
/// Decode one row produced by `SESSION_SELECT` into a `SessionRecord`.
///
/// Column indices must stay in lockstep with the column list in
/// `SESSION_SELECT`. Timestamps and counters are stored as i64 in SQLite and
/// widened back to unsigned here; booleans are stored as 0/1 integers.
fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
    let status_str: String = row.get(6)?;
    Ok(SessionRecord {
        id: row.get(0)?,
        agent: row.get(1)?,
        model: row.get(2)?,
        workspace: row.get(3)?,
        started_at_ms: row.get::<_, i64>(4)? as u64,
        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
        status: status_from_str(&status_str),
        trace_path: row.get(7)?,
        start_commit: row.get(8)?,
        end_commit: row.get(9)?,
        branch: row.get(10)?,
        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
        // Stored as '' when absent (see the schema default); normalized to None.
        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
        prompt_fingerprint: row.get(14)?,
        parent_session_id: row.get(15)?,
        agent_version: row.get(16)?,
        os: row.get(17)?,
        arch: row.get(18)?,
        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
    })
}
3030
3031fn ranked_file_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedFile> {
3032 Ok(RankedFile {
3033 path: row.get(0)?,
3034 value: row.get::<_, i64>(1)? as u64,
3035 complexity_total: row.get::<_, i64>(2)? as u32,
3036 churn_30d: row.get::<_, i64>(3)? as u32,
3037 })
3038}
3039
3040fn ranked_tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedTool> {
3041 Ok(RankedTool {
3042 tool: row.get(0)?,
3043 calls: row.get::<_, i64>(1)? as u64,
3044 p50_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
3045 p95_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
3046 total_tokens: row.get::<_, i64>(4)? as u64,
3047 total_reasoning_tokens: row.get::<_, i64>(5)? as u64,
3048 })
3049}
3050
/// Decode one event row (22 positional columns) into an `Event`.
///
/// Column order must match the events SELECT this mapper is used with.
/// Counters are stored as i64 and widened to the unsigned widths of `Event`.
fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
    let payload_str: String = row.get(12)?;
    Ok(Event {
        session_id: row.get(0)?,
        seq: row.get::<_, i64>(1)? as u64,
        ts_ms: row.get::<_, i64>(2)? as u64,
        ts_exact: row.get::<_, i64>(3)? != 0,
        kind: kind_from_str(&row.get::<_, String>(4)?),
        source: source_from_str(&row.get::<_, String>(5)?),
        tool: row.get(6)?,
        tool_call_id: row.get(7)?,
        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
        cost_usd_e6: row.get(11)?,
        // Lenient decode: a corrupt payload becomes JSON null, not an error.
        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
        stop_reason: row.get(13)?,
        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
    })
}
3078
3079fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> {
3080 Ok((row.get(22)?, event_row(row)?))
3081}
3082
3083fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
3084 let mut clauses = vec!["workspace = ?".to_string()];
3085 let mut args = vec![Value::Text(workspace.to_string())];
3086 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
3087 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
3088 args.push(Value::Text(format!("{}%", escape_like(prefix))));
3089 }
3090 if let Some(status) = &filter.status {
3091 clauses.push("status = ?".to_string());
3092 args.push(Value::Text(format!("{status:?}")));
3093 }
3094 if let Some(since_ms) = filter.since_ms {
3095 clauses.push("started_at_ms >= ?".to_string());
3096 args.push(Value::Integer(since_ms as i64));
3097 }
3098 (format!("WHERE {}", clauses.join(" AND ")), args)
3099}
3100
/// Lowercase `raw` and backslash-escape the LIKE wildcards (`%`, `_`) and
/// the escape character itself, for use with `LIKE ... ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for c in raw.to_lowercase().chars() {
        if matches!(c, '\\' | '%' | '_') {
            out.push('\\');
        }
        out.push(c);
    }
    out
}
3107
3108fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
3109 let cost: i64 = conn.query_row(
3110 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
3111 params![workspace], |r| r.get(0),
3112 )?;
3113 let with_cost: i64 = conn.query_row(
3114 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
3115 params![workspace], |r| r.get(0),
3116 )?;
3117 Ok((cost, with_cost as u64))
3118}
3119
3120fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
3121 let build_raw: Option<i64> = r.get(4)?;
3122 let ci_raw: Option<i64> = r.get(8)?;
3123 Ok(SessionOutcomeRow {
3124 session_id: r.get(0)?,
3125 test_passed: r.get(1)?,
3126 test_failed: r.get(2)?,
3127 test_skipped: r.get(3)?,
3128 build_ok: build_raw.map(|v| v != 0),
3129 lint_errors: r.get(5)?,
3130 revert_lines_14d: r.get(6)?,
3131 pr_open: r.get(7)?,
3132 ci_ok: ci_raw.map(|v| v != 0),
3133 measured_at_ms: r.get::<_, i64>(9)? as u64,
3134 measure_error: r.get(10)?,
3135 })
3136}
3137
3138fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
3139 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
3140 let score = r
3141 .get::<_, Option<i64>>(2)?
3142 .and_then(|v| FeedbackScore::new(v as u8));
3143 let label = r
3144 .get::<_, Option<String>>(3)?
3145 .and_then(|s| FeedbackLabel::from_str_opt(&s));
3146 Ok(FeedbackRecord {
3147 id: r.get(0)?,
3148 session_id: r.get(1)?,
3149 score,
3150 label,
3151 note: r.get(4)?,
3152 created_at_ms: r.get::<_, i64>(5)? as u64,
3153 })
3154}
3155
/// Weekday abbreviation for a day index counted in days since the Unix
/// epoch. Day 0 (1970-01-01) was a Thursday, hence the +4 offset into a
/// Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    NAMES[((day_idx + 4) % 7) as usize]
}
3159
3160fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
3161 let week_ago = now.saturating_sub(7 * 86_400_000);
3162 let mut stmt = conn
3163 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
3164 let days: Vec<u64> = stmt
3165 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
3166 .filter_map(|r| r.ok())
3167 .map(|v| v as u64 / 86_400_000)
3168 .collect();
3169 let today = now / 86_400_000;
3170 Ok((0u64..7)
3171 .map(|i| {
3172 let d = today.saturating_sub(6 - i);
3173 (
3174 day_label(d).to_string(),
3175 days.iter().filter(|&&x| x == d).count() as u64,
3176 )
3177 })
3178 .collect())
3179}
3180
/// The three most recently started sessions in `workspace`, each paired with
/// its event count.
///
/// The inline column list mirrors `SESSION_SELECT` (indices 0-20) with the
/// event count appended as column 21; decoding must stay in lockstep with
/// `session_row`.
fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
        s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
        s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
        s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
        COUNT(e.id) FROM sessions s \
        LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
        GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
    let mut stmt = conn.prepare(sql)?;
    let out: Vec<(SessionRecord, u64)> = stmt
        .query_map(params![workspace], |r| {
            let st: String = r.get(6)?;
            Ok((
                SessionRecord {
                    id: r.get(0)?,
                    agent: r.get(1)?,
                    model: r.get(2)?,
                    workspace: r.get(3)?,
                    started_at_ms: r.get::<_, i64>(4)? as u64,
                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
                    status: status_from_str(&st),
                    trace_path: r.get(7)?,
                    start_commit: r.get(8)?,
                    end_commit: r.get(9)?,
                    branch: r.get(10)?,
                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
                    // '' means "not recorded"; normalized to None.
                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
                    prompt_fingerprint: r.get(14)?,
                    parent_session_id: r.get(15)?,
                    agent_version: r.get(16)?,
                    os: r.get(17)?,
                    arch: r.get(18)?,
                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
                },
                // Appended aggregate: number of events joined to the session.
                r.get::<_, i64>(21)? as u64,
            ))
        })?
        .filter_map(|r| r.ok())
        .collect();
    Ok(out)
}
3224
3225fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3226 let mut stmt = conn.prepare(
3227 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3228 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3229 )?;
3230 let out: Vec<(String, u64)> = stmt
3231 .query_map(params![workspace], |r| {
3232 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3233 })?
3234 .filter_map(|r| r.ok())
3235 .collect();
3236 Ok(out)
3237}
3238
3239fn status_from_str(s: &str) -> SessionStatus {
3240 match s {
3241 "Running" => SessionStatus::Running,
3242 "Waiting" => SessionStatus::Waiting,
3243 "Idle" => SessionStatus::Idle,
3244 _ => SessionStatus::Done,
3245 }
3246}
3247
/// True when the `KAIZEN_PROJECTOR` env var is set to exactly "legacy",
/// selecting the legacy projector code path.
fn projector_legacy_mode() -> bool {
    matches!(
        std::env::var("KAIZEN_PROJECTOR").as_deref(),
        Ok("legacy")
    )
}
3251
3252fn is_stop_event(e: &Event) -> bool {
3253 if !matches!(e.kind, EventKind::Hook) {
3254 return false;
3255 }
3256 e.payload
3257 .get("event")
3258 .and_then(|v| v.as_str())
3259 .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3260 == Some("Stop")
3261}
3262
3263fn kind_from_str(s: &str) -> EventKind {
3264 match s {
3265 "ToolCall" => EventKind::ToolCall,
3266 "ToolResult" => EventKind::ToolResult,
3267 "Message" => EventKind::Message,
3268 "Error" => EventKind::Error,
3269 "Cost" => EventKind::Cost,
3270 "Hook" => EventKind::Hook,
3271 "Lifecycle" => EventKind::Lifecycle,
3272 _ => EventKind::Hook,
3273 }
3274}
3275
3276fn source_from_str(s: &str) -> EventSource {
3277 match s {
3278 "Tail" => EventSource::Tail,
3279 "Hook" => EventSource::Hook,
3280 _ => EventSource::Proxy,
3281 }
3282}
3283
3284fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3285 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3286 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3287 ensure_column(conn, "sessions", "branch", "TEXT")?;
3288 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3289 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3290 ensure_column(
3291 conn,
3292 "sessions",
3293 "repo_binding_source",
3294 "TEXT NOT NULL DEFAULT ''",
3295 )?;
3296 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3297 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3298 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3299 ensure_column(conn, "events", "stop_reason", "TEXT")?;
3300 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3301 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3302 ensure_column(conn, "events", "retry_count", "INTEGER")?;
3303 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3304 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3305 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3306 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3307 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3308 ensure_column(
3309 conn,
3310 "sync_outbox",
3311 "kind",
3312 "TEXT NOT NULL DEFAULT 'events'",
3313 )?;
3314 ensure_column(
3315 conn,
3316 "experiments",
3317 "state",
3318 "TEXT NOT NULL DEFAULT 'Draft'",
3319 )?;
3320 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3321 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3322 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3323 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3324 ensure_column(conn, "sessions", "os", "TEXT")?;
3325 ensure_column(conn, "sessions", "arch", "TEXT")?;
3326 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3327 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3328 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3329 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3330 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3331 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3332 conn.execute_batch(
3333 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3334 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3335 )?;
3336 Ok(())
3337}
3338
3339fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3340 if has_column(conn, table, column)? {
3341 return Ok(());
3342 }
3343 conn.execute(
3344 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3345 [],
3346 )?;
3347 Ok(())
3348}
3349
3350fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3351 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3352 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3353 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3354}
3355
/// Encode a bool as the 0/1 integer stored in SQLite.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3359
/// Decode a SQLite integer flag: any non-zero value is true.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3363
/// Treat the empty string as "not recorded": `""` becomes `None`.
fn empty_to_none(s: String) -> Option<String> {
    (!s.is_empty()).then_some(s)
}
3367
// Unit tests for the Store: open/pragma behavior, schema indexes, session and
// event round-trips, paging/filtering, windowed span queries, guidance
// reporting, and pruning. Each test works against a fresh temp-dir database.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Minimal completed session fixture in workspace "/ws".
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Minimal ToolCall event fixture for `session_id` with a seq-derived
    /// timestamp and a unique tool_call_id.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        // 1 == NORMAL, 2 == MEMORY in SQLite's pragma encodings.
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }

    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }

    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }

    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // "a" and "b" share a start time; ordering must still be stable.
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // Mixed-case agent exercises the case-insensitive prefix filter.
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }

    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }

    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        // The cursor is inclusive, so the caller advances by last seq + 1.
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }

    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        // Same (session, seq) appended twice must store only one row.
        store.append_event(&make_event("s5", 0)).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // A lookup populates the cache even for an unknown session...
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        // ...and every append invalidates it again.
        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }

    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        // Windowing prefers started_at_ms; ended_at_ms is the fallback when
        // started is NULL ("started_wins" proves started takes precedence).
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }

    #[test]
    fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s1")).unwrap();
        for (id, started, ended) in [
            ("inside_started", Some(150_i64), None),
            ("inside_ended_only", None, Some(220_i64)),
            ("after_window", Some(400_i64), None),
            ("before_window", None, Some(50_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store
            .tool_spans_sync_rows_in_window("/ws", 100, 300)
            .unwrap();
        let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
        assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
        assert!(rows.iter().all(|r| r.session_id == "s1"));
    }

    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        // A second upsert with changed fields overwrites the first row.
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}