1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::types::Value;
15use rusqlite::{
16 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
17};
18use std::cell::RefCell;
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21
// Timestamps below this value (~2001-09-09 in unix-ms) are treated as synthetic
// (sequence-derived) rather than wall-clock — NOTE(review): consumers not visible
// in this chunk; confirm against users of this constant.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
// Default SQLite mmap window in MiB — presumably applied by `apply_pragmas`; verify there.
const DEFAULT_MMAP_MB: u64 = 256;
// Shared column list for loading `SessionRecord` rows. Column order must stay
// in sync with the `session_row` mapper used by the session queries below.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
30
/// Idempotent schema migrations, run in order on every read-write open via
/// `execute_batch` (so an entry may contain several `;`-separated statements).
/// Every statement uses `IF NOT EXISTS` / `INSERT OR IGNORE`, making re-runs
/// safe. Columns added after a table's initial CREATE (e.g. `events.ts_exact`)
/// are backfilled separately by `ensure_schema_columns`.
const MIGRATIONS: &[&str] = &[
    // --- Core session/event tables (original schema; later columns come from
    //     ensure_schema_columns, not from these CREATEs). ---
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Queue of payloads awaiting upload; `sent` is a 0/1 flag.
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the ON CONFLICT(session_id, seq) upsert in append_event_with_sync.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Simple key/value store for sync bookkeeping (last_success_ms, last_error, ...).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Singleton row (id forced to 1) holding the remote pull cursor.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // --- remote_* tables: hashed, team-scoped mirrors of pulled data. ---
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement entry: table plus its two indexes (runs via execute_batch).
    // NOTE(review): the CASCADE only fires when PRAGMA foreign_keys is on.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    // Multi-statement entry, same pattern as session_evals above.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // --- Late-added secondary indexes. ---
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
273
/// Pre-aggregated numbers for the insights/dashboard view.
/// NOTE(review): the producer is not in this chunk; field meanings below that
/// are not self-evident are marked as assumptions.
#[derive(Clone)]
pub struct InsightsStats {
    // Total number of stored sessions.
    pub total_sessions: u64,
    // Sessions currently in a running state.
    pub running_sessions: u64,
    // Total event rows across all sessions.
    pub total_events: u64,
    // (day label, session count) pairs — presumably ISO dates; confirm producer.
    pub sessions_by_day: Vec<(String, u64)>,
    // Recent sessions paired with a per-session count (likely event count — TODO confirm).
    pub recent: Vec<(SessionRecord, u64)>,
    // (tool name, usage count), most used first.
    pub top_tools: Vec<(String, u64)>,
    // Total cost in micro-USD (USD * 1e6), matching events.cost_usd_e6.
    pub total_cost_usd_e6: i64,
    // Number of sessions that reported any cost.
    pub sessions_with_cost: u64,
}
289
/// Point-in-time view of sync health, assembled by [`Store::sync_status`]
/// from the `sync_outbox` and `sync_state` tables.
pub struct SyncStatusSnapshot {
    // Unsent rows remaining in sync_outbox (sent = 0).
    pub pending_outbox: u64,
    // Unix-ms of the last successful sync, if any ('last_success_ms' key).
    pub last_success_ms: Option<u64>,
    // Most recent sync error message, if one is stored ('last_error' key).
    pub last_error: Option<String>,
    // Failures since the last success ('consecutive_failures' key; 0 if absent).
    pub consecutive_failures: u32,
}
297
/// Workspace-level aggregates produced by [`Store::summary_stats`].
#[derive(serde::Serialize)]
pub struct SummaryStats {
    // Number of sessions in the workspace.
    pub session_count: u64,
    // Sum of event costs in micro-USD across the workspace.
    pub total_cost_usd_e6: i64,
    // (agent, session count), most frequent first.
    pub by_agent: Vec<(String, u64)>,
    // (model or 'unknown', session count), most frequent first.
    pub by_model: Vec<(String, u64)>,
    // (tool, event count), top 10 by usage.
    pub top_tools: Vec<(String, u64)>,
}
307
/// Kind of guidance artifact a session consumed; serialized lowercase
/// ("skill" / "rule") to match the skills_used / rules_used tables.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    Skill,
    Rule,
}
315
/// Per-skill/per-rule performance row inside a [`GuidanceReport`].
/// NOTE(review): producer not visible in this chunk; cost-delta semantics
/// below are inferred from field names — confirm at the call site.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    // Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    // Skill/rule identifier.
    pub id: String,
    // Sessions in the window that used this guidance.
    pub sessions: u64,
    // Share of windowed sessions that used it.
    pub sessions_pct: f64,
    // Total cost of those sessions in micro-USD.
    pub total_cost_usd_e6: i64,
    // Average cost per session in USD; None when not computable.
    pub avg_cost_per_session_usd: Option<f64>,
    // Difference vs. the workspace average — presumably row avg minus workspace avg.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    // Whether the guidance file still exists on disk.
    pub on_disk: bool,
}
328
/// Guidance (skills/rules) performance report for one workspace over a
/// time window; rows carry the per-guidance breakdown.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    // Workspace the report covers.
    pub workspace: String,
    // Window start, unix-ms inclusive — presumably; confirm producer.
    pub window_start_ms: u64,
    // Window end, unix-ms.
    pub window_end_ms: u64,
    // Sessions that started inside the window.
    pub sessions_in_window: u64,
    // Workspace-wide average cost per session in USD; None when unknown.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    // Per-skill/per-rule rows.
    pub rows: Vec<GuidancePerfRow>,
}
339
/// Counts reported by [`Store::prune_sessions_started_before`]: how many
/// sessions and events were deleted.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    pub sessions_removed: u64,
    pub events_removed: u64,
}
346
/// One row of the `session_outcomes` table: post-hoc quality measurements
/// for a session. All measurement fields are Option because a measurement
/// may be partially unavailable (see `measure_error`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    // Test counts from the session's verification run, if measured.
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    // Whether the build succeeded (stored as 0/1 in SQLite).
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    // Lines reverted within 14 days — presumably of the session's changes; confirm producer.
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    // When the measurement was taken, unix-ms.
    pub measured_at_ms: u64,
    // Error text if measurement (partially) failed.
    pub measure_error: Option<String>,
}
362
/// Aggregate over a session's resource samples (`session_samples` table):
/// sample count plus peak CPU and peak RSS observed.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    pub sample_count: u64,
    pub max_cpu_percent: f64,
    pub max_rss_bytes: u64,
}
371
/// `sync_state` key: unix-ms of the last agent trace scan.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key: unix-ms of the last automatic prune run.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
375
/// A `tool_spans` row shaped for sync upload: one tool invocation with its
/// timing, token usage, cost, and the file paths it touched.
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    // Span status string as stored in tool_spans.status.
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    // Cost in micro-USD (USD * 1e6).
    pub cost_usd_e6: Option<i64>,
    // File paths associated with the span (tool_span_paths).
    pub paths: Vec<String>,
}
391
/// How to open the store database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    // Full access: runs migrations and schema backfill on open.
    ReadWrite,
    // Query-only intent: skips migrations and opens without SQLite's
    // internal mutex (see open_with_mode for the exact flags used).
    ReadOnlyQuery,
}
397
/// Minimal status projection of a session, returned by
/// [`Store::session_statuses`].
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    pub id: String,
    pub status: SessionStatus,
    pub ended_at_ms: Option<u64>,
}
404
/// Optional filters for session listing; `None` fields match everything.
/// Translated to SQL by `session_filter_sql` (defined elsewhere in this file).
#[derive(Debug, Clone, Default)]
pub struct SessionFilter {
    // Match sessions whose agent starts with this prefix — presumably
    // case-insensitive given the lower(agent) index; confirm in session_filter_sql.
    pub agent_prefix: Option<String>,
    pub status: Option<SessionStatus>,
    // Only sessions started at/after this unix-ms — inclusive/exclusive TBD; confirm.
    pub since_ms: Option<u64>,
}
411
/// One page of session results plus pagination bookkeeping.
#[derive(Debug, Clone)]
pub struct SessionPage {
    // Rows for this page, newest first.
    pub rows: Vec<SessionRecord>,
    // Total rows matching the filter (ignores paging).
    pub total: usize,
    // Offset for the next page; None when this page reaches the end.
    pub next_offset: Option<usize>,
}
418
/// Memoized span tree for a single session, invalidated whenever events
/// change (see `invalidate_span_tree_cache`).
#[derive(Clone)]
struct SpanTreeCacheEntry {
    session_id: String,
    // Highest event seq the cached tree was built from; used to detect staleness.
    last_event_seq: Option<u64>,
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
425
/// SQLite-backed store for sessions, events, tool spans, and sync state.
/// Holds a single connection; the span-tree cache uses RefCell interior
/// mutability, so Store is intended for single-threaded use.
pub struct Store {
    conn: Connection,
    // At most one cached span tree (for the most recently queried session).
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
}
430
431impl Store {
    /// Crate-internal access to the raw connection (used by the index/sync helpers).
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
435
    /// Open the store read-write, running migrations. See [`Store::open_with_mode`].
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }
439
    /// Open the store for querying only (no migrations). See [`Store::open_with_mode`].
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
443
444 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
445 if let Some(parent) = path.parent() {
446 std::fs::create_dir_all(parent)?;
447 }
448 let conn = match mode {
449 StoreOpenMode::ReadWrite => Connection::open(path),
450 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
451 path,
452 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
453 ),
454 }
455 .with_context(|| format!("open db: {}", path.display()))?;
456 apply_pragmas(&conn, mode)?;
457 if mode == StoreOpenMode::ReadWrite {
458 for sql in MIGRATIONS {
459 conn.execute_batch(sql)?;
460 }
461 ensure_schema_columns(&conn)?;
462 }
463 Ok(Self {
464 conn,
465 span_tree_cache: RefCell::new(None),
466 })
467 }
468
469 fn invalidate_span_tree_cache(&self) {
470 *self.span_tree_cache.borrow_mut() = None;
471 }
472
    /// Insert or fully overwrite a session row, and mirror its repo-binding
    /// fields into `session_repo_binding`.
    ///
    /// Both statements are id-keyed upserts, so replaying the same record is
    /// idempotent. Note the two executes are not wrapped in a transaction —
    /// NOTE(review): a crash between them could leave the mirror table stale.
    pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch,
                repo_file_count, repo_total_loc
            )
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
                ?16, ?17, ?18, ?19, ?20, ?21)
            ON CONFLICT(id) DO UPDATE SET
                agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
                started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
                status=excluded.status, trace_path=excluded.trace_path,
                start_commit=excluded.start_commit, end_commit=excluded.end_commit,
                branch=excluded.branch, dirty_start=excluded.dirty_start,
                dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
                prompt_fingerprint=excluded.prompt_fingerprint,
                parent_session_id=excluded.parent_session_id,
                agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
                repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
            params![
                s.id,
                s.agent,
                s.model,
                s.workspace,
                s.started_at_ms as i64,
                s.ended_at_ms.map(|v| v as i64),
                // Status is stored as its Debug rendering (e.g. "Running");
                // status_from_str performs the reverse mapping on read.
                format!("{:?}", s.status),
                s.trace_path,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                // Binding source is NOT NULL in the schema; None becomes "".
                s.repo_binding_source.clone().unwrap_or_default(),
                s.prompt_fingerprint.as_deref(),
                s.parent_session_id.as_deref(),
                s.agent_version.as_deref(),
                s.os.as_deref(),
                s.arch.as_deref(),
                s.repo_file_count.map(|v| v as i64),
                s.repo_total_loc.map(|v| v as i64),
            ],
        )?;
        // Keep the dedicated repo-binding table in lockstep with the session row.
        self.conn.execute(
            "INSERT INTO session_repo_binding (
                session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
            ON CONFLICT(session_id) DO UPDATE SET
                start_commit=excluded.start_commit,
                end_commit=excluded.end_commit,
                branch=excluded.branch,
                dirty_start=excluded.dirty_start,
                dirty_end=excluded.dirty_end,
                repo_binding_source=excluded.repo_binding_source",
            params![
                s.id,
                s.start_commit,
                s.end_commit,
                s.branch,
                s.dirty_start.map(bool_to_i64),
                s.dirty_end.map(bool_to_i64),
                s.repo_binding_source.clone().unwrap_or_default(),
            ],
        )?;
        Ok(())
    }
