1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::sync::context::SyncIngestContext;
13use crate::sync::outbound::outbound_event_from_row;
14use crate::sync::redact::redact_payload;
15use crate::sync::smart::enqueue_tool_spans_for_session;
16use anyhow::{Context, Result};
17use rusqlite::types::Value;
18use rusqlite::{
19 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
20};
21use std::cell::RefCell;
22use std::collections::{HashMap, HashSet};
23use std::path::Path;
24
// Millisecond timestamp threshold. NOTE(review): the name suggests timestamps
// below this value are treated as synthetic; its users are outside this chunk — confirm.
25const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
// Default mmap size, in MB judging by the name. NOTE(review): presumably consumed
// by `apply_pragmas`, which is not visible here — confirm.
28const DEFAULT_MMAP_MB: u64 = 256;
// Column list used to read a full row from `sessions`. It has no WHERE/ORDER BY;
// callers append their own clauses (e.g. `query_session_page_rows`).
29const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
30 status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
31 repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
32 repo_file_count, repo_total_loc FROM sessions";
33
// Schema bootstrap statements. `Store::open_with_mode` runs every entry, in order,
// through `execute_batch` whenever the store is opened in `ReadWrite` mode.
// Every statement is `CREATE ... IF NOT EXISTS` or `INSERT OR IGNORE`, so running
// the list again against an existing database does not fail on existing objects.
// NOTE(review): several columns used by queries in this file (e.g. `sync_outbox.kind`,
// `events.ts_exact`, `sessions.start_commit`) are not declared here; presumably
// `ensure_schema_columns` adds them — confirm.
34const MIGRATIONS: &[&str] = &[
// Core session and event tables.
35 "CREATE TABLE IF NOT EXISTS sessions (
36 id TEXT PRIMARY KEY,
37 agent TEXT NOT NULL,
38 model TEXT,
39 workspace TEXT NOT NULL,
40 started_at_ms INTEGER NOT NULL,
41 ended_at_ms INTEGER,
42 status TEXT NOT NULL,
43 trace_path TEXT NOT NULL
44 )",
45 "CREATE TABLE IF NOT EXISTS events (
46 id INTEGER PRIMARY KEY AUTOINCREMENT,
47 session_id TEXT NOT NULL,
48 seq INTEGER NOT NULL,
49 ts_ms INTEGER NOT NULL,
50 kind TEXT NOT NULL,
51 source TEXT NOT NULL,
52 tool TEXT,
53 tokens_in INTEGER,
54 tokens_out INTEGER,
55 cost_usd_e6 INTEGER,
56 payload TEXT NOT NULL
57 )",
58 "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
// Per-session derived facts (files touched, skills used).
59 "CREATE TABLE IF NOT EXISTS files_touched (
60 id INTEGER PRIMARY KEY AUTOINCREMENT,
61 session_id TEXT NOT NULL,
62 path TEXT NOT NULL
63 )",
64 "CREATE TABLE IF NOT EXISTS skills_used (
65 id INTEGER PRIMARY KEY AUTOINCREMENT,
66 session_id TEXT NOT NULL,
67 skill TEXT NOT NULL
68 )",
// Outbound sync queue; `sent = 0` marks rows that are still pending.
69 "CREATE TABLE IF NOT EXISTS sync_outbox (
70 id INTEGER PRIMARY KEY AUTOINCREMENT,
71 session_id TEXT NOT NULL,
72 payload TEXT NOT NULL,
73 sent INTEGER NOT NULL DEFAULT 0
74 )",
75 "CREATE TABLE IF NOT EXISTS experiments (
76 id TEXT PRIMARY KEY,
77 name TEXT NOT NULL,
78 created_at_ms INTEGER NOT NULL,
79 metadata TEXT NOT NULL DEFAULT '{}'
80 )",
81 "CREATE TABLE IF NOT EXISTS experiment_tags (
82 experiment_id TEXT NOT NULL,
83 session_id TEXT NOT NULL,
84 variant TEXT NOT NULL,
85 PRIMARY KEY (experiment_id, session_id)
86 )",
// Uniqueness on (session_id, seq) is what `append_event_with_sync` targets with
// its `ON CONFLICT(session_id, seq)` upsert.
87 "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
// Key/value store for sync bookkeeping (read and written by the `sync_state_*` methods).
88 "CREATE TABLE IF NOT EXISTS sync_state (
89 k TEXT PRIMARY KEY,
90 v TEXT NOT NULL
91 )",
// Unique indexes that make the `INSERT OR IGNORE` writes into these tables de-duplicate.
92 "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
93 "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
// Tool span tables.
94 "CREATE TABLE IF NOT EXISTS tool_spans (
95 span_id TEXT PRIMARY KEY,
96 session_id TEXT NOT NULL,
97 tool TEXT,
98 tool_call_id TEXT,
99 status TEXT NOT NULL,
100 started_at_ms INTEGER,
101 ended_at_ms INTEGER,
102 lead_time_ms INTEGER,
103 tokens_in INTEGER,
104 tokens_out INTEGER,
105 reasoning_tokens INTEGER,
106 cost_usd_e6 INTEGER,
107 paths_json TEXT NOT NULL DEFAULT '[]'
108 )",
109 "CREATE TABLE IF NOT EXISTS tool_span_paths (
110 span_id TEXT NOT NULL,
111 path TEXT NOT NULL,
112 PRIMARY KEY (span_id, path)
113 )",
// Mirror of the repo-binding columns; `upsert_session` writes it alongside `sessions`.
114 "CREATE TABLE IF NOT EXISTS session_repo_binding (
115 session_id TEXT PRIMARY KEY,
116 start_commit TEXT,
117 end_commit TEXT,
118 branch TEXT,
119 dirty_start INTEGER,
120 dirty_end INTEGER,
121 repo_binding_source TEXT NOT NULL DEFAULT ''
122 )",
// Repository snapshot tables (snapshot header, per-file facts, edges).
123 "CREATE TABLE IF NOT EXISTS repo_snapshots (
124 id TEXT PRIMARY KEY,
125 workspace TEXT NOT NULL,
126 head_commit TEXT,
127 dirty_fingerprint TEXT NOT NULL,
128 analyzer_version TEXT NOT NULL,
129 indexed_at_ms INTEGER NOT NULL,
130 dirty INTEGER NOT NULL DEFAULT 0,
131 graph_path TEXT NOT NULL
132 )",
133 "CREATE TABLE IF NOT EXISTS file_facts (
134 snapshot_id TEXT NOT NULL,
135 path TEXT NOT NULL,
136 language TEXT NOT NULL,
137 bytes INTEGER NOT NULL,
138 loc INTEGER NOT NULL,
139 sloc INTEGER NOT NULL,
140 complexity_total INTEGER NOT NULL,
141 max_fn_complexity INTEGER NOT NULL,
142 symbol_count INTEGER NOT NULL,
143 import_count INTEGER NOT NULL,
144 fan_in INTEGER NOT NULL,
145 fan_out INTEGER NOT NULL,
146 churn_30d INTEGER NOT NULL,
147 churn_90d INTEGER NOT NULL,
148 authors_90d INTEGER NOT NULL,
149 last_changed_ms INTEGER,
150 PRIMARY KEY (snapshot_id, path)
151 )",
152 "CREATE TABLE IF NOT EXISTS repo_edges (
153 snapshot_id TEXT NOT NULL,
154 from_id TEXT NOT NULL,
155 to_id TEXT NOT NULL,
156 kind TEXT NOT NULL,
157 weight INTEGER NOT NULL,
158 PRIMARY KEY (snapshot_id, from_id, to_id, kind)
159 )",
// Session listing indexes; the DESC/ASC one matches the ordering used by
// `query_session_page_rows` (`started_at_ms DESC, id ASC`).
160 "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
162 "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
164 "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
165 ON sessions(workspace, started_at_ms DESC, id ASC)",
166 "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
167 ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
168 "CREATE TABLE IF NOT EXISTS rules_used (
169 id INTEGER PRIMARY KEY AUTOINCREMENT,
170 session_id TEXT NOT NULL,
171 rule TEXT NOT NULL
172 )",
173 "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
// Single-row table: the CHECK pins `id` to 1 and the following INSERT seeds that row.
174 "CREATE TABLE IF NOT EXISTS remote_pull_state (
176 id INTEGER PRIMARY KEY CHECK (id = 1),
177 query_provider TEXT NOT NULL DEFAULT 'none',
178 cursor_json TEXT NOT NULL DEFAULT '',
179 last_success_ms INTEGER
180 )",
181 "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
// `remote_*` tables store JSON blobs keyed by team id plus hashed identifiers.
182 "CREATE TABLE IF NOT EXISTS remote_sessions (
183 team_id TEXT NOT NULL,
184 workspace_hash TEXT NOT NULL,
185 session_id_hash TEXT NOT NULL,
186 json TEXT NOT NULL,
187 PRIMARY KEY (team_id, workspace_hash, session_id_hash)
188 )",
189 "CREATE TABLE IF NOT EXISTS remote_events (
190 team_id TEXT NOT NULL,
191 workspace_hash TEXT NOT NULL,
192 session_id_hash TEXT NOT NULL,
193 event_seq INTEGER NOT NULL,
194 json TEXT NOT NULL,
195 PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
196 )",
197 "CREATE TABLE IF NOT EXISTS remote_tool_spans (
198 team_id TEXT NOT NULL,
199 workspace_hash TEXT NOT NULL,
200 span_id_hash TEXT NOT NULL,
201 json TEXT NOT NULL,
202 PRIMARY KEY (team_id, workspace_hash, span_id_hash)
203 )",
204 "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
205 team_id TEXT NOT NULL,
206 workspace_hash TEXT NOT NULL,
207 snapshot_id_hash TEXT NOT NULL,
208 chunk_index INTEGER NOT NULL,
209 json TEXT NOT NULL,
210 PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
211 )",
212 "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
213 team_id TEXT NOT NULL,
214 workspace_hash TEXT NOT NULL,
215 fact_key TEXT NOT NULL,
216 json TEXT NOT NULL,
217 PRIMARY KEY (team_id, workspace_hash, fact_key)
218 )",
// This entry holds several `;`-separated statements; that works because entries
// are run with `execute_batch`.
219 "CREATE TABLE IF NOT EXISTS session_evals (
220 id TEXT PRIMARY KEY,
221 session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
222 judge_model TEXT NOT NULL,
223 rubric_id TEXT NOT NULL,
224 score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
225 rationale TEXT NOT NULL,
226 flagged INTEGER NOT NULL DEFAULT 0,
227 created_at_ms INTEGER NOT NULL
228 );
229 CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
230 CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
231 "CREATE TABLE IF NOT EXISTS prompt_snapshots (
232 fingerprint TEXT PRIMARY KEY,
233 captured_at_ms INTEGER NOT NULL,
234 files_json TEXT NOT NULL,
235 total_bytes INTEGER NOT NULL
236 )",
// Multi-statement entry (table plus two indexes), as with `session_evals`.
237 "CREATE TABLE IF NOT EXISTS session_feedback (
238 id TEXT PRIMARY KEY,
239 session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
240 score INTEGER CHECK(score BETWEEN 1 AND 5),
241 label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
242 note TEXT,
243 created_at_ms INTEGER NOT NULL
244 );
245 CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
246 CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
247 "CREATE TABLE IF NOT EXISTS session_outcomes (
248 session_id TEXT PRIMARY KEY NOT NULL,
249 test_passed INTEGER,
250 test_failed INTEGER,
251 test_skipped INTEGER,
252 build_ok INTEGER,
253 lint_errors INTEGER,
254 revert_lines_14d INTEGER,
255 pr_open INTEGER,
256 ci_ok INTEGER,
257 measured_at_ms INTEGER NOT NULL,
258 measure_error TEXT
259 )",
260 "CREATE TABLE IF NOT EXISTS session_samples (
261 session_id TEXT NOT NULL,
262 ts_ms INTEGER NOT NULL,
263 pid INTEGER NOT NULL,
264 cpu_percent REAL,
265 rss_bytes INTEGER,
266 PRIMARY KEY (session_id, ts_ms, pid)
267 )",
// Secondary lookup indexes.
268 "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
269 "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
270 "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
271 "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
272 "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
273 "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
// NOTE(review): indexes the same column as `session_feedback_session` created above.
274 "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
275];
276
/// Aggregate counters and small rollups over stored sessions and events.
///
/// NOTE(review): the query that fills this struct is outside this chunk; the
/// field notes below are read off names and types and should be confirmed.
277#[derive(Clone)]
279pub struct InsightsStats {
/// Count of sessions.
280 pub total_sessions: u64,
/// Count of sessions presumably in the `Running` status.
281 pub running_sessions: u64,
/// Count of events.
282 pub total_events: u64,
/// `(label, count)` pairs; presumably one entry per day — label format not visible here.
283 pub sessions_by_day: Vec<(String, u64)>,
/// Session records paired with a `u64`; presumably a per-session event count — confirm.
285 pub recent: Vec<(SessionRecord, u64)>,
/// `(tool name, count)` pairs.
287 pub top_tools: Vec<(String, u64)>,
/// Summed cost; the `_usd_e6` suffix suggests millionths of a USD — confirm.
289 pub total_cost_usd_e6: i64,
/// Count of sessions that presumably carry cost data.
290 pub sessions_with_cost: u64,
291}
292
/// Point-in-time view of sync health, as assembled by `Store::sync_status`.
///
/// Derives `Debug` and `Clone` so snapshots can be logged and passed around;
/// every field is a plain standard-library type.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Number of `sync_outbox` rows that are still marked unsent (`sent = 0`).
    pub pending_outbox: u64,
    /// Value stored under the `last_success_ms` key of `sync_state`, if present and numeric.
    pub last_success_ms: Option<u64>,
    /// Value stored under the `last_error` key of `sync_state`, if any.
    pub last_error: Option<String>,
    /// Value stored under the `consecutive_failures` key of `sync_state`; `0` when absent.
    pub consecutive_failures: u32,
}
300
/// Serializable summary: a session count, a cost total and three `(name, count)` breakdowns.
///
/// NOTE(review): the producer is outside this chunk; what each breakdown counts
/// (sessions vs. events) should be confirmed there.
301#[derive(serde::Serialize)]
303pub struct SummaryStats {
/// Number of sessions covered by the summary.
304 pub session_count: u64,
/// Summed cost; the `_usd_e6` suffix suggests millionths of a USD — confirm.
305 pub total_cost_usd_e6: i64,
/// `(agent, count)` pairs.
306 pub by_agent: Vec<(String, u64)>,
/// `(model, count)` pairs.
307 pub by_model: Vec<(String, u64)>,
/// `(tool, count)` pairs.
308 pub top_tools: Vec<(String, u64)>,
309}
310
/// Kind of guidance item a report row refers to: a skill or a rule.
///
/// The serde `rename_all = "lowercase"` attribute makes the variants serialize
/// as `"skill"` and `"rule"`. The derived ordering places `Skill` before `Rule`.
311#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
313#[serde(rename_all = "lowercase")]
314pub enum GuidanceKind {
315 Skill,
316 Rule,
317}
318
/// One row of a guidance report: usage and cost figures for a single skill or rule.
///
/// NOTE(review): the code that computes these values is outside this chunk;
/// the field notes are read off names and types.
319#[derive(Clone, Debug, serde::Serialize)]
321pub struct GuidancePerfRow {
/// Whether this row describes a skill or a rule.
322 pub kind: GuidanceKind,
/// Identifier of the skill or rule.
323 pub id: String,
/// Number of sessions, presumably those that used this item.
324 pub sessions: u64,
/// Share of sessions; scale (0–1 vs. 0–100) is not visible here — confirm.
325 pub sessions_pct: f64,
/// Summed cost; the `_usd_e6` suffix suggests millionths of a USD — confirm.
326 pub total_cost_usd_e6: i64,
/// Average cost per session in USD, when available.
327 pub avg_cost_per_session_usd: Option<f64>,
/// Comparison against the workspace average cost per session; sign convention not visible here.
328 pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
/// Presumably whether the skill/rule file currently exists on disk — confirm.
329 pub on_disk: bool,
330}
331
/// Guidance report for one workspace over a time window, with one row per skill or rule.
///
/// NOTE(review): built by code outside this chunk; window bounds are in
/// milliseconds per the `_ms` suffix, inclusivity not visible here.
332#[derive(Clone, Debug, serde::Serialize)]
334pub struct GuidanceReport {
/// Workspace the report covers.
335 pub workspace: String,
/// Start of the reporting window (ms).
336 pub window_start_ms: u64,
/// End of the reporting window (ms).
337 pub window_end_ms: u64,
/// Number of sessions in the window.
338 pub sessions_in_window: u64,
/// Workspace-wide average cost per session in USD, when available.
339 pub workspace_avg_cost_per_session_usd: Option<f64>,
/// Per-skill / per-rule rows.
340 pub rows: Vec<GuidancePerfRow>,
341}
342
/// Result of `Store::prune_sessions_started_before`.
///
/// Both numbers are `COUNT(*)` results taken inside the prune transaction
/// before the deletes run.
343#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
345pub struct PruneStats {
/// Sessions whose `started_at_ms` was below the cutoff.
346 pub sessions_removed: u64,
/// Events belonging to those sessions.
347 pub events_removed: u64,
348}
349
/// Row shape matching the columns of the `session_outcomes` table.
///
/// Every measurement is optional, mirroring the nullable columns; only
/// `session_id` and `measured_at_ms` are required by the schema.
/// NOTE(review): the read/write code for this table is outside this chunk.
350#[derive(Debug, Clone, Eq, PartialEq)]
352pub struct SessionOutcomeRow {
/// Session the outcome belongs to (primary key of the table).
353 pub session_id: String,
354 pub test_passed: Option<i64>,
355 pub test_failed: Option<i64>,
356 pub test_skipped: Option<i64>,
/// Stored as an INTEGER column; exposed here as a bool.
357 pub build_ok: Option<bool>,
358 pub lint_errors: Option<i64>,
359 pub revert_lines_14d: Option<i64>,
360 pub pr_open: Option<i64>,
/// Stored as an INTEGER column; exposed here as a bool.
361 pub ci_ok: Option<bool>,
/// Measurement time in milliseconds.
362 pub measured_at_ms: u64,
/// Error text recorded for the measurement, if any.
363 pub measure_error: Option<String>,
364}
365
/// Per-session aggregate over resource samples.
///
/// NOTE(review): presumably computed from the `session_samples` table
/// (`cpu_percent`, `rss_bytes`); the aggregating query is outside this chunk.
366#[derive(Debug, Clone)]
368pub struct SessionSampleAgg {
369 pub session_id: String,
/// Number of samples aggregated.
370 pub sample_count: u64,
/// Highest CPU percentage among the samples.
371 pub max_cpu_percent: f64,
/// Highest resident-set size, in bytes, among the samples.
372 pub max_rss_bytes: u64,
373}
374
// Key names for the `sync_state` key/value table.
// NOTE(review): presumably used with `sync_state_get_u64` / `sync_state_set_u64`
// to persist millisecond timestamps — callers are outside this chunk.
375pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
377pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
378
/// Tool span flattened for sync.
///
/// The fields follow the columns of the `tool_spans` table, with the span's
/// paths carried as a list. Derives `Debug` and `Clone` so rows can be logged
/// and copied; every field is a plain standard-library type.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    /// Span identifier (primary key of `tool_spans`).
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Tool call identifier, when known.
    pub tool_call_id: Option<String>,
    /// Span status text as stored.
    pub status: String,
    /// Start time in milliseconds, when known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, when known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, when known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, when known.
    pub tokens_in: Option<u32>,
    /// Output token count, when known.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, when known.
    pub reasoning_tokens: Option<u32>,
    /// Cost; the `_usd_e6` suffix suggests millionths of a USD.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