541
    /// Create a minimal 'Running' session row if none exists yet, so events
    /// can be attached before the full record arrives.
    ///
    /// `INSERT OR IGNORE` makes this a no-op when the session is already
    /// present — it never overwrites an existing row.
    pub fn ensure_session_stub(
        &self,
        id: &str,
        agent: &str,
        workspace: &str,
        started_at_ms: u64,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT OR IGNORE INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
            ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
            params![id, agent, workspace, started_at_ms as i64],
        )?;
        Ok(())
    }
562
563 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
565 let n: i64 = self.conn.query_row(
566 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
567 [session_id],
568 |r| r.get(0),
569 )?;
570 Ok(n as u64)
571 }
572
    /// Append (or upsert) an event without any sync-outbox processing.
    /// Convenience wrapper over [`Store::append_event_with_sync`] with no context.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
576
    /// Upsert one event (keyed by `(session_id, seq)`), refresh the derived
    /// indexes/spans, and — when a sync context is provided and configured —
    /// enqueue a redacted copy into the sync outbox.
    ///
    /// The many early `return Ok(())` branches are sync gating, in order:
    /// no-op write, no context, incomplete sync config, missing team salt,
    /// sampled out, or unknown session. The event itself is always persisted
    /// before any of the sync gating runs.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
            )
            ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // Kind/source stored as their Debug rendering, matching the
                // format used elsewhere in this file for SessionStatus.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // If the upsert changed nothing, skip reindexing and sync entirely.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        index_event_derived(&self.conn, e)?;
        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
        self.invalidate_span_tree_cache();
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync requires endpoint + token + team id; anything missing disables it.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Probabilistic sampling: keep roughly sample_rate of events.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Redact workspace-relative paths/secrets before anything leaves the machine.
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
            params![e.session_id, row],
        )?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
676
677 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
678 let mut stmt = self.conn.prepare(
679 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
680 )?;
681 let rows = stmt.query_map(params![limit as i64], |row| {
682 Ok((
683 row.get::<_, i64>(0)?,
684 row.get::<_, String>(1)?,
685 row.get::<_, String>(2)?,
686 ))
687 })?;
688 let mut out = Vec::new();
689 for r in rows {
690 out.push(r?);
691 }
692 Ok(out)
693 }
694
695 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
696 for id in ids {
697 self.conn
698 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
699 }
700 Ok(())
701 }
702
703 pub fn replace_outbox_rows(
704 &self,
705 owner_id: &str,
706 kind: &str,
707 payloads: &[String],
708 ) -> Result<()> {
709 self.conn.execute(
710 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
711 params![owner_id, kind],
712 )?;
713 for payload in payloads {
714 self.conn.execute(
715 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
716 params![owner_id, kind, payload],
717 )?;
718 }
719 Ok(())
720 }
721
722 pub fn outbox_pending_count(&self) -> Result<u64> {
723 let c: i64 =
724 self.conn
725 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
726 r.get(0)
727 })?;
728 Ok(c as u64)
729 }
730
    /// Record a successful sync: stamp `last_success_ms` with the current
    /// time, reset `consecutive_failures` to 0, and clear any stored error.
    pub fn set_sync_state_ok(&self) -> Result<()> {
        let now = now_ms().to_string();
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
            ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![now],
        )?;
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
            ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            [],
        )?;
        // Removing the key entirely is what makes sync_status report None.
        self.conn
            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
        Ok(())
    }
747
748 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
749 let prev: i64 = self
750 .conn
751 .query_row(
752 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
753 [],
754 |r| {
755 let s: String = r.get(0)?;
756 Ok(s.parse::<i64>().unwrap_or(0))
757 },
758 )
759 .optional()?
760 .unwrap_or(0);
761 let next = prev.saturating_add(1);
762 self.conn.execute(
763 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
764 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
765 params![msg],
766 )?;
767 self.conn.execute(
768 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
769 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
770 params![next.to_string()],
771 )?;
772 Ok(())
773 }
774
775 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
776 let pending_outbox = self.outbox_pending_count()?;
777 let last_success_ms = self
778 .conn
779 .query_row(
780 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
781 [],
782 |r| r.get::<_, String>(0),
783 )
784 .optional()?
785 .and_then(|s| s.parse().ok());
786 let last_error = self
787 .conn
788 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
789 r.get::<_, String>(0)
790 })
791 .optional()?;
792 let consecutive_failures = self
793 .conn
794 .query_row(
795 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
796 [],
797 |r| r.get::<_, String>(0),
798 )
799 .optional()?
800 .and_then(|s| s.parse().ok())
801 .unwrap_or(0);
802 Ok(SyncStatusSnapshot {
803 pending_outbox,
804 last_success_ms,
805 last_error,
806 consecutive_failures,
807 })
808 }
809
810 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
811 let row: Option<String> = self
812 .conn
813 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
814 r.get::<_, String>(0)
815 })
816 .optional()?;
817 Ok(row.and_then(|s| s.parse().ok()))
818 }
819
820 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
821 self.conn.execute(
822 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
823 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
824 params![key, v.to_string()],
825 )?;
826 Ok(())
827 }
828
829 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
831 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
832 let sessions_to_remove: i64 = tx.query_row(
833 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
834 params![cutoff_ms],
835 |r| r.get(0),
836 )?;
837 let events_to_remove: i64 = tx.query_row(
838 "SELECT COUNT(*) FROM events WHERE session_id IN \
839 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
840 params![cutoff_ms],
841 |r| r.get(0),
842 )?;
843
844 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
845 tx.execute(
846 &format!(
847 "DELETE FROM tool_span_paths WHERE span_id IN \
848 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
849 ),
850 params![cutoff_ms],
851 )?;
852 tx.execute(
853 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
854 params![cutoff_ms],
855 )?;
856 tx.execute(
857 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
858 params![cutoff_ms],
859 )?;
860 tx.execute(
861 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
862 params![cutoff_ms],
863 )?;
864 tx.execute(
865 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
866 params![cutoff_ms],
867 )?;
868 tx.execute(
869 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
870 params![cutoff_ms],
871 )?;
872 tx.execute(
873 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
874 params![cutoff_ms],
875 )?;
876 tx.execute(
877 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
878 params![cutoff_ms],
879 )?;
880 tx.execute(
881 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
882 params![cutoff_ms],
883 )?;
884 tx.execute(
885 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
886 params![cutoff_ms],
887 )?;
888 tx.execute(
889 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
890 params![cutoff_ms],
891 )?;
892 tx.execute(
893 "DELETE FROM sessions WHERE started_at_ms < ?1",
894 params![cutoff_ms],
895 )?;
896 tx.commit()?;
897 self.invalidate_span_tree_cache();
898 Ok(PruneStats {
899 sessions_removed: sessions_to_remove as u64,
900 events_removed: events_to_remove as u64,
901 })
902 }
903
904 pub fn vacuum(&self) -> Result<()> {
906 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
907 Ok(())
908 }
909
910 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
911 Ok(self
912 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
913 .rows)
914 }
915
916 pub fn list_sessions_page(
917 &self,
918 workspace: &str,
919 offset: usize,
920 limit: usize,
921 filter: SessionFilter,
922 ) -> Result<SessionPage> {
923 let (where_sql, args) = session_filter_sql(workspace, &filter);
924 let total = self.query_session_page_count(&where_sql, &args)?;
925 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
926 let next = offset.saturating_add(rows.len());
927 Ok(SessionPage {
928 rows,
929 total,
930 next_offset: (next < total).then_some(next),
931 })
932 }
933
934 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
935 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
936 let total: i64 = self
937 .conn
938 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
939 Ok(total as usize)
940 }
941
942 fn query_session_page_rows(
943 &self,
944 where_sql: &str,
945 args: &[Value],
946 offset: usize,
947 limit: usize,
948 ) -> Result<Vec<SessionRecord>> {
949 let sql = format!(
950 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
951 );
952 let mut values = args.to_vec();
953 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
954 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
955 let mut stmt = self.conn.prepare(&sql)?;
956 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
957 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
958 }
959
    /// Sessions in `workspace` that started strictly after
    /// `after_started_at_ms`, newest first.
    pub fn list_sessions_started_after(
        &self,
        workspace: &str,
        after_started_at_ms: u64,
    ) -> Result<Vec<SessionRecord>> {
        // Column list mirrors SESSION_SELECT; keep both in sync with session_row.
        let mut stmt = self.conn.prepare(
            "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch,
                repo_file_count, repo_total_loc
             FROM sessions
             WHERE workspace = ?1 AND started_at_ms > ?2
             ORDER BY started_at_ms DESC, id ASC",
        )?;
        let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }
977
978 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
979 if ids.is_empty() {
980 return Ok(Vec::new());
981 }
982 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
983 let sql =
984 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
985 let mut stmt = self.conn.prepare(&sql)?;
986 let params: Vec<&dyn rusqlite::ToSql> =
987 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
988 let rows = stmt.query_map(params.as_slice(), |r| {
989 let status: String = r.get(1)?;
990 Ok(SessionStatusRow {
991 id: r.get(0)?,
992 status: status_from_str(&status),
993 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
994 })
995 })?;
996 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
997 }
998
    /// Aggregate statistics for one workspace: session count, total spend,
    /// and breakdowns by agent, by model, and by top-10 tools.
    ///
    /// Breakdown rows that fail to decode are silently skipped
    /// (`filter_map(.ok())`) rather than failing the whole summary.
    pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
        // Total number of sessions in the workspace.
        let session_count: i64 = self.conn.query_row(
            "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
            params![workspace],
            |r| r.get(0),
        )?;

        // Spend (micro-USD) summed over all events of all sessions in the
        // workspace; COALESCE yields 0 instead of NULL when no events carry
        // a cost.
        let total_cost: i64 = self.conn.query_row(
            "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
             JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
            params![workspace],
            |r| r.get(0),
        )?;

        // Session counts grouped by agent, most frequent first.
        let mut stmt = self.conn.prepare(
            "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
        )?;
        let by_agent: Vec<(String, u64)> = stmt
            .query_map(params![workspace], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
            })?
            .filter_map(|r| r.ok())
            .collect();

        // Session counts grouped by model; NULL models surface as "unknown".
        let mut stmt = self.conn.prepare(
            "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
        )?;
        let by_model: Vec<(String, u64)> = stmt
            .query_map(params![workspace], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
            })?
            .filter_map(|r| r.ok())
            .collect();

        // Ten most-used tools across the workspace's events.
        let mut stmt = self.conn.prepare(
            "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
             WHERE s.workspace = ?1 AND tool IS NOT NULL
             GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
        )?;
        let top_tools: Vec<(String, u64)> = stmt
            .query_map(params![workspace], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
            })?
            .filter_map(|r| r.ok())
            .collect();

        Ok(SummaryStats {
            session_count: session_count as u64,
            total_cost_usd_e6: total_cost,
            by_agent,
            by_model,
            top_tools,
        })
    }
1053
    /// Returns every event for `session_id` in sequence order by delegating
    /// to `list_events_page` with an effectively unbounded page size.
    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
        self.list_events_page(session_id, 0, i64::MAX as usize)
    }