394
/// How `Store::open_with_mode` opens the database.
395#[derive(Debug, Clone, Copy, Eq, PartialEq)]
396pub enum StoreOpenMode {
/// Opens with `Connection::open`, runs `MIGRATIONS` and `ensure_schema_columns`,
/// then warms the projector.
397 ReadWrite,
/// Opens with explicit flags and skips migrations and projector warm-up.
398 ReadOnlyQuery,
399}
400
/// Minimal session projection: id, status and optional end time.
/// NOTE(review): the query that produces it is outside this chunk.
401#[derive(Debug, Clone)]
402pub struct SessionStatusRow {
403 pub id: String,
404 pub status: SessionStatus,
/// End time in milliseconds, when the session has one.
405 pub ended_at_ms: Option<u64>,
406}
407
/// Optional filters for `Store::list_sessions_page`; the default value sets none.
///
/// NOTE(review): the filters are turned into SQL by `session_filter_sql`, which
/// is outside this chunk — exact matching rules (case, inclusivity) should be
/// confirmed there.
408#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
409pub struct SessionFilter {
/// Restrict to agents starting with this prefix, presumably.
410 pub agent_prefix: Option<String>,
/// Restrict to sessions with this status, presumably.
411 pub status: Option<SessionStatus>,
/// Restrict by a millisecond lower bound, presumably on the start time.
412 pub since_ms: Option<u64>,
413}
414
/// One page of sessions returned by `Store::list_sessions_page`.
415#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
416pub struct SessionPage {
/// Sessions on this page.
417 pub rows: Vec<SessionRecord>,
/// Number of sessions matching the filter across all pages.
418 pub total: usize,
/// `offset + rows.len()` when that is still below `total`; `None` otherwise.
419 pub next_offset: Option<usize>,
420}
421
// Cached span tree for a single session, held in `Store::span_tree_cache`.
// NOTE(review): `last_event_seq` presumably records the session's last event
// sequence at build time so staleness can be detected; the code that reads it
// is outside this chunk.
422#[derive(Clone)]
423struct SpanTreeCacheEntry {
424 session_id: String,
425 last_event_seq: Option<u64>,
426 nodes: Vec<crate::store::span_tree::SpanNode>,
427}
428
/// SQLite-backed store for sessions, events and derived data.
///
/// The cache and the projector sit in `RefCell`s so methods taking `&self`
/// can update them.
429pub struct Store {
// Underlying SQLite connection.
430 conn: Connection,
// At most one cached span tree; cleared by `invalidate_span_tree_cache`.
431 span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
// In-memory projector fed by appended events (see `append_event_with_sync`).
432 projector: RefCell<Projector>,
433}
434
435impl Store {
436 pub(crate) fn conn(&self) -> &Connection {
437 &self.conn
438 }
439
440 pub fn open(path: &Path) -> Result<Self> {
441 Self::open_with_mode(path, StoreOpenMode::ReadWrite)
442 }
443
444 pub fn open_read_only(path: &Path) -> Result<Self> {
445 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
446 }
447
448 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
449 if let Some(parent) = path.parent() {
450 std::fs::create_dir_all(parent)?;
451 }
452 let conn = match mode {
453 StoreOpenMode::ReadWrite => Connection::open(path),
454 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
455 path,
456 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
457 ),
458 }
459 .with_context(|| format!("open db: {}", path.display()))?;
460 apply_pragmas(&conn, mode)?;
461 if mode == StoreOpenMode::ReadWrite {
462 for sql in MIGRATIONS {
463 conn.execute_batch(sql)?;
464 }
465 ensure_schema_columns(&conn)?;
466 }
467 let store = Self {
468 conn,
469 span_tree_cache: RefCell::new(None),
470 projector: RefCell::new(Projector::default()),
471 };
472 if mode == StoreOpenMode::ReadWrite {
473 store.warm_projector()?;
474 }
475 Ok(store)
476 }
477
478 fn invalidate_span_tree_cache(&self) {
479 *self.span_tree_cache.borrow_mut() = None;
480 }
481
482 fn warm_projector(&self) -> Result<()> {
483 let ids = self.running_session_ids()?;
484 let mut projector = self.projector.borrow_mut();
485 for id in ids {
486 for event in self.list_events_for_session(&id)? {
487 let _ = projector.apply(&event);
488 }
489 }
490 Ok(())
491 }
492
493 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
494 self.conn.execute(
495 "INSERT INTO sessions (
496 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
497 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
498 prompt_fingerprint, parent_session_id, agent_version, os, arch,
499 repo_file_count, repo_total_loc
500 )
501 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
502 ?16, ?17, ?18, ?19, ?20, ?21)
503 ON CONFLICT(id) DO UPDATE SET
504 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
505 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
506 status=excluded.status, trace_path=excluded.trace_path,
507 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
508 branch=excluded.branch, dirty_start=excluded.dirty_start,
509 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
510 prompt_fingerprint=excluded.prompt_fingerprint,
511 parent_session_id=excluded.parent_session_id,
512 agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
513 repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
514 params![
515 s.id,
516 s.agent,
517 s.model,
518 s.workspace,
519 s.started_at_ms as i64,
520 s.ended_at_ms.map(|v| v as i64),
521 format!("{:?}", s.status),
522 s.trace_path,
523 s.start_commit,
524 s.end_commit,
525 s.branch,
526 s.dirty_start.map(bool_to_i64),
527 s.dirty_end.map(bool_to_i64),
528 s.repo_binding_source.clone().unwrap_or_default(),
529 s.prompt_fingerprint.as_deref(),
530 s.parent_session_id.as_deref(),
531 s.agent_version.as_deref(),
532 s.os.as_deref(),
533 s.arch.as_deref(),
534 s.repo_file_count.map(|v| v as i64),
535 s.repo_total_loc.map(|v| v as i64),
536 ],
537 )?;
538 self.conn.execute(
539 "INSERT INTO session_repo_binding (
540 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
541 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
542 ON CONFLICT(session_id) DO UPDATE SET
543 start_commit=excluded.start_commit,
544 end_commit=excluded.end_commit,
545 branch=excluded.branch,
546 dirty_start=excluded.dirty_start,
547 dirty_end=excluded.dirty_end,
548 repo_binding_source=excluded.repo_binding_source",
549 params![
550 s.id,
551 s.start_commit,
552 s.end_commit,
553 s.branch,
554 s.dirty_start.map(bool_to_i64),
555 s.dirty_end.map(bool_to_i64),
556 s.repo_binding_source.clone().unwrap_or_default(),
557 ],
558 )?;
559 Ok(())
560 }
561
562 pub fn ensure_session_stub(
565 &self,
566 id: &str,
567 agent: &str,
568 workspace: &str,
569 started_at_ms: u64,
570 ) -> Result<()> {
571 self.conn.execute(
572 "INSERT OR IGNORE INTO sessions (
573 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
574 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
575 prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
576 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
577 NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
578 params![id, agent, workspace, started_at_ms as i64],
579 )?;
580 Ok(())
581 }
582
583 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
585 let n: i64 = self.conn.query_row(
586 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
587 [session_id],
588 |r| r.get(0),
589 )?;
590 Ok(n as u64)
591 }
592
593 pub fn append_event(&self, e: &Event) -> Result<()> {
594 self.append_event_with_sync(e, None)
595 }
596
597 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
599 let last_before = if projector_legacy_mode() {
600 None
601 } else {
602 self.last_event_seq_for_session(&e.session_id)?
603 };
604 let payload = serde_json::to_string(&e.payload)?;
605 self.conn.execute(
606 "INSERT INTO events (
607 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
608 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
609 stop_reason, latency_ms, ttft_ms, retry_count,
610 context_used_tokens, context_max_tokens,
611 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
612 ) VALUES (
613 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
614 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
615 )
616 ON CONFLICT(session_id, seq) DO UPDATE SET
617 ts_ms = excluded.ts_ms,
618 ts_exact = excluded.ts_exact,
619 kind = excluded.kind,
620 source = excluded.source,
621 tool = excluded.tool,
622 tool_call_id = excluded.tool_call_id,
623 tokens_in = excluded.tokens_in,
624 tokens_out = excluded.tokens_out,
625 reasoning_tokens = excluded.reasoning_tokens,
626 cost_usd_e6 = excluded.cost_usd_e6,
627 payload = excluded.payload,
628 stop_reason = excluded.stop_reason,
629 latency_ms = excluded.latency_ms,
630 ttft_ms = excluded.ttft_ms,
631 retry_count = excluded.retry_count,
632 context_used_tokens = excluded.context_used_tokens,
633 context_max_tokens = excluded.context_max_tokens,
634 cache_creation_tokens = excluded.cache_creation_tokens,
635 cache_read_tokens = excluded.cache_read_tokens,
636 system_prompt_tokens = excluded.system_prompt_tokens",
637 params![
638 e.session_id,
639 e.seq as i64,
640 e.ts_ms as i64,
641 bool_to_i64(e.ts_exact),
642 format!("{:?}", e.kind),
643 format!("{:?}", e.source),
644 e.tool,
645 e.tool_call_id,
646 e.tokens_in.map(|v| v as i64),
647 e.tokens_out.map(|v| v as i64),
648 e.reasoning_tokens.map(|v| v as i64),
649 e.cost_usd_e6,
650 payload,
651 e.stop_reason,
652 e.latency_ms.map(|v| v as i64),
653 e.ttft_ms.map(|v| v as i64),
654 e.retry_count.map(|v| v as i64),
655 e.context_used_tokens.map(|v| v as i64),
656 e.context_max_tokens.map(|v| v as i64),
657 e.cache_creation_tokens.map(|v| v as i64),
658 e.cache_read_tokens.map(|v| v as i64),
659 e.system_prompt_tokens.map(|v| v as i64),
660 ],
661 )?;
662 if self.conn.changes() == 0 {
663 return Ok(());
664 }
665 if projector_legacy_mode() {
666 index_event_derived(&self.conn, e)?;
667 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
668 self.invalidate_span_tree_cache();
669 } else if last_before.is_some_and(|last| e.seq <= last) {
670 self.replay_projector_session(&e.session_id)?;
671 } else {
672 let deltas = self.projector.borrow_mut().apply(e);
673 self.apply_projector_events(&deltas)?;
674 let expired = self
675 .projector
676 .borrow_mut()
677 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
678 self.apply_projector_events(&expired)?;
679 if is_stop_event(e) {
680 let flushed = self
681 .projector
682 .borrow_mut()
683 .flush_session(&e.session_id, e.ts_ms);
684 self.apply_projector_events(&flushed)?;
685 }
686 self.invalidate_span_tree_cache();
687 }
688 let Some(ctx) = ctx else {
689 return Ok(());
690 };
691 let sync = &ctx.sync;
692 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
693 return Ok(());
694 }
695 let Some(salt) = try_team_salt(sync) else {
696 tracing::warn!(
697 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
698 );
699 return Ok(());
700 };
701 if sync.sample_rate < 1.0 {
702 let u: f64 = rand::random();
703 if u > sync.sample_rate {
704 return Ok(());
705 }
706 }
707 let Some(session) = self.get_session(&e.session_id)? else {
708 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
709 return Ok(());
710 };
711 let mut outbound = outbound_event_from_row(e, &session, &salt);
712 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
713 let row = serde_json::to_string(&outbound)?;
714 self.conn.execute(
715 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
716 params![e.session_id, row],
717 )?;
718 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
719 Ok(())
720 }
721
722 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
723 if projector_legacy_mode() {
724 rebuild_tool_spans_for_session(&self.conn, session_id)?;
725 self.invalidate_span_tree_cache();
726 return Ok(());
727 }
728 let deltas = self
729 .projector
730 .borrow_mut()
731 .flush_session(session_id, now_ms);
732 if self.apply_projector_events(&deltas)? {
733 self.invalidate_span_tree_cache();
734 }
735 Ok(())
736 }
737
738 fn replay_projector_session(&self, session_id: &str) -> Result<()> {
739 clear_session_spans(&self.conn, session_id)?;
740 self.projector.borrow_mut().reset_session(session_id);
741 let events = self.list_events_for_session(session_id)?;
742 let mut changed = false;
743 for event in &events {
744 let deltas = self.projector.borrow_mut().apply(event);
745 changed |= self.apply_projector_events(&deltas)?;
746 }
747 if self
748 .get_session(session_id)?
749 .is_some_and(|session| session.status == SessionStatus::Done)
750 {
751 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
752 let deltas = self
753 .projector
754 .borrow_mut()
755 .flush_session(session_id, now_ms);
756 changed |= self.apply_projector_events(&deltas)?;
757 }
758 if changed {
759 self.invalidate_span_tree_cache();
760 }
761 Ok(())
762 }
763
764 fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
765 let mut changed = false;
766 for delta in deltas {
767 match delta {
768 ProjectorEvent::SpanClosed(span, sample) => {
769 upsert_tool_span_record(&self.conn, span)?;
770 tracing::debug!(
771 session_id = %sample.session_id,
772 span_id = %sample.span_id,
773 tool = ?sample.tool,
774 lead_time_ms = ?sample.lead_time_ms,
775 tokens_in = ?sample.tokens_in,
776 tokens_out = ?sample.tokens_out,
777 reasoning_tokens = ?sample.reasoning_tokens,
778 cost_usd_e6 = ?sample.cost_usd_e6,
779 paths = ?sample.paths,
780 "tool span closed"
781 );
782 changed = true;
783 }
784 ProjectorEvent::SpanPatched(span) => {
785 upsert_tool_span_record(&self.conn, span)?;
786 changed = true;
787 }
788 ProjectorEvent::FileTouched { session, path } => {
789 self.conn.execute(
790 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
791 params![session, path],
792 )?;
793 changed = true;
794 }
795 ProjectorEvent::SkillUsed { session, skill } => {
796 self.conn.execute(
797 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
798 params![session, skill],
799 )?;
800 changed = true;
801 }
802 ProjectorEvent::RuleUsed { session, rule } => {
803 self.conn.execute(
804 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
805 params![session, rule],
806 )?;
807 changed = true;
808 }
809 }
810 }
811 Ok(changed)
812 }
813
814 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
815 let mut stmt = self.conn.prepare(
816 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
817 )?;
818 let rows = stmt.query_map(params![limit as i64], |row| {
819 Ok((
820 row.get::<_, i64>(0)?,
821 row.get::<_, String>(1)?,
822 row.get::<_, String>(2)?,
823 ))
824 })?;
825 let mut out = Vec::new();
826 for r in rows {
827 out.push(r?);
828 }
829 Ok(out)
830 }
831
832 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
833 for id in ids {
834 self.conn
835 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
836 }
837 Ok(())
838 }
839
840 pub fn replace_outbox_rows(
841 &self,
842 owner_id: &str,
843 kind: &str,
844 payloads: &[String],
845 ) -> Result<()> {
846 self.conn.execute(
847 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
848 params![owner_id, kind],
849 )?;
850 for payload in payloads {
851 self.conn.execute(
852 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
853 params![owner_id, kind, payload],
854 )?;
855 }
856 Ok(())
857 }
858
859 pub fn outbox_pending_count(&self) -> Result<u64> {
860 let c: i64 =
861 self.conn
862 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
863 r.get(0)
864 })?;
865 Ok(c as u64)
866 }
867
868 pub fn set_sync_state_ok(&self) -> Result<()> {
869 let now = now_ms().to_string();
870 self.conn.execute(
871 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
872 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
873 params![now],
874 )?;
875 self.conn.execute(
876 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
877 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
878 [],
879 )?;
880 self.conn
881 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
882 Ok(())
883 }
884
885 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
886 let prev: i64 = self
887 .conn
888 .query_row(
889 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
890 [],
891 |r| {
892 let s: String = r.get(0)?;
893 Ok(s.parse::<i64>().unwrap_or(0))
894 },
895 )
896 .optional()?
897 .unwrap_or(0);
898 let next = prev.saturating_add(1);
899 self.conn.execute(
900 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
901 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
902 params![msg],
903 )?;
904 self.conn.execute(
905 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
906 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
907 params![next.to_string()],
908 )?;
909 Ok(())
910 }
911
912 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
913 let pending_outbox = self.outbox_pending_count()?;
914 let last_success_ms = self
915 .conn
916 .query_row(
917 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
918 [],
919 |r| r.get::<_, String>(0),
920 )
921 .optional()?
922 .and_then(|s| s.parse().ok());
923 let last_error = self
924 .conn
925 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
926 r.get::<_, String>(0)
927 })
928 .optional()?;
929 let consecutive_failures = self
930 .conn
931 .query_row(
932 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
933 [],
934 |r| r.get::<_, String>(0),
935 )
936 .optional()?
937 .and_then(|s| s.parse().ok())
938 .unwrap_or(0);
939 Ok(SyncStatusSnapshot {
940 pending_outbox,
941 last_success_ms,
942 last_error,
943 consecutive_failures,
944 })
945 }
946
947 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
948 let row: Option<String> = self
949 .conn
950 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
951 r.get::<_, String>(0)
952 })
953 .optional()?;
954 Ok(row.and_then(|s| s.parse().ok()))
955 }
956
957 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
958 self.conn.execute(
959 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
960 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
961 params![key, v.to_string()],
962 )?;
963 Ok(())
964 }
965
966 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
968 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
969 let sessions_to_remove: i64 = tx.query_row(
970 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
971 params![cutoff_ms],
972 |r| r.get(0),
973 )?;
974 let events_to_remove: i64 = tx.query_row(
975 "SELECT COUNT(*) FROM events WHERE session_id IN \
976 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
977 params![cutoff_ms],
978 |r| r.get(0),
979 )?;
980
981 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
982 tx.execute(
983 &format!(
984 "DELETE FROM tool_span_paths WHERE span_id IN \
985 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
986 ),
987 params![cutoff_ms],
988 )?;
989 tx.execute(
990 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
991 params![cutoff_ms],
992 )?;
993 tx.execute(
994 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
995 params![cutoff_ms],
996 )?;
997 tx.execute(
998 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
999 params![cutoff_ms],
1000 )?;
1001 tx.execute(
1002 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1003 params![cutoff_ms],
1004 )?;
1005 tx.execute(
1006 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1007 params![cutoff_ms],
1008 )?;
1009 tx.execute(
1010 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1011 params![cutoff_ms],
1012 )?;
1013 tx.execute(
1014 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1015 params![cutoff_ms],
1016 )?;
1017 tx.execute(
1018 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1019 params![cutoff_ms],
1020 )?;
1021 tx.execute(
1022 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1023 params![cutoff_ms],
1024 )?;
1025 tx.execute(
1026 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1027 params![cutoff_ms],
1028 )?;
1029 tx.execute(
1030 "DELETE FROM sessions WHERE started_at_ms < ?1",
1031 params![cutoff_ms],
1032 )?;
1033 tx.commit()?;
1034 self.invalidate_span_tree_cache();
1035 Ok(PruneStats {
1036 sessions_removed: sessions_to_remove as u64,
1037 events_removed: events_to_remove as u64,
1038 })
1039 }
1040
1041 pub fn vacuum(&self) -> Result<()> {
1043 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1044 Ok(())
1045 }
1046
1047 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1048 Ok(self
1049 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1050 .rows)
1051 }
1052
1053 pub fn list_sessions_page(
1054 &self,
1055 workspace: &str,
1056 offset: usize,
1057 limit: usize,
1058 filter: SessionFilter,
1059 ) -> Result<SessionPage> {
1060 let (where_sql, args) = session_filter_sql(workspace, &filter);
1061 let total = self.query_session_page_count(&where_sql, &args)?;
1062 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1063 let next = offset.saturating_add(rows.len());
1064 Ok(SessionPage {
1065 rows,
1066 total,
1067 next_offset: (next < total).then_some(next),
1068 })
1069 }
1070
1071 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1072 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1073 let total: i64 = self
1074 .conn
1075 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1076 Ok(total as usize)
1077 }
1078
1079 fn query_session_page_rows(
1080 &self,
1081 where_sql: &str,
1082 args: &[Value],
1083 offset: usize,
1084 limit: usize,
1085 ) -> Result<Vec<SessionRecord>> {
1086 let sql = format!(
1087 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1088 );
1089 let mut values = args.to_vec();
1090 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1091 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1092 let mut stmt = self.conn.prepare(&sql)?;
1093 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1094 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1095 }
1096
1097 pub fn list_sessions_started_after(
1098 &self,
1099 workspace: &str,
1100 after_started_at_ms: u64,
1101 ) -> Result<Vec<SessionRecord>> {
1102 let mut stmt = self.conn.prepare(
1103 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1104 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1105 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1106 repo_file_count, repo_total_loc
1107 FROM sessions
1108 WHERE workspace = ?1 AND started_at_ms > ?2
1109 ORDER BY started_at_ms DESC, id ASC",
1110 )?;
1111 let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1112 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1113 }
1114
1115 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1116 if ids.is_empty() {
1117 return Ok(Vec::new());
1118 }
1119 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1120 let sql =
1121 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1122 let mut stmt = self.conn.prepare(&sql)?;
1123 let params: Vec<&dyn rusqlite::ToSql> =
1124 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1125 let rows = stmt.query_map(params.as_slice(), |r| {
1126 let status: String = r.get(1)?;
1127 Ok(SessionStatusRow {
1128 id: r.get(0)?,
1129 status: status_from_str(&status),
1130 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1131 })
1132 })?;
1133 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1134 }
1135
1136 fn running_session_ids(&self) -> Result<Vec<String>> {
1137 let mut stmt = self
1138 .conn
1139 .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1140 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1141 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1142 }
1143
1144 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1145 let session_count: i64 = self.conn.query_row(
1146 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1147 params![workspace],
1148 |r| r.get(0),
1149 )?;
1150
1151 let total_cost: i64 = self.conn.query_row(
1152 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1153 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1154 params![workspace],
1155 |r| r.get(0),
1156 )?;
1157
1158 let mut stmt = self.conn.prepare(
1159 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1160 )?;
1161 let by_agent: Vec<(String, u64)> = stmt
1162 .query_map(params![workspace], |r| {
1163 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1164 })?
1165 .filter_map(|r| r.ok())
1166 .collect();
1167
1168 let mut stmt = self.conn.prepare(
1169 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1170 )?;
1171 let by_model: Vec<(String, u64)> = stmt
1172 .query_map(params![workspace], |r| {
1173 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1174 })?
1175 .filter_map(|r| r.ok())
1176 .collect();
1177
1178 let mut stmt = self.conn.prepare(
1179 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1180 WHERE s.workspace = ?1 AND tool IS NOT NULL
1181 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1182 )?;
1183 let top_tools: Vec<(String, u64)> = stmt
1184 .query_map(params![workspace], |r| {
1185 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1186 })?
1187 .filter_map(|r| r.ok())
1188 .collect();
1189
1190 Ok(SummaryStats {
1191 session_count: session_count as u64,
1192 total_cost_usd_e6: total_cost,
1193 by_agent,
1194 by_model,
1195 top_tools,
1196 })
1197 }
1198
1199 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1200 self.list_events_page(session_id, 0, i64::MAX as usize)
1201 }
1202
1203 pub fn list_events_page(
1204 &self,
1205 session_id: &str,
1206 after_seq: u64,
1207 limit: usize,
1208 ) -> Result<Vec<Event>> {
1209 let mut stmt = self.conn.prepare(
1210 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1211 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1212 stop_reason, latency_ms, ttft_ms, retry_count,
1213 context_used_tokens, context_max_tokens,
1214 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1215 FROM events
1216 WHERE session_id = ?1 AND seq >= ?2
1217 ORDER BY seq ASC LIMIT ?3",
1218 )?;
1219 let rows = stmt.query_map(
1220 params![
1221 session_id,
1222 after_seq as i64,
1223 limit.min(i64::MAX as usize) as i64
1224 ],
1225 event_row,
1226 )?;
1227 let mut events = Vec::new();
1228 for row in rows {
1229 events.push(row?);
1230 }
1231 Ok(events)
1232 }
1233
1234 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1236 self.conn.execute(
1237 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1238 params![format!("{:?}", status), id],
1239 )?;
1240 Ok(())
1241 }
1242
1243 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1245 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1246 Ok(InsightsStats {
1247 total_sessions: count_q(
1248 &self.conn,
1249 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1250 workspace,
1251 )?,
1252 running_sessions: count_q(
1253 &self.conn,
1254 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1255 workspace,
1256 )?,
1257 total_events: count_q(
1258 &self.conn,
1259 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1260 workspace,
1261 )?,
1262 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1263 recent: recent_sessions_3(&self.conn, workspace)?,
1264 top_tools: top_tools_5(&self.conn, workspace)?,
1265 total_cost_usd_e6,
1266 sessions_with_cost,
1267 })
1268 }
1269
1270 pub fn retro_events_in_window(
1272 &self,
1273 workspace: &str,
1274 start_ms: u64,
1275 end_ms: u64,
1276 ) -> Result<Vec<(SessionRecord, Event)>> {
1277 let mut stmt = self.conn.prepare(
1278 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1279 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1280 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1281 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1282 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1283 s.repo_file_count, s.repo_total_loc,
1284 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1285 e.context_used_tokens, e.context_max_tokens,
1286 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1287 FROM events e
1288 JOIN sessions s ON s.id = e.session_id
1289 WHERE s.workspace = ?1
1290 AND (
1291 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1292 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1293 )
1294 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1295 )?;
1296 let rows = stmt.query_map(
1297 params![
1298 workspace,
1299 start_ms as i64,
1300 end_ms as i64,
1301 SYNTHETIC_TS_CEILING_MS,
1302 ],
1303 |row| {
1304 let payload_str: String = row.get(12)?;
1305 let status_str: String = row.get(19)?;
1306 Ok((
1307 SessionRecord {
1308 id: row.get(13)?,
1309 agent: row.get(14)?,
1310 model: row.get(15)?,
1311 workspace: row.get(16)?,
1312 started_at_ms: row.get::<_, i64>(17)? as u64,
1313 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1314 status: status_from_str(&status_str),
1315 trace_path: row.get(20)?,
1316 start_commit: row.get(21)?,
1317 end_commit: row.get(22)?,
1318 branch: row.get(23)?,
1319 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1320 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1321 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1322 prompt_fingerprint: row.get(27)?,
1323 parent_session_id: row.get(28)?,
1324 agent_version: row.get(29)?,
1325 os: row.get(30)?,
1326 arch: row.get(31)?,
1327 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1328 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1329 },
1330 Event {
1331 session_id: row.get(0)?,
1332 seq: row.get::<_, i64>(1)? as u64,
1333 ts_ms: row.get::<_, i64>(2)? as u64,
1334 ts_exact: row.get::<_, i64>(3)? != 0,
1335 kind: kind_from_str(&row.get::<_, String>(4)?),
1336 source: source_from_str(&row.get::<_, String>(5)?),
1337 tool: row.get(6)?,
1338 tool_call_id: row.get(7)?,
1339 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1340 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1341 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1342 cost_usd_e6: row.get(11)?,
1343 payload: serde_json::from_str(&payload_str)
1344 .unwrap_or(serde_json::Value::Null),
1345 stop_reason: row.get(34)?,
1346 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1347 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1348 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1349 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1350 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1351 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1352 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1353 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1354 },
1355 ))
1356 },
1357 )?;
1358
1359 let mut out = Vec::new();
1360 for r in rows {
1361 out.push(r?);
1362 }
1363 Ok(out)
1364 }
1365
1366 pub fn files_touched_in_window(
1368 &self,
1369 workspace: &str,
1370 start_ms: u64,
1371 end_ms: u64,
1372 ) -> Result<Vec<(String, String)>> {
1373 let mut stmt = self.conn.prepare(
1374 "SELECT DISTINCT ft.session_id, ft.path
1375 FROM files_touched ft
1376 JOIN sessions s ON s.id = ft.session_id
1377 WHERE s.workspace = ?1
1378 AND EXISTS (
1379 SELECT 1 FROM events e
1380 JOIN sessions ss ON ss.id = e.session_id
1381 WHERE e.session_id = ft.session_id
1382 AND (
1383 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1384 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1385 )
1386 )
1387 ORDER BY ft.session_id, ft.path",
1388 )?;
1389 let out: Vec<(String, String)> = stmt
1390 .query_map(
1391 params![
1392 workspace,
1393 start_ms as i64,
1394 end_ms as i64,
1395 SYNTHETIC_TS_CEILING_MS,
1396 ],
1397 |r| Ok((r.get(0)?, r.get(1)?)),
1398 )?
1399 .filter_map(|r| r.ok())
1400 .collect();
1401 Ok(out)
1402 }
1403
1404 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1407 let mut stmt = self.conn.prepare(
1408 "SELECT DISTINCT su.skill
1409 FROM skills_used su
1410 JOIN sessions s ON s.id = su.session_id
1411 WHERE s.workspace = ?1
1412 AND EXISTS (
1413 SELECT 1 FROM events e
1414 JOIN sessions ss ON ss.id = e.session_id
1415 WHERE e.session_id = su.session_id
1416 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1417 )
1418 ORDER BY su.skill",
1419 )?;
1420 let out: Vec<String> = stmt
1421 .query_map(
1422 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1423 |r| r.get::<_, String>(0),
1424 )?
1425 .filter_map(|r| r.ok())
1426 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1427 .collect();
1428 Ok(out)
1429 }
1430
1431 pub fn skills_used_in_window(
1433 &self,
1434 workspace: &str,
1435 start_ms: u64,
1436 end_ms: u64,
1437 ) -> Result<Vec<(String, String)>> {
1438 let mut stmt = self.conn.prepare(
1439 "SELECT DISTINCT su.session_id, su.skill
1440 FROM skills_used su
1441 JOIN sessions s ON s.id = su.session_id
1442 WHERE s.workspace = ?1
1443 AND EXISTS (
1444 SELECT 1 FROM events e
1445 JOIN sessions ss ON ss.id = e.session_id
1446 WHERE e.session_id = su.session_id
1447 AND (
1448 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1449 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1450 )
1451 )
1452 ORDER BY su.session_id, su.skill",
1453 )?;
1454 let out: Vec<(String, String)> = stmt
1455 .query_map(
1456 params![
1457 workspace,
1458 start_ms as i64,
1459 end_ms as i64,
1460 SYNTHETIC_TS_CEILING_MS,
1461 ],
1462 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1463 )?
1464 .filter_map(|r| r.ok())
1465 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1466 .collect();
1467 Ok(out)
1468 }
1469
1470 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1472 let mut stmt = self.conn.prepare(
1473 "SELECT DISTINCT ru.rule
1474 FROM rules_used ru
1475 JOIN sessions s ON s.id = ru.session_id
1476 WHERE s.workspace = ?1
1477 AND EXISTS (
1478 SELECT 1 FROM events e
1479 JOIN sessions ss ON ss.id = e.session_id
1480 WHERE e.session_id = ru.session_id
1481 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1482 )
1483 ORDER BY ru.rule",
1484 )?;
1485 let out: Vec<String> = stmt
1486 .query_map(
1487 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1488 |r| r.get::<_, String>(0),
1489 )?
1490 .filter_map(|r| r.ok())
1491 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1492 .collect();
1493 Ok(out)
1494 }
1495
1496 pub fn rules_used_in_window(
1498 &self,
1499 workspace: &str,
1500 start_ms: u64,
1501 end_ms: u64,
1502 ) -> Result<Vec<(String, String)>> {
1503 let mut stmt = self.conn.prepare(
1504 "SELECT DISTINCT ru.session_id, ru.rule
1505 FROM rules_used ru
1506 JOIN sessions s ON s.id = ru.session_id
1507 WHERE s.workspace = ?1
1508 AND EXISTS (
1509 SELECT 1 FROM events e
1510 JOIN sessions ss ON ss.id = e.session_id
1511 WHERE e.session_id = ru.session_id
1512 AND (
1513 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1514 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1515 )
1516 )
1517 ORDER BY ru.session_id, ru.rule",
1518 )?;
1519 let out: Vec<(String, String)> = stmt
1520 .query_map(
1521 params![
1522 workspace,
1523 start_ms as i64,
1524 end_ms as i64,
1525 SYNTHETIC_TS_CEILING_MS,
1526 ],
1527 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1528 )?
1529 .filter_map(|r| r.ok())
1530 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1531 .collect();
1532 Ok(out)
1533 }
1534
1535 pub fn sessions_active_in_window(
1537 &self,
1538 workspace: &str,
1539 start_ms: u64,
1540 end_ms: u64,
1541 ) -> Result<HashSet<String>> {
1542 let mut stmt = self.conn.prepare(
1543 "SELECT DISTINCT s.id
1544 FROM sessions s
1545 WHERE s.workspace = ?1
1546 AND EXISTS (
1547 SELECT 1 FROM events e
1548 WHERE e.session_id = s.id
1549 AND (
1550 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1551 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1552 )
1553 )",
1554 )?;
1555 let out: HashSet<String> = stmt
1556 .query_map(
1557 params![
1558 workspace,
1559 start_ms as i64,
1560 end_ms as i64,
1561 SYNTHETIC_TS_CEILING_MS,
1562 ],
1563 |r| r.get(0),
1564 )?
1565 .filter_map(|r| r.ok())
1566 .collect();
1567 Ok(out)
1568 }
1569
1570 pub fn session_costs_usd_e6_in_window(
1572 &self,
1573 workspace: &str,
1574 start_ms: u64,
1575 end_ms: u64,
1576 ) -> Result<HashMap<String, i64>> {
1577 let mut stmt = self.conn.prepare(
1578 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1579 FROM events e
1580 JOIN sessions s ON s.id = e.session_id
1581 WHERE s.workspace = ?1
1582 AND (
1583 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1584 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1585 )
1586 GROUP BY e.session_id",
1587 )?;
1588 let rows: Vec<(String, i64)> = stmt
1589 .query_map(
1590 params![
1591 workspace,
1592 start_ms as i64,
1593 end_ms as i64,
1594 SYNTHETIC_TS_CEILING_MS,
1595 ],
1596 |r| Ok((r.get(0)?, r.get(1)?)),
1597 )?
1598 .filter_map(|r| r.ok())
1599 .collect();
1600 Ok(rows.into_iter().collect())
1601 }
1602
1603 pub fn guidance_report(
1605 &self,
1606 workspace: &str,
1607 window_start_ms: u64,
1608 window_end_ms: u64,
1609 skill_slugs_on_disk: &HashSet<String>,
1610 rule_slugs_on_disk: &HashSet<String>,
1611 ) -> Result<GuidanceReport> {
1612 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1613 let denom = active.len() as u64;
1614 let costs =
1615 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1616
1617 let workspace_avg_cost_per_session_usd = if denom > 0 {
1618 let total_e6: i64 = active
1619 .iter()
1620 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1621 .sum();
1622 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1623 } else {
1624 None
1625 };
1626
1627 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1628 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1629 skill_sessions.entry(skill).or_default().insert(sid);
1630 }
1631 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1632 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1633 rule_sessions.entry(rule).or_default().insert(sid);
1634 }
1635
1636 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1637
1638 let mut push_row =
1639 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1640 let sessions = sids.len() as u64;
1641 let sessions_pct = if denom > 0 {
1642 sessions as f64 * 100.0 / denom as f64
1643 } else {
1644 0.0
1645 };
1646 let total_cost_usd_e6: i64 = sids
1647 .iter()
1648 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1649 .sum();
1650 let avg_cost_per_session_usd = if sessions > 0 {
1651 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1652 } else {
1653 None
1654 };
1655 let vs_workspace_avg_cost_per_session_usd =
1656 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1657 (Some(avg), Some(w)) => Some(avg - w),
1658 _ => None,
1659 };
1660 rows.push(GuidancePerfRow {
1661 kind,
1662 id,
1663 sessions,
1664 sessions_pct,
1665 total_cost_usd_e6,
1666 avg_cost_per_session_usd,
1667 vs_workspace_avg_cost_per_session_usd,
1668 on_disk,
1669 });
1670 };
1671
1672 let mut seen_skills: HashSet<String> = HashSet::new();
1673 for (id, sids) in &skill_sessions {
1674 seen_skills.insert(id.clone());
1675 push_row(
1676 GuidanceKind::Skill,
1677 id.clone(),
1678 sids,
1679 skill_slugs_on_disk.contains(id),
1680 );
1681 }
1682 for slug in skill_slugs_on_disk {
1683 if seen_skills.contains(slug) {
1684 continue;
1685 }
1686 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1687 }
1688
1689 let mut seen_rules: HashSet<String> = HashSet::new();
1690 for (id, sids) in &rule_sessions {
1691 seen_rules.insert(id.clone());
1692 push_row(
1693 GuidanceKind::Rule,
1694 id.clone(),
1695 sids,
1696 rule_slugs_on_disk.contains(id),
1697 );
1698 }
1699 for slug in rule_slugs_on_disk {
1700 if seen_rules.contains(slug) {
1701 continue;
1702 }
1703 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1704 }
1705
1706 rows.sort_by(|a, b| {
1707 b.sessions
1708 .cmp(&a.sessions)
1709 .then_with(|| a.kind.cmp(&b.kind))
1710 .then_with(|| a.id.cmp(&b.id))
1711 });
1712
1713 Ok(GuidanceReport {
1714 workspace: workspace.to_string(),
1715 window_start_ms,
1716 window_end_ms,
1717 sessions_in_window: denom,
1718 workspace_avg_cost_per_session_usd,
1719 rows,
1720 })
1721 }
1722
1723 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1724 let mut stmt = self.conn.prepare(
1725 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1726 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1727 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1728 repo_file_count, repo_total_loc
1729 FROM sessions WHERE id = ?1",
1730 )?;
1731 let mut rows = stmt.query_map(params![id], |row| {
1732 Ok((
1733 row.get::<_, String>(0)?,
1734 row.get::<_, String>(1)?,
1735 row.get::<_, Option<String>>(2)?,
1736 row.get::<_, String>(3)?,
1737 row.get::<_, i64>(4)?,
1738 row.get::<_, Option<i64>>(5)?,
1739 row.get::<_, String>(6)?,
1740 row.get::<_, String>(7)?,
1741 row.get::<_, Option<String>>(8)?,
1742 row.get::<_, Option<String>>(9)?,
1743 row.get::<_, Option<String>>(10)?,
1744 row.get::<_, Option<i64>>(11)?,
1745 row.get::<_, Option<i64>>(12)?,
1746 row.get::<_, String>(13)?,
1747 row.get::<_, Option<String>>(14)?,
1748 row.get::<_, Option<String>>(15)?,
1749 row.get::<_, Option<String>>(16)?,
1750 row.get::<_, Option<String>>(17)?,
1751 row.get::<_, Option<String>>(18)?,
1752 row.get::<_, Option<i64>>(19)?,
1753 row.get::<_, Option<i64>>(20)?,
1754 ))
1755 })?;
1756
1757 if let Some(row) = rows.next() {
1758 let (
1759 id,
1760 agent,
1761 model,
1762 workspace,
1763 started,
1764 ended,
1765 status_str,
1766 trace,
1767 start_commit,
1768 end_commit,
1769 branch,
1770 dirty_start,
1771 dirty_end,
1772 source,
1773 prompt_fingerprint,
1774 parent_session_id,
1775 agent_version,
1776 os,
1777 arch,
1778 repo_file_count,
1779 repo_total_loc,
1780 ) = row?;
1781 Ok(Some(SessionRecord {
1782 id,
1783 agent,
1784 model,
1785 workspace,
1786 started_at_ms: started as u64,
1787 ended_at_ms: ended.map(|v| v as u64),
1788 status: status_from_str(&status_str),
1789 trace_path: trace,
1790 start_commit,
1791 end_commit,
1792 branch,
1793 dirty_start: dirty_start.map(i64_to_bool),
1794 dirty_end: dirty_end.map(i64_to_bool),
1795 repo_binding_source: empty_to_none(source),
1796 prompt_fingerprint,
1797 parent_session_id,
1798 agent_version,
1799 os,
1800 arch,
1801 repo_file_count: repo_file_count.map(|v| v as u32),
1802 repo_total_loc: repo_total_loc.map(|v| v as u64),
1803 }))
1804 } else {
1805 Ok(None)
1806 }
1807 }
1808
1809 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1810 let mut stmt = self.conn.prepare(
1811 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1812 indexed_at_ms, dirty, graph_path
1813 FROM repo_snapshots WHERE workspace = ?1
1814 ORDER BY indexed_at_ms DESC LIMIT 1",
1815 )?;
1816 let mut rows = stmt.query_map(params![workspace], |row| {
1817 Ok(RepoSnapshotRecord {
1818 id: row.get(0)?,
1819 workspace: row.get(1)?,
1820 head_commit: row.get(2)?,
1821 dirty_fingerprint: row.get(3)?,
1822 analyzer_version: row.get(4)?,
1823 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1824 dirty: row.get::<_, i64>(6)? != 0,
1825 graph_path: row.get(7)?,
1826 })
1827 })?;
1828 Ok(rows.next().transpose()?)
1829 }
1830
1831 pub fn save_repo_snapshot(
1832 &self,
1833 snapshot: &RepoSnapshotRecord,
1834 facts: &[FileFact],
1835 edges: &[RepoEdge],
1836 ) -> Result<()> {
1837 self.conn.execute(
1838 "INSERT INTO repo_snapshots (
1839 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1840 indexed_at_ms, dirty, graph_path
1841 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1842 ON CONFLICT(id) DO UPDATE SET
1843 workspace=excluded.workspace,
1844 head_commit=excluded.head_commit,
1845 dirty_fingerprint=excluded.dirty_fingerprint,
1846 analyzer_version=excluded.analyzer_version,
1847 indexed_at_ms=excluded.indexed_at_ms,
1848 dirty=excluded.dirty,
1849 graph_path=excluded.graph_path",
1850 params![
1851 snapshot.id,
1852 snapshot.workspace,
1853 snapshot.head_commit,
1854 snapshot.dirty_fingerprint,
1855 snapshot.analyzer_version,
1856 snapshot.indexed_at_ms as i64,
1857 bool_to_i64(snapshot.dirty),
1858 snapshot.graph_path,
1859 ],
1860 )?;
1861 self.conn.execute(
1862 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1863 params![snapshot.id],
1864 )?;
1865 self.conn.execute(
1866 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1867 params![snapshot.id],
1868 )?;
1869 for fact in facts {
1870 self.conn.execute(
1871 "INSERT INTO file_facts (
1872 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1873 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1874 churn_30d, churn_90d, authors_90d, last_changed_ms
1875 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1876 params![
1877 fact.snapshot_id,
1878 fact.path,
1879 fact.language,
1880 fact.bytes as i64,
1881 fact.loc as i64,
1882 fact.sloc as i64,
1883 fact.complexity_total as i64,
1884 fact.max_fn_complexity as i64,
1885 fact.symbol_count as i64,
1886 fact.import_count as i64,
1887 fact.fan_in as i64,
1888 fact.fan_out as i64,
1889 fact.churn_30d as i64,
1890 fact.churn_90d as i64,
1891 fact.authors_90d as i64,
1892 fact.last_changed_ms.map(|v| v as i64),
1893 ],
1894 )?;
1895 }
1896 for edge in edges {
1897 self.conn.execute(
1898 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1899 VALUES (?1, ?2, ?3, ?4, ?5)
1900 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1901 DO UPDATE SET weight = weight + excluded.weight",
1902 params![
1903 snapshot.id,
1904 edge.from_path,
1905 edge.to_path,
1906 edge.kind,
1907 edge.weight as i64,
1908 ],
1909 )?;
1910 }
1911 Ok(())
1912 }
1913
1914 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1915 let mut stmt = self.conn.prepare(
1916 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1917 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1918 churn_30d, churn_90d, authors_90d, last_changed_ms
1919 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1920 )?;
1921 let rows = stmt.query_map(params![snapshot_id], |row| {
1922 Ok(FileFact {
1923 snapshot_id: row.get(0)?,
1924 path: row.get(1)?,
1925 language: row.get(2)?,
1926 bytes: row.get::<_, i64>(3)? as u64,
1927 loc: row.get::<_, i64>(4)? as u32,
1928 sloc: row.get::<_, i64>(5)? as u32,
1929 complexity_total: row.get::<_, i64>(6)? as u32,
1930 max_fn_complexity: row.get::<_, i64>(7)? as u32,
1931 symbol_count: row.get::<_, i64>(8)? as u32,
1932 import_count: row.get::<_, i64>(9)? as u32,
1933 fan_in: row.get::<_, i64>(10)? as u32,
1934 fan_out: row.get::<_, i64>(11)? as u32,
1935 churn_30d: row.get::<_, i64>(12)? as u32,
1936 churn_90d: row.get::<_, i64>(13)? as u32,
1937 authors_90d: row.get::<_, i64>(14)? as u32,
1938 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1939 })
1940 })?;
1941 Ok(rows.filter_map(|row| row.ok()).collect())
1942 }
1943
1944 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1945 let mut stmt = self.conn.prepare(
1946 "SELECT from_id, to_id, kind, weight
1947 FROM repo_edges WHERE snapshot_id = ?1
1948 ORDER BY kind, from_id, to_id",
1949 )?;
1950 let rows = stmt.query_map(params![snapshot_id], |row| {
1951 Ok(RepoEdge {
1952 from_path: row.get(0)?,
1953 to_path: row.get(1)?,
1954 kind: row.get(2)?,
1955 weight: row.get::<_, i64>(3)? as u32,
1956 })
1957 })?;
1958 Ok(rows.filter_map(|row| row.ok()).collect())
1959 }
1960
1961 pub fn tool_spans_in_window(
1962 &self,
1963 workspace: &str,
1964 start_ms: u64,
1965 end_ms: u64,
1966 ) -> Result<Vec<ToolSpanView>> {
1967 let mut stmt = self.conn.prepare(
1968 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
1969 reasoning_tokens, cost_usd_e6, paths_json,
1970 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
1971 FROM (
1972 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
1973 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
1974 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
1975 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
1976 ts.started_at_ms AS sort_ms
1977 FROM tool_spans ts
1978 JOIN sessions s ON s.id = ts.session_id
1979 WHERE s.workspace = ?1
1980 AND ts.started_at_ms >= ?2
1981 AND ts.started_at_ms <= ?3
1982 UNION ALL
1983 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
1984 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
1985 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
1986 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
1987 ts.ended_at_ms AS sort_ms
1988 FROM tool_spans ts
1989 JOIN sessions s ON s.id = ts.session_id
1990 WHERE s.workspace = ?1
1991 AND ts.started_at_ms IS NULL
1992 AND ts.ended_at_ms >= ?2
1993 AND ts.ended_at_ms <= ?3
1994 )
1995 ORDER BY sort_ms DESC",
1996 )?;
1997 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1998 let paths_json: String = row.get(8)?;
1999 Ok(ToolSpanView {
2000 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2001 tool: row
2002 .get::<_, Option<String>>(1)?
2003 .unwrap_or_else(|| "unknown".into()),
2004 status: row.get(2)?,
2005 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2006 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2007 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2008 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2009 cost_usd_e6: row.get(7)?,
2010 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2011 parent_span_id: row.get(9)?,
2012 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2013 subtree_cost_usd_e6: row.get(11)?,
2014 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2015 })
2016 })?;
2017 Ok(rows.filter_map(|row| row.ok()).collect())
2018 }
2019
2020 pub fn session_span_tree(
2021 &self,
2022 session_id: &str,
2023 ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2024 let last_event_seq = self.last_event_seq_for_session(session_id)?;
2025 if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2026 && entry.session_id == session_id
2027 && entry.last_event_seq == last_event_seq
2028 {
2029 return Ok(entry.nodes.clone());
2030 }
2031 let mut stmt = self.conn.prepare(
2032 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2033 reasoning_tokens, cost_usd_e6, paths_json,
2034 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2035 FROM tool_spans
2036 WHERE session_id = ?1
2037 ORDER BY depth ASC, started_at_ms ASC",
2038 )?;
2039 let rows = stmt.query_map(params![session_id], |row| {
2040 let paths_json: String = row.get(8)?;
2041 Ok(crate::metrics::types::ToolSpanView {
2042 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2043 tool: row
2044 .get::<_, Option<String>>(1)?
2045 .unwrap_or_else(|| "unknown".into()),
2046 status: row.get(2)?,
2047 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2048 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2049 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2050 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2051 cost_usd_e6: row.get(7)?,
2052 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2053 parent_span_id: row.get(9)?,
2054 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2055 subtree_cost_usd_e6: row.get(11)?,
2056 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2057 })
2058 })?;
2059 let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2060 let nodes = crate::store::span_tree::build_tree(spans);
2061 *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2062 session_id: session_id.to_string(),
2063 last_event_seq,
2064 nodes: nodes.clone(),
2065 });
2066 Ok(nodes)
2067 }
2068
2069 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2070 let seq = self
2071 .conn
2072 .query_row(
2073 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2074 params![session_id],
2075 |r| r.get::<_, Option<i64>>(0),
2076 )?
2077 .map(|v| v as u64);
2078 Ok(seq)
2079 }
2080
2081 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2082 let mut stmt = self.conn.prepare(
2083 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2084 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2085 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2086 )?;
2087 let rows = stmt.query_map(params![session_id], |row| {
2088 let paths_json: String = row.get(12)?;
2089 Ok(ToolSpanSyncRow {
2090 span_id: row.get(0)?,
2091 session_id: row.get(1)?,
2092 tool: row.get(2)?,
2093 tool_call_id: row.get(3)?,
2094 status: row.get(4)?,
2095 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2096 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2097 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2098 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2099 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2100 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2101 cost_usd_e6: row.get(11)?,
2102 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2103 })
2104 })?;
2105 Ok(rows.filter_map(|row| row.ok()).collect())
2106 }
2107
2108 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2109 self.conn.execute(
2110 "INSERT OR REPLACE INTO session_evals
2111 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2112 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2113 rusqlite::params![
2114 eval.id,
2115 eval.session_id,
2116 eval.judge_model,
2117 eval.rubric_id,
2118 eval.score,
2119 eval.rationale,
2120 eval.flagged as i64,
2121 eval.created_at_ms as i64,
2122 ],
2123 )?;
2124 Ok(())
2125 }
2126
2127 pub fn list_evals_in_window(
2128 &self,
2129 start_ms: u64,
2130 end_ms: u64,
2131 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2132 let mut stmt = self.conn.prepare(
2133 "SELECT id, session_id, judge_model, rubric_id, score,
2134 rationale, flagged, created_at_ms
2135 FROM session_evals
2136 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2137 ORDER BY created_at_ms ASC",
2138 )?;
2139 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2140 Ok(crate::eval::types::EvalRow {
2141 id: r.get(0)?,
2142 session_id: r.get(1)?,
2143 judge_model: r.get(2)?,
2144 rubric_id: r.get(3)?,
2145 score: r.get(4)?,
2146 rationale: r.get(5)?,
2147 flagged: r.get::<_, i64>(6)? != 0,
2148 created_at_ms: r.get::<_, i64>(7)? as u64,
2149 })
2150 })?;
2151 rows.collect()
2152 }
2153
2154 pub fn list_evals_for_session(
2155 &self,
2156 session_id: &str,
2157 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2158 let mut stmt = self.conn.prepare(
2159 "SELECT id, session_id, judge_model, rubric_id, score,
2160 rationale, flagged, created_at_ms
2161 FROM session_evals
2162 WHERE session_id = ?1
2163 ORDER BY created_at_ms DESC",
2164 )?;
2165 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2166 Ok(crate::eval::types::EvalRow {
2167 id: r.get(0)?,
2168 session_id: r.get(1)?,
2169 judge_model: r.get(2)?,
2170 rubric_id: r.get(3)?,
2171 score: r.get(4)?,
2172 rationale: r.get(5)?,
2173 flagged: r.get::<_, i64>(6)? != 0,
2174 created_at_ms: r.get::<_, i64>(7)? as u64,
2175 })
2176 })?;
2177 rows.collect()
2178 }
2179
2180 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2181 use crate::feedback::types::FeedbackLabel;
2182 self.conn.execute(
2183 "INSERT OR REPLACE INTO session_feedback
2184 (id, session_id, score, label, note, created_at_ms)
2185 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2186 rusqlite::params![
2187 r.id,
2188 r.session_id,
2189 r.score.as_ref().map(|s| s.0 as i64),
2190 r.label.as_ref().map(FeedbackLabel::to_db_str),
2191 r.note,
2192 r.created_at_ms as i64,
2193 ],
2194 )?;
2195 let payload = serde_json::to_string(r).unwrap_or_default();
2196 self.conn.execute(
2197 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2198 VALUES (?1, 'session_feedback', ?2, 0)",
2199 rusqlite::params![r.session_id, payload],
2200 )?;
2201 Ok(())
2202 }
2203
2204 pub fn list_feedback_in_window(
2205 &self,
2206 start_ms: u64,
2207 end_ms: u64,
2208 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2209 let mut stmt = self.conn.prepare(
2210 "SELECT id, session_id, score, label, note, created_at_ms
2211 FROM session_feedback
2212 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2213 ORDER BY created_at_ms ASC",
2214 )?;
2215 let rows = stmt.query_map(
2216 rusqlite::params![start_ms as i64, end_ms as i64],
2217 feedback_row,
2218 )?;
2219 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2220 }
2221
2222 pub fn feedback_for_sessions(
2223 &self,
2224 ids: &[String],
2225 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2226 if ids.is_empty() {
2227 return Ok(std::collections::HashMap::new());
2228 }
2229 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2230 let sql = format!(
2231 "SELECT id, session_id, score, label, note, created_at_ms
2232 FROM session_feedback WHERE session_id IN ({placeholders})
2233 ORDER BY created_at_ms DESC"
2234 );
2235 let mut stmt = self.conn.prepare(&sql)?;
2236 let params: Vec<&dyn rusqlite::ToSql> =
2237 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2238 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2239 let mut map = std::collections::HashMap::new();
2240 for row in rows {
2241 let r = row?;
2242 map.entry(r.session_id.clone()).or_insert(r);
2243 }
2244 Ok(map)
2245 }
2246
2247 pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2248 self.conn.execute(
2249 "INSERT INTO session_outcomes (
2250 session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2251 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2252 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2253 ON CONFLICT(session_id) DO UPDATE SET
2254 test_passed=excluded.test_passed,
2255 test_failed=excluded.test_failed,
2256 test_skipped=excluded.test_skipped,
2257 build_ok=excluded.build_ok,
2258 lint_errors=excluded.lint_errors,
2259 revert_lines_14d=excluded.revert_lines_14d,
2260 pr_open=excluded.pr_open,
2261 ci_ok=excluded.ci_ok,
2262 measured_at_ms=excluded.measured_at_ms,
2263 measure_error=excluded.measure_error",
2264 params![
2265 row.session_id,
2266 row.test_passed,
2267 row.test_failed,
2268 row.test_skipped,
2269 row.build_ok.map(bool_to_i64),
2270 row.lint_errors,
2271 row.revert_lines_14d,
2272 row.pr_open,
2273 row.ci_ok.map(bool_to_i64),
2274 row.measured_at_ms as i64,
2275 row.measure_error.as_deref(),
2276 ],
2277 )?;
2278 Ok(())
2279 }
2280
2281 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2282 let mut stmt = self.conn.prepare(
2283 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2284 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2285 FROM session_outcomes WHERE session_id = ?1",
2286 )?;
2287 let row = stmt
2288 .query_row(params![session_id], outcome_row)
2289 .optional()?;
2290 Ok(row)
2291 }
2292
2293 pub fn list_session_outcomes_in_window(
2295 &self,
2296 workspace: &str,
2297 start_ms: u64,
2298 end_ms: u64,
2299 ) -> Result<Vec<SessionOutcomeRow>> {
2300 let mut stmt = self.conn.prepare(
2301 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2302 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2303 FROM session_outcomes o
2304 JOIN sessions s ON s.id = o.session_id
2305 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2306 ORDER BY o.measured_at_ms ASC",
2307 )?;
2308 let rows = stmt.query_map(
2309 params![workspace, start_ms as i64, end_ms as i64],
2310 outcome_row,
2311 )?;
2312 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2313 }
2314
2315 pub fn append_session_sample(
2316 &self,
2317 session_id: &str,
2318 ts_ms: u64,
2319 pid: u32,
2320 cpu_percent: Option<f64>,
2321 rss_bytes: Option<u64>,
2322 ) -> Result<()> {
2323 self.conn.execute(
2324 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2325 VALUES (?1, ?2, ?3, ?4, ?5)",
2326 params![
2327 session_id,
2328 ts_ms as i64,
2329 pid as i64,
2330 cpu_percent,
2331 rss_bytes.map(|b| b as i64)
2332 ],
2333 )?;
2334 Ok(())
2335 }
2336
2337 pub fn list_session_sample_aggs_in_window(
2339 &self,
2340 workspace: &str,
2341 start_ms: u64,
2342 end_ms: u64,
2343 ) -> Result<Vec<SessionSampleAgg>> {
2344 let mut stmt = self.conn.prepare(
2345 "SELECT ss.session_id, COUNT(*) AS n,
2346 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2347 FROM session_samples ss
2348 JOIN sessions s ON s.id = ss.session_id
2349 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2350 GROUP BY ss.session_id",
2351 )?;
2352 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2353 let sid: String = r.get(0)?;
2354 let n: i64 = r.get(1)?;
2355 let max_cpu: Option<f64> = r.get(2)?;
2356 let max_rss: Option<i64> = r.get(3)?;
2357 Ok(SessionSampleAgg {
2358 session_id: sid,
2359 sample_count: n as u64,
2360 max_cpu_percent: max_cpu.unwrap_or(0.0),
2361 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2362 })
2363 })?;
2364 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2365 }
2366
2367 pub fn list_sessions_for_eval(
2368 &self,
2369 since_ms: u64,
2370 min_cost_usd: f64,
2371 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2372 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2373 let mut stmt = self.conn.prepare(
2374 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2375 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2376 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2377 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2378 FROM sessions s
2379 WHERE s.started_at_ms >= ?1
2380 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2381 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2382 ORDER BY s.started_at_ms DESC",
2383 )?;
2384 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2385 Ok((
2386 r.get::<_, String>(0)?,
2387 r.get::<_, String>(1)?,
2388 r.get::<_, Option<String>>(2)?,
2389 r.get::<_, String>(3)?,
2390 r.get::<_, i64>(4)?,
2391 r.get::<_, Option<i64>>(5)?,
2392 r.get::<_, String>(6)?,
2393 r.get::<_, String>(7)?,
2394 r.get::<_, Option<String>>(8)?,
2395 r.get::<_, Option<String>>(9)?,
2396 r.get::<_, Option<String>>(10)?,
2397 r.get::<_, Option<i64>>(11)?,
2398 r.get::<_, Option<i64>>(12)?,
2399 r.get::<_, Option<String>>(13)?,
2400 r.get::<_, Option<String>>(14)?,
2401 r.get::<_, Option<String>>(15)?,
2402 r.get::<_, Option<String>>(16)?,
2403 r.get::<_, Option<String>>(17)?,
2404 r.get::<_, Option<String>>(18)?,
2405 r.get::<_, Option<i64>>(19)?,
2406 r.get::<_, Option<i64>>(20)?,
2407 ))
2408 })?;
2409 let mut out = Vec::new();
2410 for row in rows {
2411 let (
2412 id,
2413 agent,
2414 model,
2415 workspace,
2416 started,
2417 ended,
2418 status_str,
2419 trace,
2420 start_commit,
2421 end_commit,
2422 branch,
2423 dirty_start,
2424 dirty_end,
2425 source,
2426 prompt_fingerprint,
2427 parent_session_id,
2428 agent_version,
2429 os,
2430 arch,
2431 repo_file_count,
2432 repo_total_loc,
2433 ) = row?;
2434 out.push(crate::core::event::SessionRecord {
2435 id,
2436 agent,
2437 model,
2438 workspace,
2439 started_at_ms: started as u64,
2440 ended_at_ms: ended.map(|v| v as u64),
2441 status: status_from_str(&status_str),
2442 trace_path: trace,
2443 start_commit,
2444 end_commit,
2445 branch,
2446 dirty_start: dirty_start.map(i64_to_bool),
2447 dirty_end: dirty_end.map(i64_to_bool),
2448 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2449 prompt_fingerprint,
2450 parent_session_id,
2451 agent_version,
2452 os,
2453 arch,
2454 repo_file_count: repo_file_count.map(|v| v as u32),
2455 repo_total_loc: repo_total_loc.map(|v| v as u64),
2456 });
2457 }
2458 Ok(out)
2459 }
2460
2461 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2462 self.conn.execute(
2463 "INSERT OR IGNORE INTO prompt_snapshots
2464 (fingerprint, captured_at_ms, files_json, total_bytes)
2465 VALUES (?1, ?2, ?3, ?4)",
2466 params![
2467 snap.fingerprint,
2468 snap.captured_at_ms as i64,
2469 snap.files_json,
2470 snap.total_bytes as i64
2471 ],
2472 )?;
2473 Ok(())
2474 }
2475
2476 pub fn get_prompt_snapshot(
2477 &self,
2478 fingerprint: &str,
2479 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2480 self.conn
2481 .query_row(
2482 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2483 FROM prompt_snapshots WHERE fingerprint = ?1",
2484 params![fingerprint],
2485 |r| {
2486 Ok(crate::prompt::PromptSnapshot {
2487 fingerprint: r.get(0)?,
2488 captured_at_ms: r.get::<_, i64>(1)? as u64,
2489 files_json: r.get(2)?,
2490 total_bytes: r.get::<_, i64>(3)? as u64,
2491 })
2492 },
2493 )
2494 .optional()
2495 .map_err(Into::into)
2496 }
2497
2498 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2499 let mut stmt = self.conn.prepare(
2500 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2501 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2502 )?;
2503 let rows = stmt.query_map([], |r| {
2504 Ok(crate::prompt::PromptSnapshot {
2505 fingerprint: r.get(0)?,
2506 captured_at_ms: r.get::<_, i64>(1)? as u64,
2507 files_json: r.get(2)?,
2508 total_bytes: r.get::<_, i64>(3)? as u64,
2509 })
2510 })?;
2511 Ok(rows.filter_map(|r| r.ok()).collect())
2512 }
2513
2514 pub fn sessions_with_prompt_fingerprint(
2516 &self,
2517 workspace: &str,
2518 start_ms: u64,
2519 end_ms: u64,
2520 ) -> Result<Vec<(String, String)>> {
2521 let mut stmt = self.conn.prepare(
2522 "SELECT id, prompt_fingerprint FROM sessions
2523 WHERE workspace = ?1
2524 AND started_at_ms >= ?2 AND started_at_ms < ?3
2525 AND prompt_fingerprint IS NOT NULL",
2526 )?;
2527 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2528 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2529 })?;
2530 Ok(rows.filter_map(|r| r.ok()).collect())
2531 }
2532}
2533
/// Current wall-clock time as milliseconds since the Unix epoch; a clock set
/// before the epoch yields 0.
fn now_ms() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
2540
2541fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2542 raw.and_then(|s| s.trim().parse::<u64>().ok())
2543 .unwrap_or(DEFAULT_MMAP_MB)
2544 .saturating_mul(1024)
2545 .saturating_mul(1024)
2546 .min(i64::MAX as u64) as i64
2547}
2548
2549fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2550 let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2551 conn.execute_batch(&format!(
2552 "
2553 PRAGMA journal_mode=WAL;
2554 PRAGMA busy_timeout=5000;
2555 PRAGMA synchronous=NORMAL;
2556 PRAGMA cache_size=-65536;
2557 PRAGMA mmap_size={mmap_size};
2558 PRAGMA temp_store=MEMORY;
2559 PRAGMA wal_autocheckpoint=1000;
2560 "
2561 ))?;
2562 if mode == StoreOpenMode::ReadOnlyQuery {
2563 conn.execute_batch("PRAGMA query_only=ON;")?;
2564 }
2565 Ok(())
2566}
2567
2568fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2569 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2570}
2571
2572fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2573 let status_str: String = row.get(6)?;
2574 Ok(SessionRecord {
2575 id: row.get(0)?,
2576 agent: row.get(1)?,
2577 model: row.get(2)?,
2578 workspace: row.get(3)?,
2579 started_at_ms: row.get::<_, i64>(4)? as u64,
2580 ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2581 status: status_from_str(&status_str),
2582 trace_path: row.get(7)?,
2583 start_commit: row.get(8)?,
2584 end_commit: row.get(9)?,
2585 branch: row.get(10)?,
2586 dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2587 dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2588 repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2589 prompt_fingerprint: row.get(14)?,
2590 parent_session_id: row.get(15)?,
2591 agent_version: row.get(16)?,
2592 os: row.get(17)?,
2593 arch: row.get(18)?,
2594 repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2595 repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2596 })
2597}
2598
2599fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2600 let payload_str: String = row.get(12)?;
2601 Ok(Event {
2602 session_id: row.get(0)?,
2603 seq: row.get::<_, i64>(1)? as u64,
2604 ts_ms: row.get::<_, i64>(2)? as u64,
2605 ts_exact: row.get::<_, i64>(3)? != 0,
2606 kind: kind_from_str(&row.get::<_, String>(4)?),
2607 source: source_from_str(&row.get::<_, String>(5)?),
2608 tool: row.get(6)?,
2609 tool_call_id: row.get(7)?,
2610 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2611 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2612 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2613 cost_usd_e6: row.get(11)?,
2614 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2615 stop_reason: row.get(13)?,
2616 latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2617 ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2618 retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2619 context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2620 context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2621 cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2622 cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2623 system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2624 })
2625}
2626
2627fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2628 let mut clauses = vec!["workspace = ?".to_string()];
2629 let mut args = vec![Value::Text(workspace.to_string())];
2630 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2631 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2632 args.push(Value::Text(format!("{}%", escape_like(prefix))));
2633 }
2634 if let Some(status) = &filter.status {
2635 clauses.push("status = ?".to_string());
2636 args.push(Value::Text(format!("{status:?}")));
2637 }
2638 if let Some(since_ms) = filter.since_ms {
2639 clauses.push("started_at_ms >= ?".to_string());
2640 args.push(Value::Integer(since_ms as i64));
2641 }
2642 (format!("WHERE {}", clauses.join(" AND ")), args)
2643}
2644
/// Lower-cases `raw` and backslash-escapes the `LIKE` metacharacters
/// (`\`, `%`, `_`) so the text can be used with `ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2651
2652fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2653 let cost: i64 = conn.query_row(
2654 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2655 params![workspace], |r| r.get(0),
2656 )?;
2657 let with_cost: i64 = conn.query_row(
2658 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2659 params![workspace], |r| r.get(0),
2660 )?;
2661 Ok((cost, with_cost as u64))
2662}
2663
2664fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2665 let build_raw: Option<i64> = r.get(4)?;
2666 let ci_raw: Option<i64> = r.get(8)?;
2667 Ok(SessionOutcomeRow {
2668 session_id: r.get(0)?,
2669 test_passed: r.get(1)?,
2670 test_failed: r.get(2)?,
2671 test_skipped: r.get(3)?,
2672 build_ok: build_raw.map(|v| v != 0),
2673 lint_errors: r.get(5)?,
2674 revert_lines_14d: r.get(6)?,
2675 pr_open: r.get(7)?,
2676 ci_ok: ci_raw.map(|v| v != 0),
2677 measured_at_ms: r.get::<_, i64>(9)? as u64,
2678 measure_error: r.get(10)?,
2679 })
2680}
2681
2682fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2683 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2684 let score = r
2685 .get::<_, Option<i64>>(2)?
2686 .and_then(|v| FeedbackScore::new(v as u8));
2687 let label = r
2688 .get::<_, Option<String>>(3)?
2689 .and_then(|s| FeedbackLabel::from_str_opt(&s));
2690 Ok(FeedbackRecord {
2691 id: r.get(0)?,
2692 session_id: r.get(1)?,
2693 score,
2694 label,
2695 note: r.get(4)?,
2696 created_at_ms: r.get::<_, i64>(5)? as u64,
2697 })
2698}
2699
/// Returns the three-letter weekday name for a day index counted in whole
/// days since the Unix epoch (day 0, 1970-01-01, was a Thursday).
///
/// The index is reduced modulo 7 before the Thursday offset is added, so the
/// addition cannot overflow for any `u64` input.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2703
2704fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2705 let week_ago = now.saturating_sub(7 * 86_400_000);
2706 let mut stmt = conn
2707 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2708 let days: Vec<u64> = stmt
2709 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2710 .filter_map(|r| r.ok())
2711 .map(|v| v as u64 / 86_400_000)
2712 .collect();
2713 let today = now / 86_400_000;
2714 Ok((0u64..7)
2715 .map(|i| {
2716 let d = today.saturating_sub(6 - i);
2717 (
2718 day_label(d).to_string(),
2719 days.iter().filter(|&&x| x == d).count() as u64,
2720 )
2721 })
2722 .collect())
2723}
2724
2725fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2726 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2727 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2728 s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2729 s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2730 COUNT(e.id) FROM sessions s \
2731 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2732 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2733 let mut stmt = conn.prepare(sql)?;
2734 let out: Vec<(SessionRecord, u64)> = stmt
2735 .query_map(params![workspace], |r| {
2736 let st: String = r.get(6)?;
2737 Ok((
2738 SessionRecord {
2739 id: r.get(0)?,
2740 agent: r.get(1)?,
2741 model: r.get(2)?,
2742 workspace: r.get(3)?,
2743 started_at_ms: r.get::<_, i64>(4)? as u64,
2744 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2745 status: status_from_str(&st),
2746 trace_path: r.get(7)?,
2747 start_commit: r.get(8)?,
2748 end_commit: r.get(9)?,
2749 branch: r.get(10)?,
2750 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2751 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2752 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2753 prompt_fingerprint: r.get(14)?,
2754 parent_session_id: r.get(15)?,
2755 agent_version: r.get(16)?,
2756 os: r.get(17)?,
2757 arch: r.get(18)?,
2758 repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2759 repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2760 },
2761 r.get::<_, i64>(21)? as u64,
2762 ))
2763 })?
2764 .filter_map(|r| r.ok())
2765 .collect();
2766 Ok(out)
2767}
2768
2769fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2770 let mut stmt = conn.prepare(
2771 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2772 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2773 )?;
2774 let out: Vec<(String, u64)> = stmt
2775 .query_map(params![workspace], |r| {
2776 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2777 })?
2778 .filter_map(|r| r.ok())
2779 .collect();
2780 Ok(out)
2781}
2782
2783fn status_from_str(s: &str) -> SessionStatus {
2784 match s {
2785 "Running" => SessionStatus::Running,
2786 "Waiting" => SessionStatus::Waiting,
2787 "Idle" => SessionStatus::Idle,
2788 _ => SessionStatus::Done,
2789 }
2790}
2791
/// Reports whether the `KAIZEN_PROJECTOR` environment variable is set to
/// exactly `legacy`. An unset or non-Unicode value counts as `false`.
fn projector_legacy_mode() -> bool {
    matches!(std::env::var("KAIZEN_PROJECTOR").as_deref(), Ok("legacy"))
}
2795
2796fn is_stop_event(e: &Event) -> bool {
2797 if !matches!(e.kind, EventKind::Hook) {
2798 return false;
2799 }
2800 e.payload
2801 .get("event")
2802 .and_then(|v| v.as_str())
2803 .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
2804 == Some("Stop")
2805}
2806
2807fn kind_from_str(s: &str) -> EventKind {
2808 match s {
2809 "ToolCall" => EventKind::ToolCall,
2810 "ToolResult" => EventKind::ToolResult,
2811 "Message" => EventKind::Message,
2812 "Error" => EventKind::Error,
2813 "Cost" => EventKind::Cost,
2814 "Hook" => EventKind::Hook,
2815 "Lifecycle" => EventKind::Lifecycle,
2816 _ => EventKind::Hook,
2817 }
2818}
2819
2820fn source_from_str(s: &str) -> EventSource {
2821 match s {
2822 "Tail" => EventSource::Tail,
2823 "Hook" => EventSource::Hook,
2824 _ => EventSource::Proxy,
2825 }
2826}
2827
2828fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2829 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2830 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2831 ensure_column(conn, "sessions", "branch", "TEXT")?;
2832 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2833 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2834 ensure_column(
2835 conn,
2836 "sessions",
2837 "repo_binding_source",
2838 "TEXT NOT NULL DEFAULT ''",
2839 )?;
2840 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2841 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2842 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2843 ensure_column(conn, "events", "stop_reason", "TEXT")?;
2844 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
2845 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
2846 ensure_column(conn, "events", "retry_count", "INTEGER")?;
2847 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
2848 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
2849 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
2850 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
2851 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
2852 ensure_column(
2853 conn,
2854 "sync_outbox",
2855 "kind",
2856 "TEXT NOT NULL DEFAULT 'events'",
2857 )?;
2858 ensure_column(
2859 conn,
2860 "experiments",
2861 "state",
2862 "TEXT NOT NULL DEFAULT 'Draft'",
2863 )?;
2864 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2865 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2866 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
2867 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
2868 ensure_column(conn, "sessions", "os", "TEXT")?;
2869 ensure_column(conn, "sessions", "arch", "TEXT")?;
2870 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
2871 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
2872 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
2873 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
2874 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
2875 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
2876 conn.execute_batch(
2877 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
2878 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
2879 )?;
2880 Ok(())
2881}
2882
2883fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2884 if has_column(conn, table, column)? {
2885 return Ok(());
2886 }
2887 conn.execute(
2888 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2889 [],
2890 )?;
2891 Ok(())
2892}
2893
2894fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2895 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2896 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2897 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2898}
2899
/// Encodes a boolean as the integer SQLite stores: `true` is 1, `false` is 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2903
/// Decodes a stored integer flag: zero is `false`, anything else is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
2907
/// Maps an empty string to `None` and passes any other string through
/// unchanged (whitespace-only strings are kept).
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2911
#[cfg(test)]
mod tests {
    //! Tests for the store. Every test opens a fresh database file inside
    //! its own temporary directory, so tests do not share state.

    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Builds a minimal finished session in workspace `/ws`, started at
    /// `1000` ms, with every optional field unset.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Builds a `read_file` tool-call event for `session_id`.
    ///
    /// The timestamp (`1000 + seq * 100`) and the tool call id (`call_{seq}`)
    /// are derived from `seq`, so distinct sequence numbers give distinct
    /// events. The payload is an empty JSON object.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// A freshly opened store reports WAL as its journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    /// Opening a store applies the expected connection pragmas.
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        // Per the SQLite PRAGMA docs: synchronous 1 is NORMAL, a negative
        // cache_size is a size in KiB, and temp_store 2 is MEMORY.
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        // 64 MB expressed in bytes: 64 * 1024 * 1024.
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }

    /// A store opened read-only has `PRAGMA query_only` switched on.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        // Create the database with a writable open first, then reopen it
        // read-only.
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }

    /// Each expected index is present in `sqlite_master` after open.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }

    /// A stored session can be fetched back by id with its status intact.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    /// Appends two events to a session, then checks that both the events and
    /// the owning session can be read back.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        // The test name promises an event round trip, so read the events
        // back instead of only checking the session listing.
        let events = store.list_events_for_session("s2").unwrap();
        assert_eq!(events.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    /// Paged listing reports the total, the next offset, and a stable order.
    ///
    /// Sessions "a" and "b" share a start time and "c" is older. The
    /// assertions expect later start times first, with the tie between "a"
    /// and "b" resolved by id, whatever the insertion order was.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        // Insert in the reverse of the expected output order.
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    /// The agent-prefix, status, and since filters combine to select a
    /// single session.
    ///
    /// The lowercase prefix "cur" is expected to match the agent "Cursor".
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }

    /// `list_sessions_started_after` returns only sessions newer than the
    /// cutoff, and `session_statuses` reflects a later status update.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }

    /// Summary stats on an empty store report zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    /// Summary stats count sessions and group them by agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        // Both sessions use the "cursor" agent, so there is one bucket.
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    /// Events appended to a session are listed back in sequence order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// Event paging starts at the given sequence number, inclusive.
    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        // The cursor is inclusive, so the next page starts one past the last
        // sequence number already seen.
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }

    /// Appending the same event twice stores it only once.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();

        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    /// The span-tree cache is filled by a read (even one with an empty
    /// result) and cleared by every `append_event`.
    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // A lookup for an unknown session still populates the cache.
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        // A tool call with no result yet produces an empty tree.
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        // Append the result that shares the call's tool call id.
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }

    /// Window membership uses `started_at_ms` when it is set and falls back
    /// to `ended_at_ms` only when it is not.
    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        // The window queried below is 100..300.
        //   "started"      starts inside                          -> included
        //   "fallback"     no start, ends inside                  -> included
        //   "outside"      starts after the window                -> excluded
        //   "too_old"      no start, ends before the window       -> excluded
        //   "started_wins" ends inside but starts after the window -> excluded
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }

    /// Upserting an existing session id updates the stored row.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// A path under `input.path` in the event payload shows up in
    /// `files_touched_in_window`.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` overwrites the stored status.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// Pruning removes sessions that started before the cutoff, along with
    /// their events, and leaves newer sessions alone.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    /// A `.cursor/rules/<slug>.mdc` path in the payload is recorded as a
    /// used rule under its slug.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// The guidance report credits a session to each skill and rule named in
    /// its event payload, and marks the supplied slugs as on disk.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        // The slug sets passed to the report as the skills and rules on disk.
        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    /// Pruning a session also removes its rows from `rules_used`.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}