1057
1058 pub fn list_events_page(
1059 &self,
1060 session_id: &str,
1061 after_seq: u64,
1062 limit: usize,
1063 ) -> Result<Vec<Event>> {
1064 let mut stmt = self.conn.prepare(
1065 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1066 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1067 stop_reason, latency_ms, ttft_ms, retry_count,
1068 context_used_tokens, context_max_tokens,
1069 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1070 FROM events
1071 WHERE session_id = ?1 AND seq >= ?2
1072 ORDER BY seq ASC LIMIT ?3",
1073 )?;
1074 let rows = stmt.query_map(
1075 params![
1076 session_id,
1077 after_seq as i64,
1078 limit.min(i64::MAX as usize) as i64
1079 ],
1080 event_row,
1081 )?;
1082 let mut events = Vec::new();
1083 for row in rows {
1084 events.push(row?);
1085 }
1086 Ok(events)
1087 }
1088
1089 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1091 self.conn.execute(
1092 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1093 params![format!("{:?}", status), id],
1094 )?;
1095 Ok(())
1096 }
1097
1098 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1100 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1101 Ok(InsightsStats {
1102 total_sessions: count_q(
1103 &self.conn,
1104 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1105 workspace,
1106 )?,
1107 running_sessions: count_q(
1108 &self.conn,
1109 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1110 workspace,
1111 )?,
1112 total_events: count_q(
1113 &self.conn,
1114 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1115 workspace,
1116 )?,
1117 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1118 recent: recent_sessions_3(&self.conn, workspace)?,
1119 top_tools: top_tools_5(&self.conn, workspace)?,
1120 total_cost_usd_e6,
1121 sessions_with_cost,
1122 })
1123 }
1124
1125 pub fn retro_events_in_window(
1127 &self,
1128 workspace: &str,
1129 start_ms: u64,
1130 end_ms: u64,
1131 ) -> Result<Vec<(SessionRecord, Event)>> {
1132 let mut stmt = self.conn.prepare(
1133 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1134 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1135 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1136 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1137 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1138 s.repo_file_count, s.repo_total_loc,
1139 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1140 e.context_used_tokens, e.context_max_tokens,
1141 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1142 FROM events e
1143 JOIN sessions s ON s.id = e.session_id
1144 WHERE s.workspace = ?1
1145 AND (
1146 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1147 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1148 )
1149 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1150 )?;
1151 let rows = stmt.query_map(
1152 params![
1153 workspace,
1154 start_ms as i64,
1155 end_ms as i64,
1156 SYNTHETIC_TS_CEILING_MS,
1157 ],
1158 |row| {
1159 let payload_str: String = row.get(12)?;
1160 let status_str: String = row.get(19)?;
1161 Ok((
1162 SessionRecord {
1163 id: row.get(13)?,
1164 agent: row.get(14)?,
1165 model: row.get(15)?,
1166 workspace: row.get(16)?,
1167 started_at_ms: row.get::<_, i64>(17)? as u64,
1168 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1169 status: status_from_str(&status_str),
1170 trace_path: row.get(20)?,
1171 start_commit: row.get(21)?,
1172 end_commit: row.get(22)?,
1173 branch: row.get(23)?,
1174 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1175 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1176 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1177 prompt_fingerprint: row.get(27)?,
1178 parent_session_id: row.get(28)?,
1179 agent_version: row.get(29)?,
1180 os: row.get(30)?,
1181 arch: row.get(31)?,
1182 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1183 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1184 },
1185 Event {
1186 session_id: row.get(0)?,
1187 seq: row.get::<_, i64>(1)? as u64,
1188 ts_ms: row.get::<_, i64>(2)? as u64,
1189 ts_exact: row.get::<_, i64>(3)? != 0,
1190 kind: kind_from_str(&row.get::<_, String>(4)?),
1191 source: source_from_str(&row.get::<_, String>(5)?),
1192 tool: row.get(6)?,
1193 tool_call_id: row.get(7)?,
1194 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1195 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1196 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1197 cost_usd_e6: row.get(11)?,
1198 payload: serde_json::from_str(&payload_str)
1199 .unwrap_or(serde_json::Value::Null),
1200 stop_reason: row.get(34)?,
1201 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1202 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1203 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1204 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1205 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1206 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1207 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1208 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1209 },
1210 ))
1211 },
1212 )?;
1213
1214 let mut out = Vec::new();
1215 for r in rows {
1216 out.push(r?);
1217 }
1218 Ok(out)
1219 }
1220
1221 pub fn files_touched_in_window(
1223 &self,
1224 workspace: &str,
1225 start_ms: u64,
1226 end_ms: u64,
1227 ) -> Result<Vec<(String, String)>> {
1228 let mut stmt = self.conn.prepare(
1229 "SELECT DISTINCT ft.session_id, ft.path
1230 FROM files_touched ft
1231 JOIN sessions s ON s.id = ft.session_id
1232 WHERE s.workspace = ?1
1233 AND EXISTS (
1234 SELECT 1 FROM events e
1235 JOIN sessions ss ON ss.id = e.session_id
1236 WHERE e.session_id = ft.session_id
1237 AND (
1238 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1239 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1240 )
1241 )
1242 ORDER BY ft.session_id, ft.path",
1243 )?;
1244 let out: Vec<(String, String)> = stmt
1245 .query_map(
1246 params![
1247 workspace,
1248 start_ms as i64,
1249 end_ms as i64,
1250 SYNTHETIC_TS_CEILING_MS,
1251 ],
1252 |r| Ok((r.get(0)?, r.get(1)?)),
1253 )?
1254 .filter_map(|r| r.ok())
1255 .collect();
1256 Ok(out)
1257 }
1258
1259 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1262 let mut stmt = self.conn.prepare(
1263 "SELECT DISTINCT su.skill
1264 FROM skills_used su
1265 JOIN sessions s ON s.id = su.session_id
1266 WHERE s.workspace = ?1
1267 AND EXISTS (
1268 SELECT 1 FROM events e
1269 JOIN sessions ss ON ss.id = e.session_id
1270 WHERE e.session_id = su.session_id
1271 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1272 )
1273 ORDER BY su.skill",
1274 )?;
1275 let out: Vec<String> = stmt
1276 .query_map(
1277 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1278 |r| r.get::<_, String>(0),
1279 )?
1280 .filter_map(|r| r.ok())
1281 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1282 .collect();
1283 Ok(out)
1284 }
1285
1286 pub fn skills_used_in_window(
1288 &self,
1289 workspace: &str,
1290 start_ms: u64,
1291 end_ms: u64,
1292 ) -> Result<Vec<(String, String)>> {
1293 let mut stmt = self.conn.prepare(
1294 "SELECT DISTINCT su.session_id, su.skill
1295 FROM skills_used su
1296 JOIN sessions s ON s.id = su.session_id
1297 WHERE s.workspace = ?1
1298 AND EXISTS (
1299 SELECT 1 FROM events e
1300 JOIN sessions ss ON ss.id = e.session_id
1301 WHERE e.session_id = su.session_id
1302 AND (
1303 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1304 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1305 )
1306 )
1307 ORDER BY su.session_id, su.skill",
1308 )?;
1309 let out: Vec<(String, String)> = stmt
1310 .query_map(
1311 params![
1312 workspace,
1313 start_ms as i64,
1314 end_ms as i64,
1315 SYNTHETIC_TS_CEILING_MS,
1316 ],
1317 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1318 )?
1319 .filter_map(|r| r.ok())
1320 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1321 .collect();
1322 Ok(out)
1323 }
1324
1325 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1327 let mut stmt = self.conn.prepare(
1328 "SELECT DISTINCT ru.rule
1329 FROM rules_used ru
1330 JOIN sessions s ON s.id = ru.session_id
1331 WHERE s.workspace = ?1
1332 AND EXISTS (
1333 SELECT 1 FROM events e
1334 JOIN sessions ss ON ss.id = e.session_id
1335 WHERE e.session_id = ru.session_id
1336 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1337 )
1338 ORDER BY ru.rule",
1339 )?;
1340 let out: Vec<String> = stmt
1341 .query_map(
1342 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1343 |r| r.get::<_, String>(0),
1344 )?
1345 .filter_map(|r| r.ok())
1346 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1347 .collect();
1348 Ok(out)
1349 }
1350
1351 pub fn rules_used_in_window(
1353 &self,
1354 workspace: &str,
1355 start_ms: u64,
1356 end_ms: u64,
1357 ) -> Result<Vec<(String, String)>> {
1358 let mut stmt = self.conn.prepare(
1359 "SELECT DISTINCT ru.session_id, ru.rule
1360 FROM rules_used ru
1361 JOIN sessions s ON s.id = ru.session_id
1362 WHERE s.workspace = ?1
1363 AND EXISTS (
1364 SELECT 1 FROM events e
1365 JOIN sessions ss ON ss.id = e.session_id
1366 WHERE e.session_id = ru.session_id
1367 AND (
1368 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1369 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1370 )
1371 )
1372 ORDER BY ru.session_id, ru.rule",
1373 )?;
1374 let out: Vec<(String, String)> = stmt
1375 .query_map(
1376 params![
1377 workspace,
1378 start_ms as i64,
1379 end_ms as i64,
1380 SYNTHETIC_TS_CEILING_MS,
1381 ],
1382 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1383 )?
1384 .filter_map(|r| r.ok())
1385 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1386 .collect();
1387 Ok(out)
1388 }
1389
1390 pub fn sessions_active_in_window(
1392 &self,
1393 workspace: &str,
1394 start_ms: u64,
1395 end_ms: u64,
1396 ) -> Result<HashSet<String>> {
1397 let mut stmt = self.conn.prepare(
1398 "SELECT DISTINCT s.id
1399 FROM sessions s
1400 WHERE s.workspace = ?1
1401 AND EXISTS (
1402 SELECT 1 FROM events e
1403 WHERE e.session_id = s.id
1404 AND (
1405 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1406 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1407 )
1408 )",
1409 )?;
1410 let out: HashSet<String> = stmt
1411 .query_map(
1412 params![
1413 workspace,
1414 start_ms as i64,
1415 end_ms as i64,
1416 SYNTHETIC_TS_CEILING_MS,
1417 ],
1418 |r| r.get(0),
1419 )?
1420 .filter_map(|r| r.ok())
1421 .collect();
1422 Ok(out)
1423 }
1424
1425 pub fn session_costs_usd_e6_in_window(
1427 &self,
1428 workspace: &str,
1429 start_ms: u64,
1430 end_ms: u64,
1431 ) -> Result<HashMap<String, i64>> {
1432 let mut stmt = self.conn.prepare(
1433 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1434 FROM events e
1435 JOIN sessions s ON s.id = e.session_id
1436 WHERE s.workspace = ?1
1437 AND (
1438 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1439 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1440 )
1441 GROUP BY e.session_id",
1442 )?;
1443 let rows: Vec<(String, i64)> = stmt
1444 .query_map(
1445 params![
1446 workspace,
1447 start_ms as i64,
1448 end_ms as i64,
1449 SYNTHETIC_TS_CEILING_MS,
1450 ],
1451 |r| Ok((r.get(0)?, r.get(1)?)),
1452 )?
1453 .filter_map(|r| r.ok())
1454 .collect();
1455 Ok(rows.into_iter().collect())
1456 }
1457
1458 pub fn guidance_report(
1460 &self,
1461 workspace: &str,
1462 window_start_ms: u64,
1463 window_end_ms: u64,
1464 skill_slugs_on_disk: &HashSet<String>,
1465 rule_slugs_on_disk: &HashSet<String>,
1466 ) -> Result<GuidanceReport> {
1467 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1468 let denom = active.len() as u64;
1469 let costs =
1470 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1471
1472 let workspace_avg_cost_per_session_usd = if denom > 0 {
1473 let total_e6: i64 = active
1474 .iter()
1475 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1476 .sum();
1477 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1478 } else {
1479 None
1480 };
1481
1482 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1483 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1484 skill_sessions.entry(skill).or_default().insert(sid);
1485 }
1486 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1487 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1488 rule_sessions.entry(rule).or_default().insert(sid);
1489 }
1490
1491 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1492
1493 let mut push_row =
1494 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1495 let sessions = sids.len() as u64;
1496 let sessions_pct = if denom > 0 {
1497 sessions as f64 * 100.0 / denom as f64
1498 } else {
1499 0.0
1500 };
1501 let total_cost_usd_e6: i64 = sids
1502 .iter()
1503 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1504 .sum();
1505 let avg_cost_per_session_usd = if sessions > 0 {
1506 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1507 } else {
1508 None
1509 };
1510 let vs_workspace_avg_cost_per_session_usd =
1511 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1512 (Some(avg), Some(w)) => Some(avg - w),
1513 _ => None,
1514 };
1515 rows.push(GuidancePerfRow {
1516 kind,
1517 id,
1518 sessions,
1519 sessions_pct,
1520 total_cost_usd_e6,
1521 avg_cost_per_session_usd,
1522 vs_workspace_avg_cost_per_session_usd,
1523 on_disk,
1524 });
1525 };
1526
1527 let mut seen_skills: HashSet<String> = HashSet::new();
1528 for (id, sids) in &skill_sessions {
1529 seen_skills.insert(id.clone());
1530 push_row(
1531 GuidanceKind::Skill,
1532 id.clone(),
1533 sids,
1534 skill_slugs_on_disk.contains(id),
1535 );
1536 }
1537 for slug in skill_slugs_on_disk {
1538 if seen_skills.contains(slug) {
1539 continue;
1540 }
1541 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1542 }
1543
1544 let mut seen_rules: HashSet<String> = HashSet::new();
1545 for (id, sids) in &rule_sessions {
1546 seen_rules.insert(id.clone());
1547 push_row(
1548 GuidanceKind::Rule,
1549 id.clone(),
1550 sids,
1551 rule_slugs_on_disk.contains(id),
1552 );
1553 }
1554 for slug in rule_slugs_on_disk {
1555 if seen_rules.contains(slug) {
1556 continue;
1557 }
1558 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1559 }
1560
1561 rows.sort_by(|a, b| {
1562 b.sessions
1563 .cmp(&a.sessions)
1564 .then_with(|| a.kind.cmp(&b.kind))
1565 .then_with(|| a.id.cmp(&b.id))
1566 });
1567
1568 Ok(GuidanceReport {
1569 workspace: workspace.to_string(),
1570 window_start_ms,
1571 window_end_ms,
1572 sessions_in_window: denom,
1573 workspace_avg_cost_per_session_usd,
1574 rows,
1575 })
1576 }
1577
1578 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1579 let mut stmt = self.conn.prepare(
1580 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1581 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1582 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1583 repo_file_count, repo_total_loc
1584 FROM sessions WHERE id = ?1",
1585 )?;
1586 let mut rows = stmt.query_map(params![id], |row| {
1587 Ok((
1588 row.get::<_, String>(0)?,
1589 row.get::<_, String>(1)?,
1590 row.get::<_, Option<String>>(2)?,
1591 row.get::<_, String>(3)?,
1592 row.get::<_, i64>(4)?,
1593 row.get::<_, Option<i64>>(5)?,
1594 row.get::<_, String>(6)?,
1595 row.get::<_, String>(7)?,
1596 row.get::<_, Option<String>>(8)?,
1597 row.get::<_, Option<String>>(9)?,
1598 row.get::<_, Option<String>>(10)?,
1599 row.get::<_, Option<i64>>(11)?,
1600 row.get::<_, Option<i64>>(12)?,
1601 row.get::<_, String>(13)?,
1602 row.get::<_, Option<String>>(14)?,
1603 row.get::<_, Option<String>>(15)?,
1604 row.get::<_, Option<String>>(16)?,
1605 row.get::<_, Option<String>>(17)?,
1606 row.get::<_, Option<String>>(18)?,
1607 row.get::<_, Option<i64>>(19)?,
1608 row.get::<_, Option<i64>>(20)?,
1609 ))
1610 })?;
1611
1612 if let Some(row) = rows.next() {
1613 let (
1614 id,
1615 agent,
1616 model,
1617 workspace,
1618 started,
1619 ended,
1620 status_str,
1621 trace,
1622 start_commit,
1623 end_commit,
1624 branch,
1625 dirty_start,
1626 dirty_end,
1627 source,
1628 prompt_fingerprint,
1629 parent_session_id,
1630 agent_version,
1631 os,
1632 arch,
1633 repo_file_count,
1634 repo_total_loc,
1635 ) = row?;
1636 Ok(Some(SessionRecord {
1637 id,
1638 agent,
1639 model,
1640 workspace,
1641 started_at_ms: started as u64,
1642 ended_at_ms: ended.map(|v| v as u64),
1643 status: status_from_str(&status_str),
1644 trace_path: trace,
1645 start_commit,
1646 end_commit,
1647 branch,
1648 dirty_start: dirty_start.map(i64_to_bool),
1649 dirty_end: dirty_end.map(i64_to_bool),
1650 repo_binding_source: empty_to_none(source),
1651 prompt_fingerprint,
1652 parent_session_id,
1653 agent_version,
1654 os,
1655 arch,
1656 repo_file_count: repo_file_count.map(|v| v as u32),
1657 repo_total_loc: repo_total_loc.map(|v| v as u64),
1658 }))
1659 } else {
1660 Ok(None)
1661 }
1662 }
1663
1664 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1665 let mut stmt = self.conn.prepare(
1666 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1667 indexed_at_ms, dirty, graph_path
1668 FROM repo_snapshots WHERE workspace = ?1
1669 ORDER BY indexed_at_ms DESC LIMIT 1",
1670 )?;
1671 let mut rows = stmt.query_map(params![workspace], |row| {
1672 Ok(RepoSnapshotRecord {
1673 id: row.get(0)?,
1674 workspace: row.get(1)?,
1675 head_commit: row.get(2)?,
1676 dirty_fingerprint: row.get(3)?,
1677 analyzer_version: row.get(4)?,
1678 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1679 dirty: row.get::<_, i64>(6)? != 0,
1680 graph_path: row.get(7)?,
1681 })
1682 })?;
1683 Ok(rows.next().transpose()?)
1684 }
1685
1686 pub fn save_repo_snapshot(
1687 &self,
1688 snapshot: &RepoSnapshotRecord,
1689 facts: &[FileFact],
1690 edges: &[RepoEdge],
1691 ) -> Result<()> {
1692 self.conn.execute(
1693 "INSERT INTO repo_snapshots (
1694 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1695 indexed_at_ms, dirty, graph_path
1696 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1697 ON CONFLICT(id) DO UPDATE SET
1698 workspace=excluded.workspace,
1699 head_commit=excluded.head_commit,
1700 dirty_fingerprint=excluded.dirty_fingerprint,
1701 analyzer_version=excluded.analyzer_version,
1702 indexed_at_ms=excluded.indexed_at_ms,
1703 dirty=excluded.dirty,
1704 graph_path=excluded.graph_path",
1705 params![
1706 snapshot.id,
1707 snapshot.workspace,
1708 snapshot.head_commit,
1709 snapshot.dirty_fingerprint,
1710 snapshot.analyzer_version,
1711 snapshot.indexed_at_ms as i64,
1712 bool_to_i64(snapshot.dirty),
1713 snapshot.graph_path,
1714 ],
1715 )?;
1716 self.conn.execute(
1717 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1718 params![snapshot.id],
1719 )?;
1720 self.conn.execute(
1721 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1722 params![snapshot.id],
1723 )?;
1724 for fact in facts {
1725 self.conn.execute(
1726 "INSERT INTO file_facts (
1727 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1728 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1729 churn_30d, churn_90d, authors_90d, last_changed_ms
1730 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1731 params![
1732 fact.snapshot_id,
1733 fact.path,
1734 fact.language,
1735 fact.bytes as i64,
1736 fact.loc as i64,
1737 fact.sloc as i64,
1738 fact.complexity_total as i64,
1739 fact.max_fn_complexity as i64,
1740 fact.symbol_count as i64,
1741 fact.import_count as i64,
1742 fact.fan_in as i64,
1743 fact.fan_out as i64,
1744 fact.churn_30d as i64,
1745 fact.churn_90d as i64,
1746 fact.authors_90d as i64,
1747 fact.last_changed_ms.map(|v| v as i64),
1748 ],
1749 )?;
1750 }
1751 for edge in edges {
1752 self.conn.execute(
1753 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1754 VALUES (?1, ?2, ?3, ?4, ?5)
1755 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1756 DO UPDATE SET weight = weight + excluded.weight",
1757 params![
1758 snapshot.id,
1759 edge.from_path,
1760 edge.to_path,
1761 edge.kind,
1762 edge.weight as i64,
1763 ],
1764 )?;
1765 }
1766 Ok(())
1767 }
1768
    /// Loads every `FileFact` stored for `snapshot_id`, ordered by path.
    ///
    /// Rows that fail to decode are silently skipped. The index-based
    /// decoding must match the SELECT column order exactly.
    pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
        let mut stmt = self.conn.prepare(
            "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
                    max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
                    churn_30d, churn_90d, authors_90d, last_changed_ms
             FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
        )?;
        let rows = stmt.query_map(params![snapshot_id], |row| {
            Ok(FileFact {
                snapshot_id: row.get(0)?,
                path: row.get(1)?,
                language: row.get(2)?,
                bytes: row.get::<_, i64>(3)? as u64,
                loc: row.get::<_, i64>(4)? as u32,
                sloc: row.get::<_, i64>(5)? as u32,
                complexity_total: row.get::<_, i64>(6)? as u32,
                max_fn_complexity: row.get::<_, i64>(7)? as u32,
                symbol_count: row.get::<_, i64>(8)? as u32,
                import_count: row.get::<_, i64>(9)? as u32,
                fan_in: row.get::<_, i64>(10)? as u32,
                fan_out: row.get::<_, i64>(11)? as u32,
                churn_30d: row.get::<_, i64>(12)? as u32,
                churn_90d: row.get::<_, i64>(13)? as u32,
                authors_90d: row.get::<_, i64>(14)? as u32,
                last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
            })
        })?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
1798
1799 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1800 let mut stmt = self.conn.prepare(
1801 "SELECT from_id, to_id, kind, weight
1802 FROM repo_edges WHERE snapshot_id = ?1
1803 ORDER BY kind, from_id, to_id",
1804 )?;
1805 let rows = stmt.query_map(params![snapshot_id], |row| {
1806 Ok(RepoEdge {
1807 from_path: row.get(0)?,
1808 to_path: row.get(1)?,
1809 kind: row.get(2)?,
1810 weight: row.get::<_, i64>(3)? as u32,
1811 })
1812 })?;
1813 Ok(rows.filter_map(|row| row.ok()).collect())
1814 }
1815
    /// Tool spans for `workspace` whose anchor timestamp falls in the
    /// inclusive window `[start_ms, end_ms]`, newest first.
    ///
    /// The UNION ALL's first arm anchors on `started_at_ms`; the second arm
    /// picks up spans with a NULL start by anchoring on `ended_at_ms`
    /// instead, so no in-window span is dropped. The arms are disjoint
    /// (second requires `started_at_ms IS NULL`), so no duplicates arise.
    /// Rows that fail to decode are skipped; a NULL tool name is reported
    /// as "unknown" and an unparseable `paths_json` as an empty list.
    pub fn tool_spans_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<ToolSpanView>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                    reasoning_tokens, cost_usd_e6, paths_json,
                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
             FROM (
                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                        ts.started_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms >= ?2
                   AND ts.started_at_ms <= ?3
                 UNION ALL
                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                        ts.ended_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms IS NULL
                   AND ts.ended_at_ms >= ?2
                   AND ts.ended_at_ms <= ?3
             )
             ORDER BY sort_ms DESC",
        )?;
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
            let paths_json: String = row.get(8)?;
            Ok(ToolSpanView {
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
1874
    /// Returns the tool-span tree for a session, memoized per session.
    ///
    /// Cache invalidation: the cached tree is reused only while the
    /// session's `MAX(seq)` is unchanged — any new event (which could add
    /// or update spans) makes the key stale and forces a rebuild. The
    /// single-entry cache lives in a `RefCell` (this type is not `Sync`;
    /// the short `borrow()` is dropped before `borrow_mut()` below).
    pub fn session_span_tree(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
        let last_event_seq = self.last_event_seq_for_session(session_id)?;
        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
            && entry.session_id == session_id
            && entry.last_event_seq == last_event_seq
        {
            return Ok(entry.nodes.clone());
        }
        // Parents first (depth ASC) so build_tree sees each parent before
        // its children.
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                    reasoning_tokens, cost_usd_e6, paths_json,
                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
             FROM tool_spans
             WHERE session_id = ?1
             ORDER BY depth ASC, started_at_ms ASC",
        )?;
        let rows = stmt.query_map(params![session_id], |row| {
            let paths_json: String = row.get(8)?;
            Ok(crate::metrics::types::ToolSpanView {
                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
                tool: row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_else(|| "unknown".into()),
                status: row.get(2)?,
                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
                cost_usd_e6: row.get(7)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
                parent_span_id: row.get(9)?,
                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
                subtree_cost_usd_e6: row.get(11)?,
                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
            })
        })?;
        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
        let nodes = crate::store::span_tree::build_tree(spans);
        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
            session_id: session_id.to_string(),
            last_event_seq,
            nodes: nodes.clone(),
        });
        Ok(nodes)
    }
1923
1924 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
1925 let seq = self
1926 .conn
1927 .query_row(
1928 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
1929 params![session_id],
1930 |r| r.get::<_, Option<i64>>(0),
1931 )?
1932 .map(|v| v as u64);
1933 Ok(seq)
1934 }
1935
    /// All tool spans for a session in start-time order, shaped for sync
    /// upload (`ToolSpanSyncRow`).
    ///
    /// Rows that fail to decode are skipped; an unparseable `paths_json`
    /// degrades to an empty path list.
    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
        )?;
        let rows = stmt.query_map(params![session_id], |row| {
            let paths_json: String = row.get(12)?;
            Ok(ToolSpanSyncRow {
                span_id: row.get(0)?,
                session_id: row.get(1)?,
                tool: row.get(2)?,
                tool_call_id: row.get(3)?,
                status: row.get(4)?,
                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
                cost_usd_e6: row.get(11)?,
                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
            })
        })?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }
1962
1963 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1964 self.conn.execute(
1965 "INSERT OR REPLACE INTO session_evals
1966 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1967 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1968 rusqlite::params![
1969 eval.id,
1970 eval.session_id,
1971 eval.judge_model,
1972 eval.rubric_id,
1973 eval.score,
1974 eval.rationale,
1975 eval.flagged as i64,
1976 eval.created_at_ms as i64,
1977 ],
1978 )?;
1979 Ok(())
1980 }
1981
1982 pub fn list_evals_in_window(
1983 &self,
1984 start_ms: u64,
1985 end_ms: u64,
1986 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1987 let mut stmt = self.conn.prepare(
1988 "SELECT id, session_id, judge_model, rubric_id, score,
1989 rationale, flagged, created_at_ms
1990 FROM session_evals
1991 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1992 ORDER BY created_at_ms ASC",
1993 )?;
1994 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1995 Ok(crate::eval::types::EvalRow {
1996 id: r.get(0)?,
1997 session_id: r.get(1)?,
1998 judge_model: r.get(2)?,
1999 rubric_id: r.get(3)?,
2000 score: r.get(4)?,
2001 rationale: r.get(5)?,
2002 flagged: r.get::<_, i64>(6)? != 0,
2003 created_at_ms: r.get::<_, i64>(7)? as u64,
2004 })
2005 })?;
2006 rows.collect()
2007 }
2008
2009 pub fn list_evals_for_session(
2010 &self,
2011 session_id: &str,
2012 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2013 let mut stmt = self.conn.prepare(
2014 "SELECT id, session_id, judge_model, rubric_id, score,
2015 rationale, flagged, created_at_ms
2016 FROM session_evals
2017 WHERE session_id = ?1
2018 ORDER BY created_at_ms DESC",
2019 )?;
2020 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2021 Ok(crate::eval::types::EvalRow {
2022 id: r.get(0)?,
2023 session_id: r.get(1)?,
2024 judge_model: r.get(2)?,
2025 rubric_id: r.get(3)?,
2026 score: r.get(4)?,
2027 rationale: r.get(5)?,
2028 flagged: r.get::<_, i64>(6)? != 0,
2029 created_at_ms: r.get::<_, i64>(7)? as u64,
2030 })
2031 })?;
2032 rows.collect()
2033 }
2034
    /// Insert or overwrite user feedback for a session and enqueue it on the
    /// sync outbox as a `session_feedback` record.
    pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
        use crate::feedback::types::FeedbackLabel;
        self.conn.execute(
            "INSERT OR REPLACE INTO session_feedback
             (id, session_id, score, label, note, created_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            rusqlite::params![
                r.id,
                r.session_id,
                r.score.as_ref().map(|s| s.0 as i64), // nullable score column
                r.label.as_ref().map(FeedbackLabel::to_db_str), // nullable label column
                r.note,
                r.created_at_ms as i64,
            ],
        )?;
        // NOTE(review): a serialization failure silently enqueues an empty
        // payload — confirm downstream sync tolerates that.
        let payload = serde_json::to_string(r).unwrap_or_default();
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
             VALUES (?1, 'session_feedback', ?2, 0)",
            rusqlite::params![r.session_id, payload],
        )?;
        Ok(())
    }
2058
2059 pub fn list_feedback_in_window(
2060 &self,
2061 start_ms: u64,
2062 end_ms: u64,
2063 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2064 let mut stmt = self.conn.prepare(
2065 "SELECT id, session_id, score, label, note, created_at_ms
2066 FROM session_feedback
2067 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2068 ORDER BY created_at_ms ASC",
2069 )?;
2070 let rows = stmt.query_map(
2071 rusqlite::params![start_ms as i64, end_ms as i64],
2072 feedback_row,
2073 )?;
2074 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2075 }
2076
2077 pub fn feedback_for_sessions(
2078 &self,
2079 ids: &[String],
2080 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2081 if ids.is_empty() {
2082 return Ok(std::collections::HashMap::new());
2083 }
2084 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2085 let sql = format!(
2086 "SELECT id, session_id, score, label, note, created_at_ms
2087 FROM session_feedback WHERE session_id IN ({placeholders})
2088 ORDER BY created_at_ms DESC"
2089 );
2090 let mut stmt = self.conn.prepare(&sql)?;
2091 let params: Vec<&dyn rusqlite::ToSql> =
2092 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2093 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2094 let mut map = std::collections::HashMap::new();
2095 for row in rows {
2096 let r = row?;
2097 map.entry(r.session_id.clone()).or_insert(r);
2098 }
2099 Ok(map)
2100 }
2101
    /// Insert or update the measured outcome metrics for a session
    /// (one row per session, keyed on `session_id` via ON CONFLICT upsert).
    pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
        self.conn.execute(
            "INSERT INTO session_outcomes (
                session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
                revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
            ON CONFLICT(session_id) DO UPDATE SET
                test_passed=excluded.test_passed,
                test_failed=excluded.test_failed,
                test_skipped=excluded.test_skipped,
                build_ok=excluded.build_ok,
                lint_errors=excluded.lint_errors,
                revert_lines_14d=excluded.revert_lines_14d,
                pr_open=excluded.pr_open,
                ci_ok=excluded.ci_ok,
                measured_at_ms=excluded.measured_at_ms,
                measure_error=excluded.measure_error",
            params![
                row.session_id,
                row.test_passed,
                row.test_failed,
                row.test_skipped,
                row.build_ok.map(bool_to_i64), // Option<bool> -> NULL / 0 / 1
                row.lint_errors,
                row.revert_lines_14d,
                row.pr_open,
                row.ci_ok.map(bool_to_i64), // Option<bool> -> NULL / 0 / 1
                row.measured_at_ms as i64,
                row.measure_error.as_deref(),
            ],
        )?;
        Ok(())
    }
2135
    /// Fetch the outcome row for `session_id`, or `None` if none has been recorded.
    pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
        let mut stmt = self.conn.prepare(
            "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
                    revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
             FROM session_outcomes WHERE session_id = ?1",
        )?;
        let row = stmt
            .query_row(params![session_id], outcome_row)
            .optional()?; // maps QueryReturnedNoRows to Ok(None)
        Ok(row)
    }
2147
2148 pub fn list_session_outcomes_in_window(
2150 &self,
2151 workspace: &str,
2152 start_ms: u64,
2153 end_ms: u64,
2154 ) -> Result<Vec<SessionOutcomeRow>> {
2155 let mut stmt = self.conn.prepare(
2156 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2157 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2158 FROM session_outcomes o
2159 JOIN sessions s ON s.id = o.session_id
2160 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2161 ORDER BY o.measured_at_ms ASC",
2162 )?;
2163 let rows = stmt.query_map(
2164 params![workspace, start_ms as i64, end_ms as i64],
2165 outcome_row,
2166 )?;
2167 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2168 }
2169
    /// Record one resource-usage sample (CPU%, RSS) for a running session.
    ///
    /// NOTE(review): `INSERT OR REPLACE` dedupes on the table's primary key —
    /// presumably (session_id, ts_ms); confirm against the schema.
    pub fn append_session_sample(
        &self,
        session_id: &str,
        ts_ms: u64,
        pid: u32,
        cpu_percent: Option<f64>,
        rss_bytes: Option<u64>,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                session_id,
                ts_ms as i64,
                pid as i64,
                cpu_percent,
                rss_bytes.map(|b| b as i64)
            ],
        )?;
        Ok(())
    }
2191
2192 pub fn list_session_sample_aggs_in_window(
2194 &self,
2195 workspace: &str,
2196 start_ms: u64,
2197 end_ms: u64,
2198 ) -> Result<Vec<SessionSampleAgg>> {
2199 let mut stmt = self.conn.prepare(
2200 "SELECT ss.session_id, COUNT(*) AS n,
2201 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2202 FROM session_samples ss
2203 JOIN sessions s ON s.id = ss.session_id
2204 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2205 GROUP BY ss.session_id",
2206 )?;
2207 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2208 let sid: String = r.get(0)?;
2209 let n: i64 = r.get(1)?;
2210 let max_cpu: Option<f64> = r.get(2)?;
2211 let max_rss: Option<i64> = r.get(3)?;
2212 Ok(SessionSampleAgg {
2213 session_id: sid,
2214 sample_count: n as u64,
2215 max_cpu_percent: max_cpu.unwrap_or(0.0),
2216 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2217 })
2218 })?;
2219 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2220 }
2221
2222 pub fn list_sessions_for_eval(
2223 &self,
2224 since_ms: u64,
2225 min_cost_usd: f64,
2226 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2227 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2228 let mut stmt = self.conn.prepare(
2229 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2230 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2231 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2232 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2233 FROM sessions s
2234 WHERE s.started_at_ms >= ?1
2235 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2236 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2237 ORDER BY s.started_at_ms DESC",
2238 )?;
2239 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2240 Ok((
2241 r.get::<_, String>(0)?,
2242 r.get::<_, String>(1)?,
2243 r.get::<_, Option<String>>(2)?,
2244 r.get::<_, String>(3)?,
2245 r.get::<_, i64>(4)?,
2246 r.get::<_, Option<i64>>(5)?,
2247 r.get::<_, String>(6)?,
2248 r.get::<_, String>(7)?,
2249 r.get::<_, Option<String>>(8)?,
2250 r.get::<_, Option<String>>(9)?,
2251 r.get::<_, Option<String>>(10)?,
2252 r.get::<_, Option<i64>>(11)?,
2253 r.get::<_, Option<i64>>(12)?,
2254 r.get::<_, Option<String>>(13)?,
2255 r.get::<_, Option<String>>(14)?,
2256 r.get::<_, Option<String>>(15)?,
2257 r.get::<_, Option<String>>(16)?,
2258 r.get::<_, Option<String>>(17)?,
2259 r.get::<_, Option<String>>(18)?,
2260 r.get::<_, Option<i64>>(19)?,
2261 r.get::<_, Option<i64>>(20)?,
2262 ))
2263 })?;
2264 let mut out = Vec::new();
2265 for row in rows {
2266 let (
2267 id,
2268 agent,
2269 model,
2270 workspace,
2271 started,
2272 ended,
2273 status_str,
2274 trace,
2275 start_commit,
2276 end_commit,
2277 branch,
2278 dirty_start,
2279 dirty_end,
2280 source,
2281 prompt_fingerprint,
2282 parent_session_id,
2283 agent_version,
2284 os,
2285 arch,
2286 repo_file_count,
2287 repo_total_loc,
2288 ) = row?;
2289 out.push(crate::core::event::SessionRecord {
2290 id,
2291 agent,
2292 model,
2293 workspace,
2294 started_at_ms: started as u64,
2295 ended_at_ms: ended.map(|v| v as u64),
2296 status: status_from_str(&status_str),
2297 trace_path: trace,
2298 start_commit,
2299 end_commit,
2300 branch,
2301 dirty_start: dirty_start.map(i64_to_bool),
2302 dirty_end: dirty_end.map(i64_to_bool),
2303 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2304 prompt_fingerprint,
2305 parent_session_id,
2306 agent_version,
2307 os,
2308 arch,
2309 repo_file_count: repo_file_count.map(|v| v as u32),
2310 repo_total_loc: repo_total_loc.map(|v| v as u64),
2311 });
2312 }
2313 Ok(out)
2314 }
2315
    /// Store a prompt snapshot keyed by fingerprint.
    ///
    /// `INSERT OR IGNORE` means the first capture for a fingerprint wins;
    /// later captures of identical content are no-ops.
    pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
        self.conn.execute(
            "INSERT OR IGNORE INTO prompt_snapshots
             (fingerprint, captured_at_ms, files_json, total_bytes)
             VALUES (?1, ?2, ?3, ?4)",
            params![
                snap.fingerprint,
                snap.captured_at_ms as i64,
                snap.files_json,
                snap.total_bytes as i64
            ],
        )?;
        Ok(())
    }
2330
    /// Look up a prompt snapshot by its fingerprint; `None` when absent.
    pub fn get_prompt_snapshot(
        &self,
        fingerprint: &str,
    ) -> Result<Option<crate::prompt::PromptSnapshot>> {
        self.conn
            .query_row(
                "SELECT fingerprint, captured_at_ms, files_json, total_bytes
                 FROM prompt_snapshots WHERE fingerprint = ?1",
                params![fingerprint],
                |r| {
                    Ok(crate::prompt::PromptSnapshot {
                        fingerprint: r.get(0)?,
                        captured_at_ms: r.get::<_, i64>(1)? as u64,
                        files_json: r.get(2)?,
                        total_bytes: r.get::<_, i64>(3)? as u64,
                    })
                },
            )
            .optional() // missing row -> Ok(None)
            .map_err(Into::into)
    }
2352
2353 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2354 let mut stmt = self.conn.prepare(
2355 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2356 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2357 )?;
2358 let rows = stmt.query_map([], |r| {
2359 Ok(crate::prompt::PromptSnapshot {
2360 fingerprint: r.get(0)?,
2361 captured_at_ms: r.get::<_, i64>(1)? as u64,
2362 files_json: r.get(2)?,
2363 total_bytes: r.get::<_, i64>(3)? as u64,
2364 })
2365 })?;
2366 Ok(rows.filter_map(|r| r.ok()).collect())
2367 }
2368
    /// `(session_id, prompt_fingerprint)` pairs for sessions in `workspace`
    /// started within the half-open window `[start_ms, end_ms)`; sessions
    /// without a fingerprint are excluded. Decode errors drop the row silently.
    pub fn sessions_with_prompt_fingerprint(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<(String, String)>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, prompt_fingerprint FROM sessions
             WHERE workspace = ?1
               AND started_at_ms >= ?2 AND started_at_ms < ?3
               AND prompt_fingerprint IS NOT NULL",
        )?;
        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
        })?;
        Ok(rows.filter_map(|r| r.ok()).collect())
    }
2387}
2388
/// Current wall-clock time as milliseconds since the Unix epoch.
/// A clock set before the epoch yields 0 instead of panicking.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    elapsed.as_millis() as u64
}
2395
2396fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2397 raw.and_then(|s| s.trim().parse::<u64>().ok())
2398 .unwrap_or(DEFAULT_MMAP_MB)
2399 .saturating_mul(1024)
2400 .saturating_mul(1024)
2401 .min(i64::MAX as u64) as i64
2402}
2403
/// Apply per-connection PRAGMAs: WAL journaling, 5s busy timeout, NORMAL
/// synchronous, 64 MiB page cache (negative value = KiB units), configurable
/// mmap window, in-memory temp tables, and WAL autocheckpoint every 1000 pages.
/// Read-only opens additionally flip `query_only` on.
fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
    // KAIZEN_MMAP_MB (MiB) overrides the default mmap budget.
    let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
    conn.execute_batch(&format!(
        "
        PRAGMA journal_mode=WAL;
        PRAGMA busy_timeout=5000;
        PRAGMA synchronous=NORMAL;
        PRAGMA cache_size=-65536;
        PRAGMA mmap_size={mmap_size};
        PRAGMA temp_store=MEMORY;
        PRAGMA wal_autocheckpoint=1000;
        "
    ))?;
    if mode == StoreOpenMode::ReadOnlyQuery {
        conn.execute_batch("PRAGMA query_only=ON;")?;
    }
    Ok(())
}
2422
2423fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2424 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2425}
2426
/// Decode one sessions row into a `SessionRecord`.
/// Column order must match `SESSION_SELECT`; keep both in sync.
fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
    let status_str: String = row.get(6)?;
    Ok(SessionRecord {
        id: row.get(0)?,
        agent: row.get(1)?,
        model: row.get(2)?,
        workspace: row.get(3)?,
        started_at_ms: row.get::<_, i64>(4)? as u64,
        ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
        status: status_from_str(&status_str),
        trace_path: row.get(7)?,
        start_commit: row.get(8)?,
        end_commit: row.get(9)?,
        branch: row.get(10)?,
        // dirty flags are stored as nullable 0/1 integers
        dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
        dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
        // column is NOT NULL DEFAULT '' (see ensure_schema_columns); '' means unset
        repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
        prompt_fingerprint: row.get(14)?,
        parent_session_id: row.get(15)?,
        agent_version: row.get(16)?,
        os: row.get(17)?,
        arch: row.get(18)?,
        repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
    })
}
2453
/// Decode one events row (21-column SELECT) into an `Event`.
/// Unknown kind/source strings fall back leniently via the *_from_str helpers.
fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
    let payload_str: String = row.get(12)?;
    Ok(Event {
        session_id: row.get(0)?,
        seq: row.get::<_, i64>(1)? as u64,
        ts_ms: row.get::<_, i64>(2)? as u64,
        ts_exact: row.get::<_, i64>(3)? != 0, // stored as 0/1
        kind: kind_from_str(&row.get::<_, String>(4)?),
        source: source_from_str(&row.get::<_, String>(5)?),
        tool: row.get(6)?,
        tool_call_id: row.get(7)?,
        tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
        tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
        reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
        cost_usd_e6: row.get(11)?,
        // Corrupt/legacy JSON degrades to Null instead of failing the row.
        payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
        stop_reason: row.get(13)?,
        latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
        ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
        retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
        context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
        context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
        cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
        cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
        system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
    })
}
2481
2482fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2483 let mut clauses = vec!["workspace = ?".to_string()];
2484 let mut args = vec![Value::Text(workspace.to_string())];
2485 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2486 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2487 args.push(Value::Text(format!("{}%", escape_like(prefix))));
2488 }
2489 if let Some(status) = &filter.status {
2490 clauses.push("status = ?".to_string());
2491 args.push(Value::Text(format!("{status:?}")));
2492 }
2493 if let Some(since_ms) = filter.since_ms {
2494 clauses.push("started_at_ms >= ?".to_string());
2495 args.push(Value::Integer(since_ms as i64));
2496 }
2497 (format!("WHERE {}", clauses.join(" AND ")), args)
2498}
2499
/// Lowercase `raw` and backslash-escape LIKE wildcards (`%`, `_`) plus the
/// escape character itself, for use with `LIKE ? ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut out = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '%' => out.push_str("\\%"),
            '_' => out.push_str("\\_"),
            other => out.push(other),
        }
    }
    out
}
2506
/// Workspace-level cost aggregates: total event cost in micro-dollars, and the
/// number of distinct sessions that have at least one costed event.
fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
    let cost: i64 = conn.query_row(
        "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
        params![workspace], |r| r.get(0),
    )?;
    let with_cost: i64 = conn.query_row(
        "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
        params![workspace], |r| r.get(0),
    )?;
    Ok((cost, with_cost as u64))
}
2518
2519fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2520 let build_raw: Option<i64> = r.get(4)?;
2521 let ci_raw: Option<i64> = r.get(8)?;
2522 Ok(SessionOutcomeRow {
2523 session_id: r.get(0)?,
2524 test_passed: r.get(1)?,
2525 test_failed: r.get(2)?,
2526 test_skipped: r.get(3)?,
2527 build_ok: build_raw.map(|v| v != 0),
2528 lint_errors: r.get(5)?,
2529 revert_lines_14d: r.get(6)?,
2530 pr_open: r.get(7)?,
2531 ci_ok: ci_raw.map(|v| v != 0),
2532 measured_at_ms: r.get::<_, i64>(9)? as u64,
2533 measure_error: r.get(10)?,
2534 })
2535}
2536
2537fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2538 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2539 let score = r
2540 .get::<_, Option<i64>>(2)?
2541 .and_then(|v| FeedbackScore::new(v as u8));
2542 let label = r
2543 .get::<_, Option<String>>(3)?
2544 .and_then(|s| FeedbackLabel::from_str_opt(&s));
2545 Ok(FeedbackRecord {
2546 id: r.get(0)?,
2547 session_id: r.get(1)?,
2548 score,
2549 label,
2550 note: r.get(4)?,
2551 created_at_ms: r.get::<_, i64>(5)? as u64,
2552 })
2553}
2554
/// Three-letter weekday label for an epoch-day index.
/// Day 0 (1970-01-01) was a Thursday, hence the +4 rotation into a
/// Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    NAMES[((day_idx + 4) % 7) as usize]
}
2558
/// Session counts for the trailing 7 UTC epoch-day buckets in a workspace,
/// returned oldest-day-first as `(weekday label, count)` pairs.
fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
    let week_ago = now.saturating_sub(7 * 86_400_000);
    let mut stmt = conn
        .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
    // Collapse each start timestamp to its epoch-day index; bad rows are skipped.
    let days: Vec<u64> = stmt
        .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
        .filter_map(|r| r.ok())
        .map(|v| v as u64 / 86_400_000)
        .collect();
    let today = now / 86_400_000;
    // Buckets run today-6 .. today; the i=0 bucket is the oldest day.
    Ok((0u64..7)
        .map(|i| {
            let d = today.saturating_sub(6 - i);
            (
                day_label(d).to_string(),
                days.iter().filter(|&&x| x == d).count() as u64,
            )
        })
        .collect())
}
2579
/// The 3 most recently started sessions in a workspace, each paired with its
/// event count (LEFT JOIN so event-less sessions count 0). Decode errors drop
/// the row silently.
fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
    // Column order mirrors SESSION_SELECT, with COUNT(e.id) appended as col 21.
    let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
        s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
        s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
        s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
        COUNT(e.id) FROM sessions s \
        LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
        GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
    let mut stmt = conn.prepare(sql)?;
    let out: Vec<(SessionRecord, u64)> = stmt
        .query_map(params![workspace], |r| {
            let st: String = r.get(6)?;
            Ok((
                SessionRecord {
                    id: r.get(0)?,
                    agent: r.get(1)?,
                    model: r.get(2)?,
                    workspace: r.get(3)?,
                    started_at_ms: r.get::<_, i64>(4)? as u64,
                    ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
                    status: status_from_str(&st),
                    trace_path: r.get(7)?,
                    start_commit: r.get(8)?,
                    end_commit: r.get(9)?,
                    branch: r.get(10)?,
                    dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
                    dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
                    repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
                    prompt_fingerprint: r.get(14)?,
                    parent_session_id: r.get(15)?,
                    agent_version: r.get(16)?,
                    os: r.get(17)?,
                    arch: r.get(18)?,
                    repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
                    repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
                },
                r.get::<_, i64>(21)? as u64, // per-session event count
            ))
        })?
        .filter_map(|r| r.ok())
        .collect();
    Ok(out)
}
2623
/// The 5 most frequently used tools in a workspace as `(tool, call count)`,
/// most used first. Decode errors drop the row silently.
fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
    let mut stmt = conn.prepare(
        "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
         WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
    )?;
    let out: Vec<(String, u64)> = stmt
        .query_map(params![workspace], |r| {
            Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
        })?
        .filter_map(|r| r.ok())
        .collect();
    Ok(out)
}
2637
2638fn status_from_str(s: &str) -> SessionStatus {
2639 match s {
2640 "Running" => SessionStatus::Running,
2641 "Waiting" => SessionStatus::Waiting,
2642 "Idle" => SessionStatus::Idle,
2643 _ => SessionStatus::Done,
2644 }
2645}
2646
2647fn kind_from_str(s: &str) -> EventKind {
2648 match s {
2649 "ToolCall" => EventKind::ToolCall,
2650 "ToolResult" => EventKind::ToolResult,
2651 "Message" => EventKind::Message,
2652 "Error" => EventKind::Error,
2653 "Cost" => EventKind::Cost,
2654 "Hook" => EventKind::Hook,
2655 "Lifecycle" => EventKind::Lifecycle,
2656 _ => EventKind::Hook,
2657 }
2658}
2659
2660fn source_from_str(s: &str) -> EventSource {
2661 match s {
2662 "Tail" => EventSource::Tail,
2663 "Hook" => EventSource::Hook,
2664 _ => EventSource::Proxy,
2665 }
2666}
2667
/// Additive, idempotent schema migrations: add every column introduced after
/// the base CREATE TABLE statements, plus the tool_spans hierarchy indexes.
/// Each `ensure_column` call is a no-op when the column already exists.
fn ensure_schema_columns(conn: &Connection) -> Result<()> {
    // sessions: git/repo binding metadata
    ensure_column(conn, "sessions", "start_commit", "TEXT")?;
    ensure_column(conn, "sessions", "end_commit", "TEXT")?;
    ensure_column(conn, "sessions", "branch", "TEXT")?;
    ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
    ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
    ensure_column(
        conn,
        "sessions",
        "repo_binding_source",
        "TEXT NOT NULL DEFAULT ''",
    )?;
    // events: timing, token, and context-window telemetry
    ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
    ensure_column(conn, "events", "tool_call_id", "TEXT")?;
    ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
    ensure_column(conn, "events", "stop_reason", "TEXT")?;
    ensure_column(conn, "events", "latency_ms", "INTEGER")?;
    ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
    ensure_column(conn, "events", "retry_count", "INTEGER")?;
    ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
    ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
    ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
    ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
    ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
    ensure_column(
        conn,
        "sync_outbox",
        "kind",
        "TEXT NOT NULL DEFAULT 'events'",
    )?;
    ensure_column(
        conn,
        "experiments",
        "state",
        "TEXT NOT NULL DEFAULT 'Draft'",
    )?;
    ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
    // sessions: prompt/environment provenance
    ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
    ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
    ensure_column(conn, "sessions", "agent_version", "TEXT")?;
    ensure_column(conn, "sessions", "os", "TEXT")?;
    ensure_column(conn, "sessions", "arch", "TEXT")?;
    ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
    ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
    // tool_spans: span hierarchy + rollup columns
    ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
    ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
    ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
    ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
    conn.execute_batch(
        "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
         CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
    )?;
    Ok(())
}
2722
2723fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2724 if has_column(conn, table, column)? {
2725 return Ok(());
2726 }
2727 conn.execute(
2728 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2729 [],
2730 )?;
2731 Ok(())
2732}
2733
2734fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2735 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2736 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2737 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2738}
2739
/// Encode a bool as the 0/1 integer SQLite stores.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2743
/// Decode a stored integer flag: zero is false, anything else is true.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
2747
/// Map the empty string (the schema's NOT NULL default) to `None`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|v| !v.is_empty())
}
2751
2752#[cfg(test)]
2753mod tests {
2754 use super::*;
2755 use serde_json::json;
2756 use std::collections::HashSet;
2757 use tempfile::TempDir;
2758
    /// Minimal Done-status session fixture in workspace "/ws", started at t=1000ms.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }
2784
    /// ToolCall event fixture for `session_id`; timestamps advance 100ms per seq.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }
2811
    // Opening a fresh store must leave the connection in WAL journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }
2822
    // Verifies the PRAGMA values set by apply_pragmas: synchronous=NORMAL (1),
    // 64 MiB cache (-65536 KiB), temp_store=MEMORY (2), autocheckpoint=1000,
    // and the MiB->bytes mmap conversion.
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }
2849
    // A read-only open on an existing database must set PRAGMA query_only.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        Store::open(&db).unwrap(); // create the database first
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }
2862
    // Each expected phase-0 index must exist in sqlite_master after open.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }
2885
    // A session written with upsert_session must be readable back by id.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }
2897
2898 #[test]
2899 fn append_and_list_events_round_trip() {
2900 let dir = TempDir::new().unwrap();
2901 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2902 let s = make_session("s2");
2903 store.upsert_session(&s).unwrap();
2904 store.append_event(&make_event("s2", 0)).unwrap();
2905 store.append_event(&make_event("s2", 1)).unwrap();
2906
2907 let sessions = store.list_sessions("/ws").unwrap();
2908 assert_eq!(sessions.len(), 1);
2909 assert_eq!(sessions[0].id, "s2");
2910 }
2911
    // Paging: newest-first ordering (ties broken by id ascending here), with
    // correct total and next_offset when more rows remain.
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }
2942
    // All three SessionFilter fields applied at once: case-insensitive agent
    // prefix, status match, and since_ms lower bound select exactly one row.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into(); // mixed case on purpose: prefix filter lowercases
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }
2972
    // list_sessions_started_after must exclude older rows, and a status update
    // must be visible through session_statuses.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }
2996
2997 #[test]
2998 fn summary_stats_empty() {
2999 let dir = TempDir::new().unwrap();
3000 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3001 let stats = store.summary_stats("/ws").unwrap();
3002 assert_eq!(stats.session_count, 0);
3003 assert_eq!(stats.total_cost_usd_e6, 0);
3004 }
3005
3006 #[test]
3007 fn summary_stats_counts_sessions() {
3008 let dir = TempDir::new().unwrap();
3009 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3010 store.upsert_session(&make_session("a")).unwrap();
3011 store.upsert_session(&make_session("b")).unwrap();
3012 let stats = store.summary_stats("/ws").unwrap();
3013 assert_eq!(stats.session_count, 2);
3014 assert_eq!(stats.by_agent.len(), 1);
3015 assert_eq!(stats.by_agent[0].0, "cursor");
3016 assert_eq!(stats.by_agent[0].1, 2);
3017 }
3018
3019 #[test]
3020 fn list_events_for_session_round_trip() {
3021 let dir = TempDir::new().unwrap();
3022 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3023 store.upsert_session(&make_session("s4")).unwrap();
3024 store.append_event(&make_event("s4", 0)).unwrap();
3025 store.append_event(&make_event("s4", 1)).unwrap();
3026 let events = store.list_events_for_session("s4").unwrap();
3027 assert_eq!(events.len(), 2);
3028 assert_eq!(events[0].seq, 0);
3029 assert_eq!(events[1].seq, 1);
3030 }
3031
3032 #[test]
3033 fn list_events_page_uses_inclusive_seq_cursor() {
3034 let dir = TempDir::new().unwrap();
3035 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3036 store.upsert_session(&make_session("paged")).unwrap();
3037 for seq in 0..5 {
3038 store.append_event(&make_event("paged", seq)).unwrap();
3039 }
3040 let first = store.list_events_page("paged", 0, 2).unwrap();
3041 assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3042 let second = store
3043 .list_events_page("paged", first[1].seq + 1, 2)
3044 .unwrap();
3045 assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3046 }
3047
3048 #[test]
3049 fn append_event_dedup() {
3050 let dir = TempDir::new().unwrap();
3051 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3052 store.upsert_session(&make_session("s5")).unwrap();
3053 store.append_event(&make_event("s5", 0)).unwrap();
3054 store.append_event(&make_event("s5", 0)).unwrap();
3056 let events = store.list_events_for_session("s5").unwrap();
3057 assert_eq!(events.len(), 1);
3058 }
3059
3060 #[test]
3061 fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3062 let dir = TempDir::new().unwrap();
3063 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3064 assert!(store.session_span_tree("missing").unwrap().is_empty());
3065 assert!(store.span_tree_cache.borrow().is_some());
3066
3067 store.upsert_session(&make_session("tree")).unwrap();
3068 store.append_event(&make_event("tree", 0)).unwrap();
3069 assert!(store.span_tree_cache.borrow().is_none());
3070 let first = store.session_span_tree("tree").unwrap();
3071 assert_eq!(first.len(), 1);
3072 assert!(store.span_tree_cache.borrow().is_some());
3073 store.append_event(&make_event("tree", 1)).unwrap();
3074 assert!(store.span_tree_cache.borrow().is_none());
3075 }
3076
3077 #[test]
3078 fn tool_spans_in_window_uses_started_then_ended_fallback() {
3079 let dir = TempDir::new().unwrap();
3080 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3081 store.upsert_session(&make_session("spans")).unwrap();
3082 for (id, started, ended) in [
3083 ("started", Some(200_i64), None),
3084 ("fallback", None, Some(250_i64)),
3085 ("outside", Some(400_i64), None),
3086 ("too_old", None, Some(50_i64)),
3087 ("started_wins", Some(500_i64), Some(200_i64)),
3088 ] {
3089 store
3090 .conn
3091 .execute(
3092 "INSERT INTO tool_spans
3093 (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3094 VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3095 params![id, started, ended],
3096 )
3097 .unwrap();
3098 }
3099 let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3100 let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3101 assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3102 }
3103
3104 #[test]
3105 fn upsert_idempotent() {
3106 let dir = TempDir::new().unwrap();
3107 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3108 let mut s = make_session("s3");
3109 store.upsert_session(&s).unwrap();
3110 s.status = SessionStatus::Running;
3111 store.upsert_session(&s).unwrap();
3112
3113 let got = store.get_session("s3").unwrap().unwrap();
3114 assert_eq!(got.status, SessionStatus::Running);
3115 }
3116
3117 #[test]
3118 fn append_event_indexes_path_from_payload() {
3119 let dir = TempDir::new().unwrap();
3120 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3121 store.upsert_session(&make_session("sx")).unwrap();
3122 let mut ev = make_event("sx", 0);
3123 ev.payload = json!({"input": {"path": "src/lib.rs"}});
3124 store.append_event(&ev).unwrap();
3125 let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
3126 assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
3127 }
3128
3129 #[test]
3130 fn update_session_status_changes_status() {
3131 let dir = TempDir::new().unwrap();
3132 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3133 store.upsert_session(&make_session("s6")).unwrap();
3134 store
3135 .update_session_status("s6", SessionStatus::Running)
3136 .unwrap();
3137 let got = store.get_session("s6").unwrap().unwrap();
3138 assert_eq!(got.status, SessionStatus::Running);
3139 }
3140
3141 #[test]
3142 fn prune_sessions_removes_old_rows_and_keeps_recent() {
3143 let dir = TempDir::new().unwrap();
3144 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3145 let mut old = make_session("old");
3146 old.started_at_ms = 1_000;
3147 let mut new = make_session("new");
3148 new.started_at_ms = 9_000_000_000_000;
3149 store.upsert_session(&old).unwrap();
3150 store.upsert_session(&new).unwrap();
3151 store.append_event(&make_event("old", 0)).unwrap();
3152
3153 let stats = store.prune_sessions_started_before(5_000).unwrap();
3154 assert_eq!(
3155 stats,
3156 PruneStats {
3157 sessions_removed: 1,
3158 events_removed: 1,
3159 }
3160 );
3161 assert!(store.get_session("old").unwrap().is_none());
3162 assert!(store.get_session("new").unwrap().is_some());
3163 let sessions = store.list_sessions("/ws").unwrap();
3164 assert_eq!(sessions.len(), 1);
3165 assert_eq!(sessions[0].id, "new");
3166 }
3167
3168 #[test]
3169 fn append_event_indexes_rules_from_payload() {
3170 let dir = TempDir::new().unwrap();
3171 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3172 store.upsert_session(&make_session("sr")).unwrap();
3173 let mut ev = make_event("sr", 0);
3174 ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
3175 store.append_event(&ev).unwrap();
3176 let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
3177 assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
3178 }
3179
3180 #[test]
3181 fn guidance_report_counts_skill_and_rule_sessions() {
3182 let dir = TempDir::new().unwrap();
3183 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3184 store.upsert_session(&make_session("sx")).unwrap();
3185 let mut ev = make_event("sx", 0);
3186 ev.payload =
3187 json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
3188 ev.cost_usd_e6 = Some(500_000);
3189 store.append_event(&ev).unwrap();
3190
3191 let mut skill_slugs = HashSet::new();
3192 skill_slugs.insert("tdd".into());
3193 let mut rule_slugs = HashSet::new();
3194 rule_slugs.insert("style".into());
3195
3196 let rep = store
3197 .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
3198 .unwrap();
3199 assert_eq!(rep.sessions_in_window, 1);
3200 let tdd = rep
3201 .rows
3202 .iter()
3203 .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
3204 .unwrap();
3205 assert_eq!(tdd.sessions, 1);
3206 assert!(tdd.on_disk);
3207 let style = rep
3208 .rows
3209 .iter()
3210 .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
3211 .unwrap();
3212 assert_eq!(style.sessions, 1);
3213 assert!(style.on_disk);
3214 }
3215
3216 #[test]
3217 fn prune_sessions_removes_rules_used_rows() {
3218 let dir = TempDir::new().unwrap();
3219 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3220 let mut old = make_session("old_r");
3221 old.started_at_ms = 1_000;
3222 store.upsert_session(&old).unwrap();
3223 let mut ev = make_event("old_r", 0);
3224 ev.payload = json!({"path": ".cursor/rules/x.mdc"});
3225 store.append_event(&ev).unwrap();
3226
3227 store.prune_sessions_started_before(5_000).unwrap();
3228 let n: i64 = store
3229 .conn
3230 .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
3231 .unwrap();
3232 assert_eq!(n, 0);
3233 }
3234}