1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
13use crate::sync::context::SyncIngestContext;
14use crate::sync::outbound::outbound_event_from_row;
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::enqueue_tool_spans_for_session;
17use anyhow::{Context, Result};
18use rusqlite::types::Value;
19use rusqlite::{
20 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
21};
22use std::cell::RefCell;
23use std::collections::{HashMap, HashSet};
24use std::path::{Path, PathBuf};
25
/// Ceiling, in epoch milliseconds, associated with "synthetic" timestamps.
/// NOTE(review): no use site is visible in this part of the file — presumably timestamps
/// below this value are treated as generated rather than wall-clock; confirm at the callers.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size in megabytes (per the name).
/// NOTE(review): presumably consumed by `apply_pragmas`; confirm.
const DEFAULT_MMAP_MB: u64 = 256;
/// Shared `SELECT ... FROM sessions` prefix; callers append their own `WHERE`/`ORDER BY`.
/// The column list names the same columns, in the same order, that `upsert_session` writes.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
34
/// Schema statements executed, in order, on every read-write open (see `open_with_mode`).
///
/// Each entry is passed to `execute_batch`, so an entry may hold several `;`-separated
/// statements. Every statement is written to be re-runnable (`IF NOT EXISTS` /
/// `INSERT OR IGNORE`), which is what allows the whole list to be replayed on each open.
///
/// Some columns used by the queries in this file (for example `events.ts_exact` or
/// `sync_outbox.kind`) are not created here. NOTE(review): presumably
/// `ensure_schema_columns`, which runs right after these statements, adds them — confirm.
const MIGRATIONS: &[&str] = &[
    // Core session/event tables.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // The event upsert in `append_event_with_sync` relies on this unique index for its
    // `ON CONFLICT(session_id, seq)` clause.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    // Unique indexes that make the `INSERT OR IGNORE` writes to these tables idempotent.
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // Written alongside `sessions` by `upsert_session`.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repository analysis tables.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Session listing indexes keyed by workspace.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Single-row table: `CHECK (id = 1)` plus the seed insert below keep exactly one row.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // Remote (pulled) data, stored as JSON blobs keyed by team/workspace hashes.
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Per-session evaluation, feedback, outcome and sampling tables.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // Secondary indexes for time- and session-scoped lookups.
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    // NOTE(review): indexes the same column as `session_feedback_session` created above.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
281
/// Aggregate numbers for an insights view.
///
/// NOTE(review): the producing query is outside this part of the file; the per-field notes
/// below are read off the names and types and should be confirmed against it.
#[derive(Clone)]
pub struct InsightsStats {
    pub total_sessions: u64,
    pub running_sessions: u64,
    pub total_events: u64,
    /// Presumably `(day label, session count)` pairs.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Presumably recent sessions paired with a per-session count (e.g. events).
    pub recent: Vec<(SessionRecord, u64)>,
    /// Presumably `(tool name, usage count)` pairs.
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost; the `_e6` suffix suggests USD scaled by 1e6 — confirm.
    pub total_cost_usd_e6: i64,
    pub sessions_with_cost: u64,
}
297
/// Point-in-time view of sync health, as assembled by `Store::sync_status`.
pub struct SyncStatusSnapshot {
    /// Number of outbox rows not yet sent (see `Store::outbox_pending_count`).
    pub pending_outbox: u64,
    /// Value stored under the `last_success_ms` key of `sync_state`, if present and numeric.
    pub last_success_ms: Option<u64>,
    /// Value stored under the `last_error` key of `sync_state`, if any.
    pub last_error: Option<String>,
    /// Value stored under `consecutive_failures`; 0 when missing or unparsable.
    pub consecutive_failures: u32,
}
305
/// Serializable summary totals.
///
/// NOTE(review): the producer is not visible here; the `(String, u64)` vectors are presumably
/// `(name, count)` pairs — confirm against the query that fills them.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    pub session_count: u64,
    /// The `_e6` suffix suggests USD scaled by 1e6 — confirm.
    pub total_cost_usd_e6: i64,
    pub by_agent: Vec<(String, u64)>,
    pub by_model: Vec<(String, u64)>,
    pub top_tools: Vec<(String, u64)>,
}
315
/// Kind of guidance artifact a `GuidancePerfRow` describes.
///
/// Serialized in lowercase (`"skill"` / `"rule"`) via `rename_all`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    Skill,
    Rule,
}
323
/// One row of a `GuidanceReport`: usage and cost figures for a single skill or rule.
///
/// NOTE(review): the computation lives outside this part of the file; field meanings below are
/// inferred from names and should be confirmed there.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    pub sessions: u64,
    /// Presumably the share of sessions in the window that used this item.
    pub sessions_pct: f64,
    /// The `_e6` suffix suggests USD scaled by 1e6 — confirm.
    pub total_cost_usd_e6: i64,
    pub avg_cost_per_session_usd: Option<f64>,
    /// Presumably this row's average relative to the workspace-wide average.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Presumably whether the skill/rule file currently exists on disk.
    pub on_disk: bool,
}
336
/// Guidance usage report for one workspace over a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    pub workspace: String,
    /// Window bounds in epoch milliseconds (per the `_ms` suffix).
    pub window_start_ms: u64,
    pub window_end_ms: u64,
    pub sessions_in_window: u64,
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// Per-skill / per-rule rows.
    pub rows: Vec<GuidancePerfRow>,
}
347
/// Counts reported by a prune operation (returned by `prune_sessions_started_before`).
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    pub sessions_removed: u64,
    pub events_removed: u64,
}
354
/// In-memory form of one `session_outcomes` row; field names mirror the table's columns.
///
/// Every measurement is optional; only `session_id` and `measured_at_ms` are always present,
/// matching the `NOT NULL` columns of the table.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    pub measured_at_ms: u64,
    /// Error text recorded when measurement failed, if any (presumed from the name).
    pub measure_error: Option<String>,
}
370
/// Per-session aggregate over resource samples.
/// NOTE(review): presumably computed from the `session_samples` table — the query is not
/// visible here; confirm.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    pub sample_count: u64,
    pub max_cpu_percent: f64,
    pub max_rss_bytes: u64,
}
379
/// `sync_state` key names for use with `Store::sync_state_get_u64` / `sync_state_set_u64`.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// Set to the current time by `Store::append_search_event` when search indexing fails.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
384
/// A tool span in the shape used for sync; fields mirror the `tool_spans` columns, with
/// `paths` holding the span's file paths as a list.
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    pub cost_usd_e6: Option<i64>,
    pub paths: Vec<String>,
}
400
/// How `Store::open_with_mode` opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Normal open: migrations run and the projector is warmed.
    ReadWrite,
    /// Opened with SQLite's read-only flag; migrations and projector warm-up are skipped.
    ReadOnlyQuery,
}
406
/// Minimal projection of a session: its id, status and optional end time.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    pub id: String,
    pub status: SessionStatus,
    pub ended_at_ms: Option<u64>,
}
413
/// Optional filters for session listings; `Default` leaves every filter unset.
///
/// NOTE(review): how each filter is applied is decided by the query code outside this part
/// of the file.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    pub agent_prefix: Option<String>,
    pub status: Option<SessionStatus>,
    pub since_ms: Option<u64>,
}
420
/// One page of a session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    pub rows: Vec<SessionRecord>,
    /// Presumably the total number of matching sessions across all pages — confirm.
    pub total: usize,
    /// Presumably the offset to request next, `None` on the last page — confirm.
    pub next_offset: Option<usize>,
}
427
/// Single cached span tree held in `Store::span_tree_cache`.
///
/// NOTE(review): `session_id` and `last_event_seq` presumably act as the validity key for
/// `nodes`; the lookup code is outside this part of the file.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    session_id: String,
    last_event_seq: Option<u64>,
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
434
/// SQLite-backed store plus lazily opened side stores and in-memory projection state.
///
/// Interior mutability uses `RefCell`, so the methods take `&self`.
pub struct Store {
    /// The SQLite connection.
    conn: Connection,
    /// Parent directory of the database file (`.` when the path has no parent); the hot log,
    /// search writer and outbox are opened relative to it.
    root: PathBuf,
    /// Hot log, opened on first use by `append_hot_event`.
    hot_log: RefCell<Option<HotLog>>,
    /// Search index writer, opened on first use by `try_append_search_event`.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// At most one cached span tree; cleared by `invalidate_span_tree_cache`.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector that turns events into span/file/skill/rule deltas.
    projector: RefCell<Projector>,
}
443
444impl Store {
445 pub(crate) fn conn(&self) -> &Connection {
446 &self.conn
447 }
448
449 pub fn open(path: &Path) -> Result<Self> {
450 Self::open_with_mode(path, StoreOpenMode::ReadWrite)
451 }
452
453 pub fn open_read_only(path: &Path) -> Result<Self> {
454 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
455 }
456
457 pub fn open_query(path: &Path) -> Result<Self> {
458 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
459 }
460
461 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
462 if let Some(parent) = path.parent() {
463 std::fs::create_dir_all(parent)?;
464 }
465 let conn = match mode {
466 StoreOpenMode::ReadWrite => Connection::open(path),
467 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
468 path,
469 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
470 ),
471 }
472 .with_context(|| format!("open db: {}", path.display()))?;
473 apply_pragmas(&conn, mode)?;
474 if mode == StoreOpenMode::ReadWrite {
475 for sql in MIGRATIONS {
476 conn.execute_batch(sql)?;
477 }
478 ensure_schema_columns(&conn)?;
479 }
480 let store = Self {
481 conn,
482 root: path
483 .parent()
484 .unwrap_or_else(|| Path::new("."))
485 .to_path_buf(),
486 hot_log: RefCell::new(None),
487 search_writer: RefCell::new(None),
488 span_tree_cache: RefCell::new(None),
489 projector: RefCell::new(Projector::default()),
490 };
491 if mode == StoreOpenMode::ReadWrite {
492 store.warm_projector()?;
493 }
494 Ok(store)
495 }
496
497 fn invalidate_span_tree_cache(&self) {
498 *self.span_tree_cache.borrow_mut() = None;
499 }
500
501 fn warm_projector(&self) -> Result<()> {
502 let ids = self.running_session_ids()?;
503 let mut projector = self.projector.borrow_mut();
504 for id in ids {
505 for event in self.list_events_for_session(&id)? {
506 let _ = projector.apply(&event);
507 }
508 }
509 Ok(())
510 }
511
512 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
513 self.conn.execute(
514 "INSERT INTO sessions (
515 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
516 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
517 prompt_fingerprint, parent_session_id, agent_version, os, arch,
518 repo_file_count, repo_total_loc
519 )
520 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
521 ?16, ?17, ?18, ?19, ?20, ?21)
522 ON CONFLICT(id) DO UPDATE SET
523 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
524 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
525 status=excluded.status, trace_path=excluded.trace_path,
526 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
527 branch=excluded.branch, dirty_start=excluded.dirty_start,
528 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
529 prompt_fingerprint=excluded.prompt_fingerprint,
530 parent_session_id=excluded.parent_session_id,
531 agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
532 repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
533 params![
534 s.id,
535 s.agent,
536 s.model,
537 s.workspace,
538 s.started_at_ms as i64,
539 s.ended_at_ms.map(|v| v as i64),
540 format!("{:?}", s.status),
541 s.trace_path,
542 s.start_commit,
543 s.end_commit,
544 s.branch,
545 s.dirty_start.map(bool_to_i64),
546 s.dirty_end.map(bool_to_i64),
547 s.repo_binding_source.clone().unwrap_or_default(),
548 s.prompt_fingerprint.as_deref(),
549 s.parent_session_id.as_deref(),
550 s.agent_version.as_deref(),
551 s.os.as_deref(),
552 s.arch.as_deref(),
553 s.repo_file_count.map(|v| v as i64),
554 s.repo_total_loc.map(|v| v as i64),
555 ],
556 )?;
557 self.conn.execute(
558 "INSERT INTO session_repo_binding (
559 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
560 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
561 ON CONFLICT(session_id) DO UPDATE SET
562 start_commit=excluded.start_commit,
563 end_commit=excluded.end_commit,
564 branch=excluded.branch,
565 dirty_start=excluded.dirty_start,
566 dirty_end=excluded.dirty_end,
567 repo_binding_source=excluded.repo_binding_source",
568 params![
569 s.id,
570 s.start_commit,
571 s.end_commit,
572 s.branch,
573 s.dirty_start.map(bool_to_i64),
574 s.dirty_end.map(bool_to_i64),
575 s.repo_binding_source.clone().unwrap_or_default(),
576 ],
577 )?;
578 Ok(())
579 }
580
581 pub fn ensure_session_stub(
584 &self,
585 id: &str,
586 agent: &str,
587 workspace: &str,
588 started_at_ms: u64,
589 ) -> Result<()> {
590 self.conn.execute(
591 "INSERT OR IGNORE INTO sessions (
592 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
593 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
594 prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
595 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
596 NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
597 params![id, agent, workspace, started_at_ms as i64],
598 )?;
599 Ok(())
600 }
601
602 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
604 let n: i64 = self.conn.query_row(
605 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
606 [session_id],
607 |r| r.get(0),
608 )?;
609 Ok(n as u64)
610 }
611
612 pub fn append_event(&self, e: &Event) -> Result<()> {
613 self.append_event_with_sync(e, None)
614 }
615
    /// Stores `e`, updates derived state, and optionally queues it for sync.
    ///
    /// Steps, in order:
    /// 1. Upsert the event row keyed on `(session_id, seq)`.
    /// 2. Append to the hot log and update derived data: a full rebuild in legacy projector
    ///    mode, a session replay when `e.seq` is not past the previously stored maximum,
    ///    otherwise an incremental projector update.
    /// 3. Add the event to the search index (best effort; failures are logged, not returned).
    /// 4. If `ctx` is given and sync is fully configured, redact and enqueue the event and
    ///    the session's tool spans in the outbox.
    ///
    /// Every "sync skipped" condition returns `Ok(())`: the event is still stored locally.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Captured before the insert so an out-of-order or rewritten seq can be detected below.
        let last_before = if projector_legacy_mode() {
            None
        } else {
            self.last_event_seq_for_session(&e.session_id)?
        };
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
            )
            ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // Nothing was written, so there is nothing to project, index or sync.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        self.append_hot_event(e)?;
        if projector_legacy_mode() {
            // Legacy path: re-derive everything for the session from the database.
            index_event_derived(&self.conn, e)?;
            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
            self.invalidate_span_tree_cache();
        } else if last_before.is_some_and(|last| e.seq <= last) {
            // The event landed at or before the previous tail, so incremental projection
            // would see it out of order; rebuild the session's projection from scratch.
            self.replay_projector_session(&e.session_id)?;
        } else {
            let deltas = self.projector.borrow_mut().apply(e);
            self.apply_projector_events(&deltas)?;
            // Close spans that have been open longer than the orphan TTL as of this event.
            let expired = self
                .projector
                .borrow_mut()
                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
            self.apply_projector_events(&expired)?;
            if is_stop_event(e) {
                let flushed = self
                    .projector
                    .borrow_mut()
                    .flush_session(&e.session_id, e.ts_ms);
                self.apply_projector_events(&flushed)?;
            }
            self.invalidate_span_tree_cache();
        }
        self.append_search_event(e);
        // Everything below is sync-only.
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Sampling: with a rate below 1.0 the event is skipped when the random draw exceeds it.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.outbox()?.append(&e.session_id, "events", &row)?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
739
740 fn append_hot_event(&self, e: &Event) -> Result<()> {
741 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
742 return Ok(());
743 }
744 let mut slot = self.hot_log.borrow_mut();
745 if slot.is_none() {
746 *slot = Some(HotLog::open(&self.root)?);
747 }
748 if let Some(log) = slot.as_mut() {
749 log.append(e)?;
750 }
751 Ok(())
752 }
753
754 fn append_search_event(&self, e: &Event) {
755 if let Err(err) = self.try_append_search_event(e) {
756 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
757 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
758 }
759 }
760
761 fn try_append_search_event(&self, e: &Event) -> Result<()> {
762 let Some(session) = self.get_session(&e.session_id)? else {
763 return Ok(());
764 };
765 let workspace = PathBuf::from(&session.workspace);
766 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
767 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
768 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
769 return Ok(());
770 };
771 let mut slot = self.search_writer.borrow_mut();
772 if slot.is_none() {
773 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
774 }
775 slot.as_mut().expect("writer").add(&doc)
776 }
777
778 pub fn flush_search(&self) -> Result<()> {
779 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
780 writer.commit()?;
781 }
782 Ok(())
783 }
784
785 fn outbox(&self) -> Result<Outbox> {
786 Outbox::open(&self.root)
787 }
788
789 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
790 if projector_legacy_mode() {
791 rebuild_tool_spans_for_session(&self.conn, session_id)?;
792 self.invalidate_span_tree_cache();
793 return Ok(());
794 }
795 let deltas = self
796 .projector
797 .borrow_mut()
798 .flush_session(session_id, now_ms);
799 if self.apply_projector_events(&deltas)? {
800 self.invalidate_span_tree_cache();
801 }
802 Ok(())
803 }
804
805 fn replay_projector_session(&self, session_id: &str) -> Result<()> {
806 clear_session_spans(&self.conn, session_id)?;
807 self.projector.borrow_mut().reset_session(session_id);
808 let events = self.list_events_for_session(session_id)?;
809 let mut changed = false;
810 for event in &events {
811 let deltas = self.projector.borrow_mut().apply(event);
812 changed |= self.apply_projector_events(&deltas)?;
813 }
814 if self
815 .get_session(session_id)?
816 .is_some_and(|session| session.status == SessionStatus::Done)
817 {
818 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
819 let deltas = self
820 .projector
821 .borrow_mut()
822 .flush_session(session_id, now_ms);
823 changed |= self.apply_projector_events(&deltas)?;
824 }
825 if changed {
826 self.invalidate_span_tree_cache();
827 }
828 Ok(())
829 }
830
831 fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
832 let mut changed = false;
833 for delta in deltas {
834 match delta {
835 ProjectorEvent::SpanClosed(span, sample) => {
836 upsert_tool_span_record(&self.conn, span)?;
837 tracing::debug!(
838 session_id = %sample.session_id,
839 span_id = %sample.span_id,
840 tool = ?sample.tool,
841 lead_time_ms = ?sample.lead_time_ms,
842 tokens_in = ?sample.tokens_in,
843 tokens_out = ?sample.tokens_out,
844 reasoning_tokens = ?sample.reasoning_tokens,
845 cost_usd_e6 = ?sample.cost_usd_e6,
846 paths = ?sample.paths,
847 "tool span closed"
848 );
849 changed = true;
850 }
851 ProjectorEvent::SpanPatched(span) => {
852 upsert_tool_span_record(&self.conn, span)?;
853 changed = true;
854 }
855 ProjectorEvent::FileTouched { session, path } => {
856 self.conn.execute(
857 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
858 params![session, path],
859 )?;
860 changed = true;
861 }
862 ProjectorEvent::SkillUsed { session, skill } => {
863 self.conn.execute(
864 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
865 params![session, skill],
866 )?;
867 changed = true;
868 }
869 ProjectorEvent::RuleUsed { session, rule } => {
870 self.conn.execute(
871 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
872 params![session, rule],
873 )?;
874 changed = true;
875 }
876 }
877 }
878 Ok(changed)
879 }
880
881 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
882 let rows = self.outbox()?.list_pending(limit)?;
883 if !rows.is_empty() {
884 return Ok(rows);
885 }
886 let mut stmt = self.conn.prepare(
887 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
888 )?;
889 let rows = stmt.query_map(params![limit as i64], |row| {
890 Ok((
891 row.get::<_, i64>(0)?,
892 row.get::<_, String>(1)?,
893 row.get::<_, String>(2)?,
894 ))
895 })?;
896 let mut out = Vec::new();
897 for r in rows {
898 out.push(r?);
899 }
900 Ok(out)
901 }
902
903 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
904 self.outbox()?.delete_ids(ids)?;
905 for id in ids {
906 self.conn
907 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
908 }
909 Ok(())
910 }
911
912 pub fn replace_outbox_rows(
913 &self,
914 owner_id: &str,
915 kind: &str,
916 payloads: &[String],
917 ) -> Result<()> {
918 self.outbox()?.replace(owner_id, kind, payloads)?;
919 self.conn.execute(
920 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
921 params![owner_id, kind],
922 )?;
923 for payload in payloads {
924 self.conn.execute(
925 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
926 params![owner_id, kind, payload],
927 )?;
928 }
929 Ok(())
930 }
931
932 pub fn outbox_pending_count(&self) -> Result<u64> {
933 let redb = self.outbox()?.pending_count()?;
934 if redb > 0 {
935 return Ok(redb);
936 }
937 let c: i64 =
938 self.conn
939 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
940 r.get(0)
941 })?;
942 Ok(c as u64)
943 }
944
945 pub fn set_sync_state_ok(&self) -> Result<()> {
946 let now = now_ms().to_string();
947 self.conn.execute(
948 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
949 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
950 params![now],
951 )?;
952 self.conn.execute(
953 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
954 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
955 [],
956 )?;
957 self.conn
958 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
959 Ok(())
960 }
961
962 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
963 let prev: i64 = self
964 .conn
965 .query_row(
966 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
967 [],
968 |r| {
969 let s: String = r.get(0)?;
970 Ok(s.parse::<i64>().unwrap_or(0))
971 },
972 )
973 .optional()?
974 .unwrap_or(0);
975 let next = prev.saturating_add(1);
976 self.conn.execute(
977 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
978 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
979 params![msg],
980 )?;
981 self.conn.execute(
982 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
983 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
984 params![next.to_string()],
985 )?;
986 Ok(())
987 }
988
989 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
990 let pending_outbox = self.outbox_pending_count()?;
991 let last_success_ms = self
992 .conn
993 .query_row(
994 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
995 [],
996 |r| r.get::<_, String>(0),
997 )
998 .optional()?
999 .and_then(|s| s.parse().ok());
1000 let last_error = self
1001 .conn
1002 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1003 r.get::<_, String>(0)
1004 })
1005 .optional()?;
1006 let consecutive_failures = self
1007 .conn
1008 .query_row(
1009 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1010 [],
1011 |r| r.get::<_, String>(0),
1012 )
1013 .optional()?
1014 .and_then(|s| s.parse().ok())
1015 .unwrap_or(0);
1016 Ok(SyncStatusSnapshot {
1017 pending_outbox,
1018 last_success_ms,
1019 last_error,
1020 consecutive_failures,
1021 })
1022 }
1023
1024 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1025 let row: Option<String> = self
1026 .conn
1027 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1028 r.get::<_, String>(0)
1029 })
1030 .optional()?;
1031 Ok(row.and_then(|s| s.parse().ok()))
1032 }
1033
1034 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1035 self.conn.execute(
1036 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1037 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1038 params![key, v.to_string()],
1039 )?;
1040 Ok(())
1041 }
1042
1043 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1045 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1046 let old_ids = old_session_ids(&tx, cutoff_ms)?;
1047 let sessions_to_remove: i64 = tx.query_row(
1048 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1049 params![cutoff_ms],
1050 |r| r.get(0),
1051 )?;
1052 let events_to_remove: i64 = tx.query_row(
1053 "SELECT COUNT(*) FROM events WHERE session_id IN \
1054 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1055 params![cutoff_ms],
1056 |r| r.get(0),
1057 )?;
1058
1059 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1060 tx.execute(
1061 &format!(
1062 "DELETE FROM tool_span_paths WHERE span_id IN \
1063 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1064 ),
1065 params![cutoff_ms],
1066 )?;
1067 tx.execute(
1068 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1069 params![cutoff_ms],
1070 )?;
1071 tx.execute(
1072 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1073 params![cutoff_ms],
1074 )?;
1075 tx.execute(
1076 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1077 params![cutoff_ms],
1078 )?;
1079 tx.execute(
1080 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1081 params![cutoff_ms],
1082 )?;
1083 tx.execute(
1084 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1085 params![cutoff_ms],
1086 )?;
1087 tx.execute(
1088 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1089 params![cutoff_ms],
1090 )?;
1091 tx.execute(
1092 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1093 params![cutoff_ms],
1094 )?;
1095 tx.execute(
1096 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1097 params![cutoff_ms],
1098 )?;
1099 tx.execute(
1100 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1101 params![cutoff_ms],
1102 )?;
1103 tx.execute(
1104 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1105 params![cutoff_ms],
1106 )?;
1107 tx.execute(
1108 "DELETE FROM sessions WHERE started_at_ms < ?1",
1109 params![cutoff_ms],
1110 )?;
1111 tx.commit()?;
1112 if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1113 let _ = writer.commit();
1114 }
1115 if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1116 tracing::warn!("search prune skipped: {err:#}");
1117 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1118 }
1119 self.invalidate_span_tree_cache();
1120 Ok(PruneStats {
1121 sessions_removed: sessions_to_remove as u64,
1122 events_removed: events_to_remove as u64,
1123 })
1124 }
1125
1126 pub fn vacuum(&self) -> Result<()> {
1128 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1129 Ok(())
1130 }
1131
1132 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1133 Ok(self
1134 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1135 .rows)
1136 }
1137
1138 pub fn list_sessions_page(
1139 &self,
1140 workspace: &str,
1141 offset: usize,
1142 limit: usize,
1143 filter: SessionFilter,
1144 ) -> Result<SessionPage> {
1145 let (where_sql, args) = session_filter_sql(workspace, &filter);
1146 let total = self.query_session_page_count(&where_sql, &args)?;
1147 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1148 let next = offset.saturating_add(rows.len());
1149 Ok(SessionPage {
1150 rows,
1151 total,
1152 next_offset: (next < total).then_some(next),
1153 })
1154 }
1155
1156 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1157 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1158 let total: i64 = self
1159 .conn
1160 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1161 Ok(total as usize)
1162 }
1163
1164 fn query_session_page_rows(
1165 &self,
1166 where_sql: &str,
1167 args: &[Value],
1168 offset: usize,
1169 limit: usize,
1170 ) -> Result<Vec<SessionRecord>> {
1171 let sql = format!(
1172 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1173 );
1174 let mut values = args.to_vec();
1175 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1176 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1177 let mut stmt = self.conn.prepare(&sql)?;
1178 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1179 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1180 }
1181
1182 pub fn list_sessions_started_after(
1183 &self,
1184 workspace: &str,
1185 after_started_at_ms: u64,
1186 ) -> Result<Vec<SessionRecord>> {
1187 let mut stmt = self.conn.prepare(
1188 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1189 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1190 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1191 repo_file_count, repo_total_loc
1192 FROM sessions
1193 WHERE workspace = ?1 AND started_at_ms > ?2
1194 ORDER BY started_at_ms DESC, id ASC",
1195 )?;
1196 let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1197 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1198 }
1199
1200 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1201 if ids.is_empty() {
1202 return Ok(Vec::new());
1203 }
1204 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1205 let sql =
1206 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1207 let mut stmt = self.conn.prepare(&sql)?;
1208 let params: Vec<&dyn rusqlite::ToSql> =
1209 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1210 let rows = stmt.query_map(params.as_slice(), |r| {
1211 let status: String = r.get(1)?;
1212 Ok(SessionStatusRow {
1213 id: r.get(0)?,
1214 status: status_from_str(&status),
1215 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1216 })
1217 })?;
1218 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1219 }
1220
1221 fn running_session_ids(&self) -> Result<Vec<String>> {
1222 let mut stmt = self
1223 .conn
1224 .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1225 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1226 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1227 }
1228
1229 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1230 let session_count: i64 = self.conn.query_row(
1231 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1232 params![workspace],
1233 |r| r.get(0),
1234 )?;
1235
1236 let total_cost: i64 = self.conn.query_row(
1237 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1238 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1239 params![workspace],
1240 |r| r.get(0),
1241 )?;
1242
1243 let mut stmt = self.conn.prepare(
1244 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1245 )?;
1246 let by_agent: Vec<(String, u64)> = stmt
1247 .query_map(params![workspace], |r| {
1248 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1249 })?
1250 .filter_map(|r| r.ok())
1251 .collect();
1252
1253 let mut stmt = self.conn.prepare(
1254 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1255 )?;
1256 let by_model: Vec<(String, u64)> = stmt
1257 .query_map(params![workspace], |r| {
1258 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1259 })?
1260 .filter_map(|r| r.ok())
1261 .collect();
1262
1263 let mut stmt = self.conn.prepare(
1264 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1265 WHERE s.workspace = ?1 AND tool IS NOT NULL
1266 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1267 )?;
1268 let top_tools: Vec<(String, u64)> = stmt
1269 .query_map(params![workspace], |r| {
1270 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1271 })?
1272 .filter_map(|r| r.ok())
1273 .collect();
1274
1275 Ok(SummaryStats {
1276 session_count: session_count as u64,
1277 total_cost_usd_e6: total_cost,
1278 by_agent,
1279 by_model,
1280 top_tools,
1281 })
1282 }
1283
1284 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1285 self.list_events_page(session_id, 0, i64::MAX as usize)
1286 }
1287
1288 pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1289 let mut stmt = self.conn.prepare(
1290 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1291 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1292 stop_reason, latency_ms, ttft_ms, retry_count,
1293 context_used_tokens, context_max_tokens,
1294 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1295 FROM events WHERE session_id = ?1 AND seq = ?2",
1296 )?;
1297 stmt.query_row(params![session_id, seq as i64], event_row)
1298 .optional()
1299 .map_err(Into::into)
1300 }
1301
1302 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1303 let mut out = Vec::new();
1304 for session in self.list_sessions(workspace)? {
1305 for event in self.list_events_for_session(&session.id)? {
1306 out.push((session.clone(), event));
1307 }
1308 }
1309 out.sort_by(|a, b| {
1310 (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1311 });
1312 Ok(out)
1313 }
1314
1315 pub fn list_events_page(
1316 &self,
1317 session_id: &str,
1318 after_seq: u64,
1319 limit: usize,
1320 ) -> Result<Vec<Event>> {
1321 let mut stmt = self.conn.prepare(
1322 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1323 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1324 stop_reason, latency_ms, ttft_ms, retry_count,
1325 context_used_tokens, context_max_tokens,
1326 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1327 FROM events
1328 WHERE session_id = ?1 AND seq >= ?2
1329 ORDER BY seq ASC LIMIT ?3",
1330 )?;
1331 let rows = stmt.query_map(
1332 params![
1333 session_id,
1334 after_seq as i64,
1335 limit.min(i64::MAX as usize) as i64
1336 ],
1337 event_row,
1338 )?;
1339 let mut events = Vec::new();
1340 for row in rows {
1341 events.push(row?);
1342 }
1343 Ok(events)
1344 }
1345
1346 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1348 self.conn.execute(
1349 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1350 params![format!("{:?}", status), id],
1351 )?;
1352 Ok(())
1353 }
1354
1355 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1357 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1358 Ok(InsightsStats {
1359 total_sessions: count_q(
1360 &self.conn,
1361 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1362 workspace,
1363 )?,
1364 running_sessions: count_q(
1365 &self.conn,
1366 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1367 workspace,
1368 )?,
1369 total_events: count_q(
1370 &self.conn,
1371 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1372 workspace,
1373 )?,
1374 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1375 recent: recent_sessions_3(&self.conn, workspace)?,
1376 top_tools: top_tools_5(&self.conn, workspace)?,
1377 total_cost_usd_e6,
1378 sessions_with_cost,
1379 })
1380 }
1381
1382 pub fn retro_events_in_window(
1384 &self,
1385 workspace: &str,
1386 start_ms: u64,
1387 end_ms: u64,
1388 ) -> Result<Vec<(SessionRecord, Event)>> {
1389 let mut stmt = self.conn.prepare(
1390 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1391 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1392 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1393 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1394 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1395 s.repo_file_count, s.repo_total_loc,
1396 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1397 e.context_used_tokens, e.context_max_tokens,
1398 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1399 FROM events e
1400 JOIN sessions s ON s.id = e.session_id
1401 WHERE s.workspace = ?1
1402 AND (
1403 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1404 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1405 )
1406 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1407 )?;
1408 let rows = stmt.query_map(
1409 params![
1410 workspace,
1411 start_ms as i64,
1412 end_ms as i64,
1413 SYNTHETIC_TS_CEILING_MS,
1414 ],
1415 |row| {
1416 let payload_str: String = row.get(12)?;
1417 let status_str: String = row.get(19)?;
1418 Ok((
1419 SessionRecord {
1420 id: row.get(13)?,
1421 agent: row.get(14)?,
1422 model: row.get(15)?,
1423 workspace: row.get(16)?,
1424 started_at_ms: row.get::<_, i64>(17)? as u64,
1425 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1426 status: status_from_str(&status_str),
1427 trace_path: row.get(20)?,
1428 start_commit: row.get(21)?,
1429 end_commit: row.get(22)?,
1430 branch: row.get(23)?,
1431 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1432 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1433 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1434 prompt_fingerprint: row.get(27)?,
1435 parent_session_id: row.get(28)?,
1436 agent_version: row.get(29)?,
1437 os: row.get(30)?,
1438 arch: row.get(31)?,
1439 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1440 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1441 },
1442 Event {
1443 session_id: row.get(0)?,
1444 seq: row.get::<_, i64>(1)? as u64,
1445 ts_ms: row.get::<_, i64>(2)? as u64,
1446 ts_exact: row.get::<_, i64>(3)? != 0,
1447 kind: kind_from_str(&row.get::<_, String>(4)?),
1448 source: source_from_str(&row.get::<_, String>(5)?),
1449 tool: row.get(6)?,
1450 tool_call_id: row.get(7)?,
1451 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1452 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1453 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1454 cost_usd_e6: row.get(11)?,
1455 payload: serde_json::from_str(&payload_str)
1456 .unwrap_or(serde_json::Value::Null),
1457 stop_reason: row.get(34)?,
1458 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1459 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1460 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1461 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1462 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1463 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1464 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1465 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1466 },
1467 ))
1468 },
1469 )?;
1470
1471 let mut out = Vec::new();
1472 for r in rows {
1473 out.push(r?);
1474 }
1475 Ok(out)
1476 }
1477
1478 pub fn experiment_metric_values_in_window(
1479 &self,
1480 workspace: &str,
1481 start_ms: u64,
1482 end_ms: u64,
1483 metric: crate::experiment::types::Metric,
1484 ) -> Result<Vec<(SessionRecord, f64)>> {
1485 use crate::experiment::types::Metric;
1486 let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1487 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
1488 s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
1489 s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
1490 let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1491 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
1492 let sql = match metric {
1493 Metric::TokensPerSession => format!(
1494 "SELECT {session_cols},
1495 SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
1496 FROM sessions s JOIN events e ON e.session_id = s.id
1497 WHERE {window}
1498 GROUP BY s.id"
1499 ),
1500 Metric::CostPerSession => format!(
1501 "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
1502 FROM sessions s JOIN events e ON e.session_id = s.id
1503 WHERE {window}
1504 GROUP BY s.id"
1505 ),
1506 Metric::SuccessRate => format!(
1507 "SELECT {session_cols},
1508 CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
1509 FROM sessions s JOIN events e ON e.session_id = s.id
1510 WHERE {window}
1511 GROUP BY s.id"
1512 ),
1513 Metric::DurationMinutes => format!(
1514 "SELECT {session_cols},
1515 (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
1516 FROM sessions s
1517 WHERE s.workspace = ?1
1518 AND s.ended_at_ms IS NOT NULL
1519 AND EXISTS (
1520 SELECT 1 FROM events e
1521 WHERE e.session_id = s.id
1522 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1523 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
1524 )"
1525 ),
1526 Metric::FilesPerSession => format!(
1527 "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
1528 FROM sessions s
1529 JOIN events e ON e.session_id = s.id
1530 LEFT JOIN files_touched ft ON ft.session_id = s.id
1531 WHERE {window}
1532 GROUP BY s.id"
1533 ),
1534 Metric::SuccessRateByPrompt => format!(
1535 "SELECT {session_cols},
1536 1.0 - (MIN(
1537 SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
1538 SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
1539 ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
1540 FROM sessions s JOIN events e ON e.session_id = s.id
1541 WHERE {window}
1542 GROUP BY s.id
1543 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1544 ),
1545 Metric::CostByPrompt => format!(
1546 "SELECT {session_cols},
1547 SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
1548 SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
1549 FROM sessions s JOIN events e ON e.session_id = s.id
1550 WHERE {window}
1551 GROUP BY s.id
1552 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1553 ),
1554 Metric::ToolLoops => format!(
1555 "WITH calls AS (
1556 SELECT e.session_id, e.tool,
1557 LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
1558 FROM events e JOIN sessions s ON s.id = e.session_id
1559 WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
1560 )
1561 SELECT {session_cols},
1562 SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
1563 FROM sessions s JOIN calls ON calls.session_id = s.id
1564 GROUP BY s.id"
1565 ),
1566 };
1567 let mut stmt = self.conn.prepare(&sql)?;
1568 let rows = stmt.query_map(
1569 params![
1570 workspace,
1571 start_ms as i64,
1572 end_ms as i64,
1573 SYNTHETIC_TS_CEILING_MS,
1574 ],
1575 |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
1576 )?;
1577 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1578 }
1579
1580 pub fn files_touched_in_window(
1582 &self,
1583 workspace: &str,
1584 start_ms: u64,
1585 end_ms: u64,
1586 ) -> Result<Vec<(String, String)>> {
1587 let mut stmt = self.conn.prepare(
1588 "SELECT DISTINCT ft.session_id, ft.path
1589 FROM files_touched ft
1590 JOIN sessions s ON s.id = ft.session_id
1591 WHERE s.workspace = ?1
1592 AND EXISTS (
1593 SELECT 1 FROM events e
1594 JOIN sessions ss ON ss.id = e.session_id
1595 WHERE e.session_id = ft.session_id
1596 AND (
1597 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1598 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1599 )
1600 )
1601 ORDER BY ft.session_id, ft.path",
1602 )?;
1603 let out: Vec<(String, String)> = stmt
1604 .query_map(
1605 params![
1606 workspace,
1607 start_ms as i64,
1608 end_ms as i64,
1609 SYNTHETIC_TS_CEILING_MS,
1610 ],
1611 |r| Ok((r.get(0)?, r.get(1)?)),
1612 )?
1613 .filter_map(|r| r.ok())
1614 .collect();
1615 Ok(out)
1616 }
1617
1618 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1621 let mut stmt = self.conn.prepare(
1622 "SELECT DISTINCT su.skill
1623 FROM skills_used su
1624 JOIN sessions s ON s.id = su.session_id
1625 WHERE s.workspace = ?1
1626 AND EXISTS (
1627 SELECT 1 FROM events e
1628 JOIN sessions ss ON ss.id = e.session_id
1629 WHERE e.session_id = su.session_id
1630 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1631 )
1632 ORDER BY su.skill",
1633 )?;
1634 let out: Vec<String> = stmt
1635 .query_map(
1636 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1637 |r| r.get::<_, String>(0),
1638 )?
1639 .filter_map(|r| r.ok())
1640 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1641 .collect();
1642 Ok(out)
1643 }
1644
1645 pub fn skills_used_in_window(
1647 &self,
1648 workspace: &str,
1649 start_ms: u64,
1650 end_ms: u64,
1651 ) -> Result<Vec<(String, String)>> {
1652 let mut stmt = self.conn.prepare(
1653 "SELECT DISTINCT su.session_id, su.skill
1654 FROM skills_used su
1655 JOIN sessions s ON s.id = su.session_id
1656 WHERE s.workspace = ?1
1657 AND EXISTS (
1658 SELECT 1 FROM events e
1659 JOIN sessions ss ON ss.id = e.session_id
1660 WHERE e.session_id = su.session_id
1661 AND (
1662 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1663 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1664 )
1665 )
1666 ORDER BY su.session_id, su.skill",
1667 )?;
1668 let out: Vec<(String, String)> = stmt
1669 .query_map(
1670 params![
1671 workspace,
1672 start_ms as i64,
1673 end_ms as i64,
1674 SYNTHETIC_TS_CEILING_MS,
1675 ],
1676 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1677 )?
1678 .filter_map(|r| r.ok())
1679 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1680 .collect();
1681 Ok(out)
1682 }
1683
1684 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1686 let mut stmt = self.conn.prepare(
1687 "SELECT DISTINCT ru.rule
1688 FROM rules_used ru
1689 JOIN sessions s ON s.id = ru.session_id
1690 WHERE s.workspace = ?1
1691 AND EXISTS (
1692 SELECT 1 FROM events e
1693 JOIN sessions ss ON ss.id = e.session_id
1694 WHERE e.session_id = ru.session_id
1695 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1696 )
1697 ORDER BY ru.rule",
1698 )?;
1699 let out: Vec<String> = stmt
1700 .query_map(
1701 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1702 |r| r.get::<_, String>(0),
1703 )?
1704 .filter_map(|r| r.ok())
1705 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1706 .collect();
1707 Ok(out)
1708 }
1709
1710 pub fn rules_used_in_window(
1712 &self,
1713 workspace: &str,
1714 start_ms: u64,
1715 end_ms: u64,
1716 ) -> Result<Vec<(String, String)>> {
1717 let mut stmt = self.conn.prepare(
1718 "SELECT DISTINCT ru.session_id, ru.rule
1719 FROM rules_used ru
1720 JOIN sessions s ON s.id = ru.session_id
1721 WHERE s.workspace = ?1
1722 AND EXISTS (
1723 SELECT 1 FROM events e
1724 JOIN sessions ss ON ss.id = e.session_id
1725 WHERE e.session_id = ru.session_id
1726 AND (
1727 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1728 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1729 )
1730 )
1731 ORDER BY ru.session_id, ru.rule",
1732 )?;
1733 let out: Vec<(String, String)> = stmt
1734 .query_map(
1735 params![
1736 workspace,
1737 start_ms as i64,
1738 end_ms as i64,
1739 SYNTHETIC_TS_CEILING_MS,
1740 ],
1741 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1742 )?
1743 .filter_map(|r| r.ok())
1744 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1745 .collect();
1746 Ok(out)
1747 }
1748
1749 pub fn sessions_active_in_window(
1751 &self,
1752 workspace: &str,
1753 start_ms: u64,
1754 end_ms: u64,
1755 ) -> Result<HashSet<String>> {
1756 let mut stmt = self.conn.prepare(
1757 "SELECT DISTINCT s.id
1758 FROM sessions s
1759 WHERE s.workspace = ?1
1760 AND EXISTS (
1761 SELECT 1 FROM events e
1762 WHERE e.session_id = s.id
1763 AND (
1764 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1765 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1766 )
1767 )",
1768 )?;
1769 let out: HashSet<String> = stmt
1770 .query_map(
1771 params![
1772 workspace,
1773 start_ms as i64,
1774 end_ms as i64,
1775 SYNTHETIC_TS_CEILING_MS,
1776 ],
1777 |r| r.get(0),
1778 )?
1779 .filter_map(|r| r.ok())
1780 .collect();
1781 Ok(out)
1782 }
1783
1784 pub fn session_costs_usd_e6_in_window(
1786 &self,
1787 workspace: &str,
1788 start_ms: u64,
1789 end_ms: u64,
1790 ) -> Result<HashMap<String, i64>> {
1791 let mut stmt = self.conn.prepare(
1792 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1793 FROM events e
1794 JOIN sessions s ON s.id = e.session_id
1795 WHERE s.workspace = ?1
1796 AND (
1797 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1798 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1799 )
1800 GROUP BY e.session_id",
1801 )?;
1802 let rows: Vec<(String, i64)> = stmt
1803 .query_map(
1804 params![
1805 workspace,
1806 start_ms as i64,
1807 end_ms as i64,
1808 SYNTHETIC_TS_CEILING_MS,
1809 ],
1810 |r| Ok((r.get(0)?, r.get(1)?)),
1811 )?
1812 .filter_map(|r| r.ok())
1813 .collect();
1814 Ok(rows.into_iter().collect())
1815 }
1816
1817 pub fn guidance_report(
1819 &self,
1820 workspace: &str,
1821 window_start_ms: u64,
1822 window_end_ms: u64,
1823 skill_slugs_on_disk: &HashSet<String>,
1824 rule_slugs_on_disk: &HashSet<String>,
1825 ) -> Result<GuidanceReport> {
1826 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1827 let denom = active.len() as u64;
1828 let costs =
1829 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1830
1831 let workspace_avg_cost_per_session_usd = if denom > 0 {
1832 let total_e6: i64 = active
1833 .iter()
1834 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1835 .sum();
1836 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1837 } else {
1838 None
1839 };
1840
1841 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1842 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1843 skill_sessions.entry(skill).or_default().insert(sid);
1844 }
1845 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1846 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1847 rule_sessions.entry(rule).or_default().insert(sid);
1848 }
1849
1850 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1851
1852 let mut push_row =
1853 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1854 let sessions = sids.len() as u64;
1855 let sessions_pct = if denom > 0 {
1856 sessions as f64 * 100.0 / denom as f64
1857 } else {
1858 0.0
1859 };
1860 let total_cost_usd_e6: i64 = sids
1861 .iter()
1862 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1863 .sum();
1864 let avg_cost_per_session_usd = if sessions > 0 {
1865 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1866 } else {
1867 None
1868 };
1869 let vs_workspace_avg_cost_per_session_usd =
1870 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1871 (Some(avg), Some(w)) => Some(avg - w),
1872 _ => None,
1873 };
1874 rows.push(GuidancePerfRow {
1875 kind,
1876 id,
1877 sessions,
1878 sessions_pct,
1879 total_cost_usd_e6,
1880 avg_cost_per_session_usd,
1881 vs_workspace_avg_cost_per_session_usd,
1882 on_disk,
1883 });
1884 };
1885
1886 let mut seen_skills: HashSet<String> = HashSet::new();
1887 for (id, sids) in &skill_sessions {
1888 seen_skills.insert(id.clone());
1889 push_row(
1890 GuidanceKind::Skill,
1891 id.clone(),
1892 sids,
1893 skill_slugs_on_disk.contains(id),
1894 );
1895 }
1896 for slug in skill_slugs_on_disk {
1897 if seen_skills.contains(slug) {
1898 continue;
1899 }
1900 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1901 }
1902
1903 let mut seen_rules: HashSet<String> = HashSet::new();
1904 for (id, sids) in &rule_sessions {
1905 seen_rules.insert(id.clone());
1906 push_row(
1907 GuidanceKind::Rule,
1908 id.clone(),
1909 sids,
1910 rule_slugs_on_disk.contains(id),
1911 );
1912 }
1913 for slug in rule_slugs_on_disk {
1914 if seen_rules.contains(slug) {
1915 continue;
1916 }
1917 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1918 }
1919
1920 rows.sort_by(|a, b| {
1921 b.sessions
1922 .cmp(&a.sessions)
1923 .then_with(|| a.kind.cmp(&b.kind))
1924 .then_with(|| a.id.cmp(&b.id))
1925 });
1926
1927 Ok(GuidanceReport {
1928 workspace: workspace.to_string(),
1929 window_start_ms,
1930 window_end_ms,
1931 sessions_in_window: denom,
1932 workspace_avg_cost_per_session_usd,
1933 rows,
1934 })
1935 }
1936
1937 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1938 let mut stmt = self.conn.prepare(
1939 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1940 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1941 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1942 repo_file_count, repo_total_loc
1943 FROM sessions WHERE id = ?1",
1944 )?;
1945 let mut rows = stmt.query_map(params![id], |row| {
1946 Ok((
1947 row.get::<_, String>(0)?,
1948 row.get::<_, String>(1)?,
1949 row.get::<_, Option<String>>(2)?,
1950 row.get::<_, String>(3)?,
1951 row.get::<_, i64>(4)?,
1952 row.get::<_, Option<i64>>(5)?,
1953 row.get::<_, String>(6)?,
1954 row.get::<_, String>(7)?,
1955 row.get::<_, Option<String>>(8)?,
1956 row.get::<_, Option<String>>(9)?,
1957 row.get::<_, Option<String>>(10)?,
1958 row.get::<_, Option<i64>>(11)?,
1959 row.get::<_, Option<i64>>(12)?,
1960 row.get::<_, String>(13)?,
1961 row.get::<_, Option<String>>(14)?,
1962 row.get::<_, Option<String>>(15)?,
1963 row.get::<_, Option<String>>(16)?,
1964 row.get::<_, Option<String>>(17)?,
1965 row.get::<_, Option<String>>(18)?,
1966 row.get::<_, Option<i64>>(19)?,
1967 row.get::<_, Option<i64>>(20)?,
1968 ))
1969 })?;
1970
1971 if let Some(row) = rows.next() {
1972 let (
1973 id,
1974 agent,
1975 model,
1976 workspace,
1977 started,
1978 ended,
1979 status_str,
1980 trace,
1981 start_commit,
1982 end_commit,
1983 branch,
1984 dirty_start,
1985 dirty_end,
1986 source,
1987 prompt_fingerprint,
1988 parent_session_id,
1989 agent_version,
1990 os,
1991 arch,
1992 repo_file_count,
1993 repo_total_loc,
1994 ) = row?;
1995 Ok(Some(SessionRecord {
1996 id,
1997 agent,
1998 model,
1999 workspace,
2000 started_at_ms: started as u64,
2001 ended_at_ms: ended.map(|v| v as u64),
2002 status: status_from_str(&status_str),
2003 trace_path: trace,
2004 start_commit,
2005 end_commit,
2006 branch,
2007 dirty_start: dirty_start.map(i64_to_bool),
2008 dirty_end: dirty_end.map(i64_to_bool),
2009 repo_binding_source: empty_to_none(source),
2010 prompt_fingerprint,
2011 parent_session_id,
2012 agent_version,
2013 os,
2014 arch,
2015 repo_file_count: repo_file_count.map(|v| v as u32),
2016 repo_total_loc: repo_total_loc.map(|v| v as u64),
2017 }))
2018 } else {
2019 Ok(None)
2020 }
2021 }
2022
2023 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2024 let mut stmt = self.conn.prepare(
2025 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2026 indexed_at_ms, dirty, graph_path
2027 FROM repo_snapshots WHERE workspace = ?1
2028 ORDER BY indexed_at_ms DESC LIMIT 1",
2029 )?;
2030 let mut rows = stmt.query_map(params![workspace], |row| {
2031 Ok(RepoSnapshotRecord {
2032 id: row.get(0)?,
2033 workspace: row.get(1)?,
2034 head_commit: row.get(2)?,
2035 dirty_fingerprint: row.get(3)?,
2036 analyzer_version: row.get(4)?,
2037 indexed_at_ms: row.get::<_, i64>(5)? as u64,
2038 dirty: row.get::<_, i64>(6)? != 0,
2039 graph_path: row.get(7)?,
2040 })
2041 })?;
2042 Ok(rows.next().transpose()?)
2043 }
2044
2045 pub fn save_repo_snapshot(
2046 &self,
2047 snapshot: &RepoSnapshotRecord,
2048 facts: &[FileFact],
2049 edges: &[RepoEdge],
2050 ) -> Result<()> {
2051 self.conn.execute(
2052 "INSERT INTO repo_snapshots (
2053 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2054 indexed_at_ms, dirty, graph_path
2055 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2056 ON CONFLICT(id) DO UPDATE SET
2057 workspace=excluded.workspace,
2058 head_commit=excluded.head_commit,
2059 dirty_fingerprint=excluded.dirty_fingerprint,
2060 analyzer_version=excluded.analyzer_version,
2061 indexed_at_ms=excluded.indexed_at_ms,
2062 dirty=excluded.dirty,
2063 graph_path=excluded.graph_path",
2064 params![
2065 snapshot.id,
2066 snapshot.workspace,
2067 snapshot.head_commit,
2068 snapshot.dirty_fingerprint,
2069 snapshot.analyzer_version,
2070 snapshot.indexed_at_ms as i64,
2071 bool_to_i64(snapshot.dirty),
2072 snapshot.graph_path,
2073 ],
2074 )?;
2075 self.conn.execute(
2076 "DELETE FROM file_facts WHERE snapshot_id = ?1",
2077 params![snapshot.id],
2078 )?;
2079 self.conn.execute(
2080 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2081 params![snapshot.id],
2082 )?;
2083 for fact in facts {
2084 self.conn.execute(
2085 "INSERT INTO file_facts (
2086 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2087 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2088 churn_30d, churn_90d, authors_90d, last_changed_ms
2089 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2090 params![
2091 fact.snapshot_id,
2092 fact.path,
2093 fact.language,
2094 fact.bytes as i64,
2095 fact.loc as i64,
2096 fact.sloc as i64,
2097 fact.complexity_total as i64,
2098 fact.max_fn_complexity as i64,
2099 fact.symbol_count as i64,
2100 fact.import_count as i64,
2101 fact.fan_in as i64,
2102 fact.fan_out as i64,
2103 fact.churn_30d as i64,
2104 fact.churn_90d as i64,
2105 fact.authors_90d as i64,
2106 fact.last_changed_ms.map(|v| v as i64),
2107 ],
2108 )?;
2109 }
2110 for edge in edges {
2111 self.conn.execute(
2112 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2113 VALUES (?1, ?2, ?3, ?4, ?5)
2114 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2115 DO UPDATE SET weight = weight + excluded.weight",
2116 params![
2117 snapshot.id,
2118 edge.from_path,
2119 edge.to_path,
2120 edge.kind,
2121 edge.weight as i64,
2122 ],
2123 )?;
2124 }
2125 Ok(())
2126 }
2127
2128 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2129 let mut stmt = self.conn.prepare(
2130 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2131 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2132 churn_30d, churn_90d, authors_90d, last_changed_ms
2133 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2134 )?;
2135 let rows = stmt.query_map(params![snapshot_id], |row| {
2136 Ok(FileFact {
2137 snapshot_id: row.get(0)?,
2138 path: row.get(1)?,
2139 language: row.get(2)?,
2140 bytes: row.get::<_, i64>(3)? as u64,
2141 loc: row.get::<_, i64>(4)? as u32,
2142 sloc: row.get::<_, i64>(5)? as u32,
2143 complexity_total: row.get::<_, i64>(6)? as u32,
2144 max_fn_complexity: row.get::<_, i64>(7)? as u32,
2145 symbol_count: row.get::<_, i64>(8)? as u32,
2146 import_count: row.get::<_, i64>(9)? as u32,
2147 fan_in: row.get::<_, i64>(10)? as u32,
2148 fan_out: row.get::<_, i64>(11)? as u32,
2149 churn_30d: row.get::<_, i64>(12)? as u32,
2150 churn_90d: row.get::<_, i64>(13)? as u32,
2151 authors_90d: row.get::<_, i64>(14)? as u32,
2152 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2153 })
2154 })?;
2155 Ok(rows.filter_map(|row| row.ok()).collect())
2156 }
2157
2158 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2159 let mut stmt = self.conn.prepare(
2160 "SELECT from_id, to_id, kind, weight
2161 FROM repo_edges WHERE snapshot_id = ?1
2162 ORDER BY kind, from_id, to_id",
2163 )?;
2164 let rows = stmt.query_map(params![snapshot_id], |row| {
2165 Ok(RepoEdge {
2166 from_path: row.get(0)?,
2167 to_path: row.get(1)?,
2168 kind: row.get(2)?,
2169 weight: row.get::<_, i64>(3)? as u32,
2170 })
2171 })?;
2172 Ok(rows.filter_map(|row| row.ok()).collect())
2173 }
2174
2175 pub fn tool_spans_in_window(
2176 &self,
2177 workspace: &str,
2178 start_ms: u64,
2179 end_ms: u64,
2180 ) -> Result<Vec<ToolSpanView>> {
2181 let mut stmt = self.conn.prepare(
2182 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2183 reasoning_tokens, cost_usd_e6, paths_json,
2184 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2185 FROM (
2186 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2187 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2188 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2189 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2190 ts.started_at_ms AS sort_ms
2191 FROM tool_spans ts
2192 JOIN sessions s ON s.id = ts.session_id
2193 WHERE s.workspace = ?1
2194 AND ts.started_at_ms >= ?2
2195 AND ts.started_at_ms <= ?3
2196 UNION ALL
2197 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2198 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2199 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2200 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2201 ts.ended_at_ms AS sort_ms
2202 FROM tool_spans ts
2203 JOIN sessions s ON s.id = ts.session_id
2204 WHERE s.workspace = ?1
2205 AND ts.started_at_ms IS NULL
2206 AND ts.ended_at_ms >= ?2
2207 AND ts.ended_at_ms <= ?3
2208 )
2209 ORDER BY sort_ms DESC",
2210 )?;
2211 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2212 let paths_json: String = row.get(8)?;
2213 Ok(ToolSpanView {
2214 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2215 tool: row
2216 .get::<_, Option<String>>(1)?
2217 .unwrap_or_else(|| "unknown".into()),
2218 status: row.get(2)?,
2219 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2220 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2221 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2222 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2223 cost_usd_e6: row.get(7)?,
2224 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2225 parent_span_id: row.get(9)?,
2226 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2227 subtree_cost_usd_e6: row.get(11)?,
2228 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2229 })
2230 })?;
2231 Ok(rows.filter_map(|row| row.ok()).collect())
2232 }
2233
2234 pub fn session_span_tree(
2235 &self,
2236 session_id: &str,
2237 ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2238 let last_event_seq = self.last_event_seq_for_session(session_id)?;
2239 if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2240 && entry.session_id == session_id
2241 && entry.last_event_seq == last_event_seq
2242 {
2243 return Ok(entry.nodes.clone());
2244 }
2245 let mut stmt = self.conn.prepare(
2246 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2247 reasoning_tokens, cost_usd_e6, paths_json,
2248 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2249 FROM tool_spans
2250 WHERE session_id = ?1
2251 ORDER BY depth ASC, started_at_ms ASC",
2252 )?;
2253 let rows = stmt.query_map(params![session_id], |row| {
2254 let paths_json: String = row.get(8)?;
2255 Ok(crate::metrics::types::ToolSpanView {
2256 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2257 tool: row
2258 .get::<_, Option<String>>(1)?
2259 .unwrap_or_else(|| "unknown".into()),
2260 status: row.get(2)?,
2261 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2262 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2263 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2264 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2265 cost_usd_e6: row.get(7)?,
2266 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2267 parent_span_id: row.get(9)?,
2268 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2269 subtree_cost_usd_e6: row.get(11)?,
2270 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2271 })
2272 })?;
2273 let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2274 let nodes = crate::store::span_tree::build_tree(spans);
2275 *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2276 session_id: session_id.to_string(),
2277 last_event_seq,
2278 nodes: nodes.clone(),
2279 });
2280 Ok(nodes)
2281 }
2282
2283 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2284 let seq = self
2285 .conn
2286 .query_row(
2287 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2288 params![session_id],
2289 |r| r.get::<_, Option<i64>>(0),
2290 )?
2291 .map(|v| v as u64);
2292 Ok(seq)
2293 }
2294
2295 pub fn tool_spans_sync_rows_in_window(
2301 &self,
2302 workspace: &str,
2303 start_ms: u64,
2304 end_ms: u64,
2305 ) -> Result<Vec<ToolSpanSyncRow>> {
2306 let mut stmt = self.conn.prepare(
2307 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
2308 lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2309 FROM (
2310 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2311 ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2312 ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2313 ts.started_at_ms AS sort_ms
2314 FROM tool_spans ts
2315 JOIN sessions s ON s.id = ts.session_id
2316 WHERE s.workspace = ?1
2317 AND ts.started_at_ms IS NOT NULL
2318 AND ts.started_at_ms >= ?2
2319 AND ts.started_at_ms <= ?3
2320 UNION ALL
2321 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2322 ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2323 ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2324 ts.ended_at_ms AS sort_ms
2325 FROM tool_spans ts
2326 JOIN sessions s ON s.id = ts.session_id
2327 WHERE s.workspace = ?1
2328 AND ts.started_at_ms IS NULL
2329 AND ts.ended_at_ms IS NOT NULL
2330 AND ts.ended_at_ms >= ?2
2331 AND ts.ended_at_ms <= ?3
2332 )
2333 ORDER BY sort_ms ASC, span_id ASC",
2334 )?;
2335 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2336 let paths_json: String = row.get(12)?;
2337 Ok(ToolSpanSyncRow {
2338 span_id: row.get(0)?,
2339 session_id: row.get(1)?,
2340 tool: row.get(2)?,
2341 tool_call_id: row.get(3)?,
2342 status: row.get(4)?,
2343 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2344 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2345 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2346 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2347 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2348 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2349 cost_usd_e6: row.get(11)?,
2350 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2351 })
2352 })?;
2353 Ok(rows.filter_map(|row| row.ok()).collect())
2354 }
2355
2356 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2357 let mut stmt = self.conn.prepare(
2358 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2359 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2360 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2361 )?;
2362 let rows = stmt.query_map(params![session_id], |row| {
2363 let paths_json: String = row.get(12)?;
2364 Ok(ToolSpanSyncRow {
2365 span_id: row.get(0)?,
2366 session_id: row.get(1)?,
2367 tool: row.get(2)?,
2368 tool_call_id: row.get(3)?,
2369 status: row.get(4)?,
2370 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2371 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2372 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2373 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2374 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2375 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2376 cost_usd_e6: row.get(11)?,
2377 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2378 })
2379 })?;
2380 Ok(rows.filter_map(|row| row.ok()).collect())
2381 }
2382
2383 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2384 self.conn.execute(
2385 "INSERT OR REPLACE INTO session_evals
2386 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2387 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2388 rusqlite::params![
2389 eval.id,
2390 eval.session_id,
2391 eval.judge_model,
2392 eval.rubric_id,
2393 eval.score,
2394 eval.rationale,
2395 eval.flagged as i64,
2396 eval.created_at_ms as i64,
2397 ],
2398 )?;
2399 Ok(())
2400 }
2401
2402 pub fn list_evals_in_window(
2403 &self,
2404 start_ms: u64,
2405 end_ms: u64,
2406 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2407 let mut stmt = self.conn.prepare(
2408 "SELECT id, session_id, judge_model, rubric_id, score,
2409 rationale, flagged, created_at_ms
2410 FROM session_evals
2411 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2412 ORDER BY created_at_ms ASC",
2413 )?;
2414 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2415 Ok(crate::eval::types::EvalRow {
2416 id: r.get(0)?,
2417 session_id: r.get(1)?,
2418 judge_model: r.get(2)?,
2419 rubric_id: r.get(3)?,
2420 score: r.get(4)?,
2421 rationale: r.get(5)?,
2422 flagged: r.get::<_, i64>(6)? != 0,
2423 created_at_ms: r.get::<_, i64>(7)? as u64,
2424 })
2425 })?;
2426 rows.collect()
2427 }
2428
2429 pub fn list_evals_for_session(
2430 &self,
2431 session_id: &str,
2432 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2433 let mut stmt = self.conn.prepare(
2434 "SELECT id, session_id, judge_model, rubric_id, score,
2435 rationale, flagged, created_at_ms
2436 FROM session_evals
2437 WHERE session_id = ?1
2438 ORDER BY created_at_ms DESC",
2439 )?;
2440 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2441 Ok(crate::eval::types::EvalRow {
2442 id: r.get(0)?,
2443 session_id: r.get(1)?,
2444 judge_model: r.get(2)?,
2445 rubric_id: r.get(3)?,
2446 score: r.get(4)?,
2447 rationale: r.get(5)?,
2448 flagged: r.get::<_, i64>(6)? != 0,
2449 created_at_ms: r.get::<_, i64>(7)? as u64,
2450 })
2451 })?;
2452 rows.collect()
2453 }
2454
2455 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2456 use crate::feedback::types::FeedbackLabel;
2457 self.conn.execute(
2458 "INSERT OR REPLACE INTO session_feedback
2459 (id, session_id, score, label, note, created_at_ms)
2460 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2461 rusqlite::params![
2462 r.id,
2463 r.session_id,
2464 r.score.as_ref().map(|s| s.0 as i64),
2465 r.label.as_ref().map(FeedbackLabel::to_db_str),
2466 r.note,
2467 r.created_at_ms as i64,
2468 ],
2469 )?;
2470 let payload = serde_json::to_string(r).unwrap_or_default();
2471 self.conn.execute(
2472 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2473 VALUES (?1, 'session_feedback', ?2, 0)",
2474 rusqlite::params![r.session_id, payload],
2475 )?;
2476 Ok(())
2477 }
2478
2479 pub fn list_feedback_in_window(
2480 &self,
2481 start_ms: u64,
2482 end_ms: u64,
2483 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2484 let mut stmt = self.conn.prepare(
2485 "SELECT id, session_id, score, label, note, created_at_ms
2486 FROM session_feedback
2487 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2488 ORDER BY created_at_ms ASC",
2489 )?;
2490 let rows = stmt.query_map(
2491 rusqlite::params![start_ms as i64, end_ms as i64],
2492 feedback_row,
2493 )?;
2494 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2495 }
2496
2497 pub fn feedback_for_sessions(
2498 &self,
2499 ids: &[String],
2500 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2501 if ids.is_empty() {
2502 return Ok(std::collections::HashMap::new());
2503 }
2504 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2505 let sql = format!(
2506 "SELECT id, session_id, score, label, note, created_at_ms
2507 FROM session_feedback WHERE session_id IN ({placeholders})
2508 ORDER BY created_at_ms DESC"
2509 );
2510 let mut stmt = self.conn.prepare(&sql)?;
2511 let params: Vec<&dyn rusqlite::ToSql> =
2512 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2513 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2514 let mut map = std::collections::HashMap::new();
2515 for row in rows {
2516 let r = row?;
2517 map.entry(r.session_id.clone()).or_insert(r);
2518 }
2519 Ok(map)
2520 }
2521
2522 pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2523 self.conn.execute(
2524 "INSERT INTO session_outcomes (
2525 session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2526 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2527 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2528 ON CONFLICT(session_id) DO UPDATE SET
2529 test_passed=excluded.test_passed,
2530 test_failed=excluded.test_failed,
2531 test_skipped=excluded.test_skipped,
2532 build_ok=excluded.build_ok,
2533 lint_errors=excluded.lint_errors,
2534 revert_lines_14d=excluded.revert_lines_14d,
2535 pr_open=excluded.pr_open,
2536 ci_ok=excluded.ci_ok,
2537 measured_at_ms=excluded.measured_at_ms,
2538 measure_error=excluded.measure_error",
2539 params![
2540 row.session_id,
2541 row.test_passed,
2542 row.test_failed,
2543 row.test_skipped,
2544 row.build_ok.map(bool_to_i64),
2545 row.lint_errors,
2546 row.revert_lines_14d,
2547 row.pr_open,
2548 row.ci_ok.map(bool_to_i64),
2549 row.measured_at_ms as i64,
2550 row.measure_error.as_deref(),
2551 ],
2552 )?;
2553 Ok(())
2554 }
2555
2556 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2557 let mut stmt = self.conn.prepare(
2558 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2559 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2560 FROM session_outcomes WHERE session_id = ?1",
2561 )?;
2562 let row = stmt
2563 .query_row(params![session_id], outcome_row)
2564 .optional()?;
2565 Ok(row)
2566 }
2567
2568 pub fn list_session_outcomes_in_window(
2570 &self,
2571 workspace: &str,
2572 start_ms: u64,
2573 end_ms: u64,
2574 ) -> Result<Vec<SessionOutcomeRow>> {
2575 let mut stmt = self.conn.prepare(
2576 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2577 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2578 FROM session_outcomes o
2579 JOIN sessions s ON s.id = o.session_id
2580 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2581 ORDER BY o.measured_at_ms ASC",
2582 )?;
2583 let rows = stmt.query_map(
2584 params![workspace, start_ms as i64, end_ms as i64],
2585 outcome_row,
2586 )?;
2587 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2588 }
2589
2590 pub fn append_session_sample(
2591 &self,
2592 session_id: &str,
2593 ts_ms: u64,
2594 pid: u32,
2595 cpu_percent: Option<f64>,
2596 rss_bytes: Option<u64>,
2597 ) -> Result<()> {
2598 self.conn.execute(
2599 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2600 VALUES (?1, ?2, ?3, ?4, ?5)",
2601 params![
2602 session_id,
2603 ts_ms as i64,
2604 pid as i64,
2605 cpu_percent,
2606 rss_bytes.map(|b| b as i64)
2607 ],
2608 )?;
2609 Ok(())
2610 }
2611
2612 pub fn list_session_sample_aggs_in_window(
2614 &self,
2615 workspace: &str,
2616 start_ms: u64,
2617 end_ms: u64,
2618 ) -> Result<Vec<SessionSampleAgg>> {
2619 let mut stmt = self.conn.prepare(
2620 "SELECT ss.session_id, COUNT(*) AS n,
2621 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2622 FROM session_samples ss
2623 JOIN sessions s ON s.id = ss.session_id
2624 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2625 GROUP BY ss.session_id",
2626 )?;
2627 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2628 let sid: String = r.get(0)?;
2629 let n: i64 = r.get(1)?;
2630 let max_cpu: Option<f64> = r.get(2)?;
2631 let max_rss: Option<i64> = r.get(3)?;
2632 Ok(SessionSampleAgg {
2633 session_id: sid,
2634 sample_count: n as u64,
2635 max_cpu_percent: max_cpu.unwrap_or(0.0),
2636 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2637 })
2638 })?;
2639 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2640 }
2641
2642 pub fn list_sessions_for_eval(
2643 &self,
2644 since_ms: u64,
2645 min_cost_usd: f64,
2646 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2647 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2648 let mut stmt = self.conn.prepare(
2649 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2650 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2651 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2652 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2653 FROM sessions s
2654 WHERE s.started_at_ms >= ?1
2655 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2656 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2657 ORDER BY s.started_at_ms DESC",
2658 )?;
2659 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2660 Ok((
2661 r.get::<_, String>(0)?,
2662 r.get::<_, String>(1)?,
2663 r.get::<_, Option<String>>(2)?,
2664 r.get::<_, String>(3)?,
2665 r.get::<_, i64>(4)?,
2666 r.get::<_, Option<i64>>(5)?,
2667 r.get::<_, String>(6)?,
2668 r.get::<_, String>(7)?,
2669 r.get::<_, Option<String>>(8)?,
2670 r.get::<_, Option<String>>(9)?,
2671 r.get::<_, Option<String>>(10)?,
2672 r.get::<_, Option<i64>>(11)?,
2673 r.get::<_, Option<i64>>(12)?,
2674 r.get::<_, Option<String>>(13)?,
2675 r.get::<_, Option<String>>(14)?,
2676 r.get::<_, Option<String>>(15)?,
2677 r.get::<_, Option<String>>(16)?,
2678 r.get::<_, Option<String>>(17)?,
2679 r.get::<_, Option<String>>(18)?,
2680 r.get::<_, Option<i64>>(19)?,
2681 r.get::<_, Option<i64>>(20)?,
2682 ))
2683 })?;
2684 let mut out = Vec::new();
2685 for row in rows {
2686 let (
2687 id,
2688 agent,
2689 model,
2690 workspace,
2691 started,
2692 ended,
2693 status_str,
2694 trace,
2695 start_commit,
2696 end_commit,
2697 branch,
2698 dirty_start,
2699 dirty_end,
2700 source,
2701 prompt_fingerprint,
2702 parent_session_id,
2703 agent_version,
2704 os,
2705 arch,
2706 repo_file_count,
2707 repo_total_loc,
2708 ) = row?;
2709 out.push(crate::core::event::SessionRecord {
2710 id,
2711 agent,
2712 model,
2713 workspace,
2714 started_at_ms: started as u64,
2715 ended_at_ms: ended.map(|v| v as u64),
2716 status: status_from_str(&status_str),
2717 trace_path: trace,
2718 start_commit,
2719 end_commit,
2720 branch,
2721 dirty_start: dirty_start.map(i64_to_bool),
2722 dirty_end: dirty_end.map(i64_to_bool),
2723 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2724 prompt_fingerprint,
2725 parent_session_id,
2726 agent_version,
2727 os,
2728 arch,
2729 repo_file_count: repo_file_count.map(|v| v as u32),
2730 repo_total_loc: repo_total_loc.map(|v| v as u64),
2731 });
2732 }
2733 Ok(out)
2734 }
2735
2736 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2737 self.conn.execute(
2738 "INSERT OR IGNORE INTO prompt_snapshots
2739 (fingerprint, captured_at_ms, files_json, total_bytes)
2740 VALUES (?1, ?2, ?3, ?4)",
2741 params![
2742 snap.fingerprint,
2743 snap.captured_at_ms as i64,
2744 snap.files_json,
2745 snap.total_bytes as i64
2746 ],
2747 )?;
2748 Ok(())
2749 }
2750
2751 pub fn get_prompt_snapshot(
2752 &self,
2753 fingerprint: &str,
2754 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2755 self.conn
2756 .query_row(
2757 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2758 FROM prompt_snapshots WHERE fingerprint = ?1",
2759 params![fingerprint],
2760 |r| {
2761 Ok(crate::prompt::PromptSnapshot {
2762 fingerprint: r.get(0)?,
2763 captured_at_ms: r.get::<_, i64>(1)? as u64,
2764 files_json: r.get(2)?,
2765 total_bytes: r.get::<_, i64>(3)? as u64,
2766 })
2767 },
2768 )
2769 .optional()
2770 .map_err(Into::into)
2771 }
2772
2773 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2774 let mut stmt = self.conn.prepare(
2775 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2776 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2777 )?;
2778 let rows = stmt.query_map([], |r| {
2779 Ok(crate::prompt::PromptSnapshot {
2780 fingerprint: r.get(0)?,
2781 captured_at_ms: r.get::<_, i64>(1)? as u64,
2782 files_json: r.get(2)?,
2783 total_bytes: r.get::<_, i64>(3)? as u64,
2784 })
2785 })?;
2786 Ok(rows.filter_map(|r| r.ok()).collect())
2787 }
2788
2789 pub fn sessions_with_prompt_fingerprint(
2791 &self,
2792 workspace: &str,
2793 start_ms: u64,
2794 end_ms: u64,
2795 ) -> Result<Vec<(String, String)>> {
2796 let mut stmt = self.conn.prepare(
2797 "SELECT id, prompt_fingerprint FROM sessions
2798 WHERE workspace = ?1
2799 AND started_at_ms >= ?2 AND started_at_ms < ?3
2800 AND prompt_fingerprint IS NOT NULL",
2801 )?;
2802 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2803 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2804 })?;
2805 Ok(rows.filter_map(|r| r.ok()).collect())
2806 }
2807}
2808
2809impl Drop for Store {
2810 fn drop(&mut self) {
2811 if let Some(writer) = self.search_writer.get_mut().as_mut() {
2812 let _ = writer.commit();
2813 }
2814 }
2815}
2816
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
2823
2824fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2825 let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2826 let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2827 Ok(rows.filter_map(|r| r.ok()).collect())
2828}
2829
2830fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2831 raw.and_then(|s| s.trim().parse::<u64>().ok())
2832 .unwrap_or(DEFAULT_MMAP_MB)
2833 .saturating_mul(1024)
2834 .saturating_mul(1024)
2835 .min(i64::MAX as u64) as i64
2836}
2837
2838fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2839 let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2840 conn.execute_batch(&format!(
2841 "
2842 PRAGMA journal_mode=WAL;
2843 PRAGMA busy_timeout=5000;
2844 PRAGMA synchronous=NORMAL;
2845 PRAGMA cache_size=-65536;
2846 PRAGMA mmap_size={mmap_size};
2847 PRAGMA temp_store=MEMORY;
2848 PRAGMA wal_autocheckpoint=1000;
2849 "
2850 ))?;
2851 if mode == StoreOpenMode::ReadOnlyQuery {
2852 conn.execute_batch("PRAGMA query_only=ON;")?;
2853 }
2854 Ok(())
2855}
2856
2857fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2858 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2859}
2860
2861fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2862 let status_str: String = row.get(6)?;
2863 Ok(SessionRecord {
2864 id: row.get(0)?,
2865 agent: row.get(1)?,
2866 model: row.get(2)?,
2867 workspace: row.get(3)?,
2868 started_at_ms: row.get::<_, i64>(4)? as u64,
2869 ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2870 status: status_from_str(&status_str),
2871 trace_path: row.get(7)?,
2872 start_commit: row.get(8)?,
2873 end_commit: row.get(9)?,
2874 branch: row.get(10)?,
2875 dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2876 dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2877 repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2878 prompt_fingerprint: row.get(14)?,
2879 parent_session_id: row.get(15)?,
2880 agent_version: row.get(16)?,
2881 os: row.get(17)?,
2882 arch: row.get(18)?,
2883 repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2884 repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2885 })
2886}
2887
2888fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2889 let payload_str: String = row.get(12)?;
2890 Ok(Event {
2891 session_id: row.get(0)?,
2892 seq: row.get::<_, i64>(1)? as u64,
2893 ts_ms: row.get::<_, i64>(2)? as u64,
2894 ts_exact: row.get::<_, i64>(3)? != 0,
2895 kind: kind_from_str(&row.get::<_, String>(4)?),
2896 source: source_from_str(&row.get::<_, String>(5)?),
2897 tool: row.get(6)?,
2898 tool_call_id: row.get(7)?,
2899 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2900 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2901 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2902 cost_usd_e6: row.get(11)?,
2903 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2904 stop_reason: row.get(13)?,
2905 latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2906 ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2907 retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2908 context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2909 context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2910 cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2911 cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2912 system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2913 })
2914}
2915
2916fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2917 let mut clauses = vec!["workspace = ?".to_string()];
2918 let mut args = vec![Value::Text(workspace.to_string())];
2919 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2920 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2921 args.push(Value::Text(format!("{}%", escape_like(prefix))));
2922 }
2923 if let Some(status) = &filter.status {
2924 clauses.push("status = ?".to_string());
2925 args.push(Value::Text(format!("{status:?}")));
2926 }
2927 if let Some(since_ms) = filter.since_ms {
2928 clauses.push("started_at_ms >= ?".to_string());
2929 args.push(Value::Integer(since_ms as i64));
2930 }
2931 (format!("WHERE {}", clauses.join(" AND ")), args)
2932}
2933
/// Lowercases `raw` and escapes the characters that are special in a SQL
/// `LIKE` pattern (`\`, `%` and `_`) by prefixing each with a backslash.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2940
2941fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2942 let cost: i64 = conn.query_row(
2943 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2944 params![workspace], |r| r.get(0),
2945 )?;
2946 let with_cost: i64 = conn.query_row(
2947 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2948 params![workspace], |r| r.get(0),
2949 )?;
2950 Ok((cost, with_cost as u64))
2951}
2952
2953fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2954 let build_raw: Option<i64> = r.get(4)?;
2955 let ci_raw: Option<i64> = r.get(8)?;
2956 Ok(SessionOutcomeRow {
2957 session_id: r.get(0)?,
2958 test_passed: r.get(1)?,
2959 test_failed: r.get(2)?,
2960 test_skipped: r.get(3)?,
2961 build_ok: build_raw.map(|v| v != 0),
2962 lint_errors: r.get(5)?,
2963 revert_lines_14d: r.get(6)?,
2964 pr_open: r.get(7)?,
2965 ci_ok: ci_raw.map(|v| v != 0),
2966 measured_at_ms: r.get::<_, i64>(9)? as u64,
2967 measure_error: r.get(10)?,
2968 })
2969}
2970
2971fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2972 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2973 let score = r
2974 .get::<_, Option<i64>>(2)?
2975 .and_then(|v| FeedbackScore::new(v as u8));
2976 let label = r
2977 .get::<_, Option<String>>(3)?
2978 .and_then(|s| FeedbackLabel::from_str_opt(&s));
2979 Ok(FeedbackRecord {
2980 id: r.get(0)?,
2981 session_id: r.get(1)?,
2982 score,
2983 label,
2984 note: r.get(4)?,
2985 created_at_ms: r.get::<_, i64>(5)? as u64,
2986 })
2987}
2988
/// Maps a day index to a three-letter weekday label.
///
/// Index 0 maps to `"Thu"`, which matches 1970-01-01 when the index is a count
/// of whole days since the Unix epoch (callers in this file pass
/// `timestamp_ms / 86_400_000`).
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce modulo 7 before adding the offset so indices near `u64::MAX`
    // cannot overflow (which would panic in debug builds).
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2992
2993fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2994 let week_ago = now.saturating_sub(7 * 86_400_000);
2995 let mut stmt = conn
2996 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2997 let days: Vec<u64> = stmt
2998 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2999 .filter_map(|r| r.ok())
3000 .map(|v| v as u64 / 86_400_000)
3001 .collect();
3002 let today = now / 86_400_000;
3003 Ok((0u64..7)
3004 .map(|i| {
3005 let d = today.saturating_sub(6 - i);
3006 (
3007 day_label(d).to_string(),
3008 days.iter().filter(|&&x| x == d).count() as u64,
3009 )
3010 })
3011 .collect())
3012}
3013
3014fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3015 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3016 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3017 s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3018 s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3019 COUNT(e.id) FROM sessions s \
3020 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3021 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3022 let mut stmt = conn.prepare(sql)?;
3023 let out: Vec<(SessionRecord, u64)> = stmt
3024 .query_map(params![workspace], |r| {
3025 let st: String = r.get(6)?;
3026 Ok((
3027 SessionRecord {
3028 id: r.get(0)?,
3029 agent: r.get(1)?,
3030 model: r.get(2)?,
3031 workspace: r.get(3)?,
3032 started_at_ms: r.get::<_, i64>(4)? as u64,
3033 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3034 status: status_from_str(&st),
3035 trace_path: r.get(7)?,
3036 start_commit: r.get(8)?,
3037 end_commit: r.get(9)?,
3038 branch: r.get(10)?,
3039 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3040 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3041 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3042 prompt_fingerprint: r.get(14)?,
3043 parent_session_id: r.get(15)?,
3044 agent_version: r.get(16)?,
3045 os: r.get(17)?,
3046 arch: r.get(18)?,
3047 repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3048 repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3049 },
3050 r.get::<_, i64>(21)? as u64,
3051 ))
3052 })?
3053 .filter_map(|r| r.ok())
3054 .collect();
3055 Ok(out)
3056}
3057
3058fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3059 let mut stmt = conn.prepare(
3060 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3061 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3062 )?;
3063 let out: Vec<(String, u64)> = stmt
3064 .query_map(params![workspace], |r| {
3065 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3066 })?
3067 .filter_map(|r| r.ok())
3068 .collect();
3069 Ok(out)
3070}
3071
3072fn status_from_str(s: &str) -> SessionStatus {
3073 match s {
3074 "Running" => SessionStatus::Running,
3075 "Waiting" => SessionStatus::Waiting,
3076 "Idle" => SessionStatus::Idle,
3077 _ => SessionStatus::Done,
3078 }
3079}
3080
/// Reports whether the `KAIZEN_PROJECTOR` environment variable is set to
/// exactly `legacy`. An unset, non-Unicode, or differently-valued variable
/// yields `false`.
fn projector_legacy_mode() -> bool {
    matches!(
        std::env::var("KAIZEN_PROJECTOR").as_deref(),
        Ok("legacy")
    )
}
3084
3085fn is_stop_event(e: &Event) -> bool {
3086 if !matches!(e.kind, EventKind::Hook) {
3087 return false;
3088 }
3089 e.payload
3090 .get("event")
3091 .and_then(|v| v.as_str())
3092 .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3093 == Some("Stop")
3094}
3095
3096fn kind_from_str(s: &str) -> EventKind {
3097 match s {
3098 "ToolCall" => EventKind::ToolCall,
3099 "ToolResult" => EventKind::ToolResult,
3100 "Message" => EventKind::Message,
3101 "Error" => EventKind::Error,
3102 "Cost" => EventKind::Cost,
3103 "Hook" => EventKind::Hook,
3104 "Lifecycle" => EventKind::Lifecycle,
3105 _ => EventKind::Hook,
3106 }
3107}
3108
3109fn source_from_str(s: &str) -> EventSource {
3110 match s {
3111 "Tail" => EventSource::Tail,
3112 "Hook" => EventSource::Hook,
3113 _ => EventSource::Proxy,
3114 }
3115}
3116
3117fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3118 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3119 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3120 ensure_column(conn, "sessions", "branch", "TEXT")?;
3121 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3122 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3123 ensure_column(
3124 conn,
3125 "sessions",
3126 "repo_binding_source",
3127 "TEXT NOT NULL DEFAULT ''",
3128 )?;
3129 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3130 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3131 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3132 ensure_column(conn, "events", "stop_reason", "TEXT")?;
3133 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3134 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3135 ensure_column(conn, "events", "retry_count", "INTEGER")?;
3136 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3137 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3138 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3139 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3140 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3141 ensure_column(
3142 conn,
3143 "sync_outbox",
3144 "kind",
3145 "TEXT NOT NULL DEFAULT 'events'",
3146 )?;
3147 ensure_column(
3148 conn,
3149 "experiments",
3150 "state",
3151 "TEXT NOT NULL DEFAULT 'Draft'",
3152 )?;
3153 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3154 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3155 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3156 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3157 ensure_column(conn, "sessions", "os", "TEXT")?;
3158 ensure_column(conn, "sessions", "arch", "TEXT")?;
3159 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3160 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3161 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3162 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3163 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3164 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3165 conn.execute_batch(
3166 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3167 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3168 )?;
3169 Ok(())
3170}
3171
3172fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3173 if has_column(conn, table, column)? {
3174 return Ok(());
3175 }
3176 conn.execute(
3177 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3178 [],
3179 )?;
3180 Ok(())
3181}
3182
3183fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3184 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3185 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3186 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3187}
3188
/// Converts a boolean to its integer encoding: `true` → 1, `false` → 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3192
/// Decodes an integer flag: 0 is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3196
/// Turns an empty string into `None`; any non-empty string is returned as `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3200
3201#[cfg(test)]
3202mod tests {
3203 use super::*;
3204 use serde_json::json;
3205 use std::collections::HashSet;
3206 use tempfile::TempDir;
3207
3208 fn make_session(id: &str) -> SessionRecord {
3209 SessionRecord {
3210 id: id.to_string(),
3211 agent: "cursor".to_string(),
3212 model: None,
3213 workspace: "/ws".to_string(),
3214 started_at_ms: 1000,
3215 ended_at_ms: None,
3216 status: SessionStatus::Done,
3217 trace_path: "/trace".to_string(),
3218 start_commit: None,
3219 end_commit: None,
3220 branch: None,
3221 dirty_start: None,
3222 dirty_end: None,
3223 repo_binding_source: None,
3224 prompt_fingerprint: None,
3225 parent_session_id: None,
3226 agent_version: None,
3227 os: None,
3228 arch: None,
3229 repo_file_count: None,
3230 repo_total_loc: None,
3231 }
3232 }
3233
3234 fn make_event(session_id: &str, seq: u64) -> Event {
3235 Event {
3236 session_id: session_id.to_string(),
3237 seq,
3238 ts_ms: 1000 + seq * 100,
3239 ts_exact: false,
3240 kind: EventKind::ToolCall,
3241 source: EventSource::Tail,
3242 tool: Some("read_file".to_string()),
3243 tool_call_id: Some(format!("call_{seq}")),
3244 tokens_in: None,
3245 tokens_out: None,
3246 reasoning_tokens: None,
3247 cost_usd_e6: None,
3248 stop_reason: None,
3249 latency_ms: None,
3250 ttft_ms: None,
3251 retry_count: None,
3252 context_used_tokens: None,
3253 context_max_tokens: None,
3254 cache_creation_tokens: None,
3255 cache_read_tokens: None,
3256 system_prompt_tokens: None,
3257 payload: json!({}),
3258 }
3259 }
3260
3261 #[test]
3262 fn open_and_wal_mode() {
3263 let dir = TempDir::new().unwrap();
3264 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3265 let mode: String = store
3266 .conn
3267 .query_row("PRAGMA journal_mode", [], |r| r.get(0))
3268 .unwrap();
3269 assert_eq!(mode, "wal");
3270 }
3271
3272 #[test]
3273 fn open_applies_phase0_pragmas() {
3274 let dir = TempDir::new().unwrap();
3275 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3276 let synchronous: i64 = store
3277 .conn
3278 .query_row("PRAGMA synchronous", [], |r| r.get(0))
3279 .unwrap();
3280 let cache_size: i64 = store
3281 .conn
3282 .query_row("PRAGMA cache_size", [], |r| r.get(0))
3283 .unwrap();
3284 let temp_store: i64 = store
3285 .conn
3286 .query_row("PRAGMA temp_store", [], |r| r.get(0))
3287 .unwrap();
3288 let wal_autocheckpoint: i64 = store
3289 .conn
3290 .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
3291 .unwrap();
3292 assert_eq!(synchronous, 1);
3293 assert_eq!(cache_size, -65_536);
3294 assert_eq!(temp_store, 2);
3295 assert_eq!(wal_autocheckpoint, 1_000);
3296 assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
3297 }
3298
3299 #[test]
3300 fn read_only_open_sets_query_only() {
3301 let dir = TempDir::new().unwrap();
3302 let db = dir.path().join("kaizen.db");
3303 Store::open(&db).unwrap();
3304 let store = Store::open_read_only(&db).unwrap();
3305 let query_only: i64 = store
3306 .conn
3307 .query_row("PRAGMA query_only", [], |r| r.get(0))
3308 .unwrap();
3309 assert_eq!(query_only, 1);
3310 }
3311
3312 #[test]
3313 fn phase0_indexes_exist() {
3314 let dir = TempDir::new().unwrap();
3315 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3316 for name in [
3317 "tool_spans_session_idx",
3318 "tool_spans_started_idx",
3319 "session_samples_ts_idx",
3320 "events_ts_idx",
3321 "feedback_session_idx",
3322 ] {
3323 let found: i64 = store
3324 .conn
3325 .query_row(
3326 "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
3327 params![name],
3328 |r| r.get(0),
3329 )
3330 .unwrap();
3331 assert_eq!(found, 1, "{name}");
3332 }
3333 }
3334
3335 #[test]
3336 fn upsert_and_get_session() {
3337 let dir = TempDir::new().unwrap();
3338 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3339 let s = make_session("s1");
3340 store.upsert_session(&s).unwrap();
3341
3342 let got = store.get_session("s1").unwrap().unwrap();
3343 assert_eq!(got.id, "s1");
3344 assert_eq!(got.status, SessionStatus::Done);
3345 }
3346
3347 #[test]
3348 fn append_and_list_events_round_trip() {
3349 let dir = TempDir::new().unwrap();
3350 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3351 let s = make_session("s2");
3352 store.upsert_session(&s).unwrap();
3353 store.append_event(&make_event("s2", 0)).unwrap();
3354 store.append_event(&make_event("s2", 1)).unwrap();
3355
3356 let sessions = store.list_sessions("/ws").unwrap();
3357 assert_eq!(sessions.len(), 1);
3358 assert_eq!(sessions[0].id, "s2");
3359 }
3360
3361 #[test]
3362 fn list_sessions_page_orders_and_counts() {
3363 let dir = TempDir::new().unwrap();
3364 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3365 let mut a = make_session("a");
3366 a.started_at_ms = 2_000;
3367 let mut b = make_session("b");
3368 b.started_at_ms = 2_000;
3369 let mut c = make_session("c");
3370 c.started_at_ms = 1_000;
3371 store.upsert_session(&c).unwrap();
3372 store.upsert_session(&b).unwrap();
3373 store.upsert_session(&a).unwrap();
3374
3375 let page = store
3376 .list_sessions_page("/ws", 0, 2, SessionFilter::default())
3377 .unwrap();
3378 assert_eq!(page.total, 3);
3379 assert_eq!(page.next_offset, Some(2));
3380 assert_eq!(
3381 page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3382 vec!["a", "b"]
3383 );
3384
3385 let all = store.list_sessions("/ws").unwrap();
3386 assert_eq!(
3387 all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
3388 vec!["a", "b", "c"]
3389 );
3390 }
3391
3392 #[test]
3393 fn list_sessions_page_filters_in_sql_shape() {
3394 let dir = TempDir::new().unwrap();
3395 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3396 let mut cursor = make_session("cursor");
3397 cursor.agent = "Cursor".into();
3398 cursor.started_at_ms = 2_000;
3399 cursor.status = SessionStatus::Running;
3400 let mut claude = make_session("claude");
3401 claude.agent = "claude".into();
3402 claude.started_at_ms = 3_000;
3403 store.upsert_session(&cursor).unwrap();
3404 store.upsert_session(&claude).unwrap();
3405
3406 let page = store
3407 .list_sessions_page(
3408 "/ws",
3409 0,
3410 10,
3411 SessionFilter {
3412 agent_prefix: Some("cur".into()),
3413 status: Some(SessionStatus::Running),
3414 since_ms: Some(1_500),
3415 },
3416 )
3417 .unwrap();
3418 assert_eq!(page.total, 1);
3419 assert_eq!(page.rows[0].id, "cursor");
3420 }
3421
3422 #[test]
3423 fn incremental_session_helpers_find_new_rows_and_statuses() {
3424 let dir = TempDir::new().unwrap();
3425 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3426 let mut old = make_session("old");
3427 old.started_at_ms = 1_000;
3428 let mut new = make_session("new");
3429 new.started_at_ms = 2_000;
3430 new.status = SessionStatus::Running;
3431 store.upsert_session(&old).unwrap();
3432 store.upsert_session(&new).unwrap();
3433
3434 let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
3435 assert_eq!(rows.len(), 1);
3436 assert_eq!(rows[0].id, "new");
3437
3438 store
3439 .update_session_status("new", SessionStatus::Done)
3440 .unwrap();
3441 let statuses = store.session_statuses(&["new".to_string()]).unwrap();
3442 assert_eq!(statuses.len(), 1);
3443 assert_eq!(statuses[0].status, SessionStatus::Done);
3444 }
3445
3446 #[test]
3447 fn summary_stats_empty() {
3448 let dir = TempDir::new().unwrap();
3449 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3450 let stats = store.summary_stats("/ws").unwrap();
3451 assert_eq!(stats.session_count, 0);
3452 assert_eq!(stats.total_cost_usd_e6, 0);
3453 }
3454
3455 #[test]
3456 fn summary_stats_counts_sessions() {
3457 let dir = TempDir::new().unwrap();
3458 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3459 store.upsert_session(&make_session("a")).unwrap();
3460 store.upsert_session(&make_session("b")).unwrap();
3461 let stats = store.summary_stats("/ws").unwrap();
3462 assert_eq!(stats.session_count, 2);
3463 assert_eq!(stats.by_agent.len(), 1);
3464 assert_eq!(stats.by_agent[0].0, "cursor");
3465 assert_eq!(stats.by_agent[0].1, 2);
3466 }
3467
3468 #[test]
3469 fn list_events_for_session_round_trip() {
3470 let dir = TempDir::new().unwrap();
3471 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3472 store.upsert_session(&make_session("s4")).unwrap();
3473 store.append_event(&make_event("s4", 0)).unwrap();
3474 store.append_event(&make_event("s4", 1)).unwrap();
3475 let events = store.list_events_for_session("s4").unwrap();
3476 assert_eq!(events.len(), 2);
3477 assert_eq!(events[0].seq, 0);
3478 assert_eq!(events[1].seq, 1);
3479 }
3480
3481 #[test]
3482 fn list_events_page_uses_inclusive_seq_cursor() {
3483 let dir = TempDir::new().unwrap();
3484 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3485 store.upsert_session(&make_session("paged")).unwrap();
3486 for seq in 0..5 {
3487 store.append_event(&make_event("paged", seq)).unwrap();
3488 }
3489 let first = store.list_events_page("paged", 0, 2).unwrap();
3490 assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3491 let second = store
3492 .list_events_page("paged", first[1].seq + 1, 2)
3493 .unwrap();
3494 assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3495 }
3496
3497 #[test]
3498 fn append_event_dedup() {
3499 let dir = TempDir::new().unwrap();
3500 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3501 store.upsert_session(&make_session("s5")).unwrap();
3502 store.append_event(&make_event("s5", 0)).unwrap();
3503 store.append_event(&make_event("s5", 0)).unwrap();
3505 let events = store.list_events_for_session("s5").unwrap();
3506 assert_eq!(events.len(), 1);
3507 }
3508
3509 #[test]
3510 fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3511 let dir = TempDir::new().unwrap();
3512 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3513 assert!(store.session_span_tree("missing").unwrap().is_empty());
3514 assert!(store.span_tree_cache.borrow().is_some());
3515
3516 store.upsert_session(&make_session("tree")).unwrap();
3517 let call = make_event("tree", 0);
3518 store.append_event(&call).unwrap();
3519 assert!(store.span_tree_cache.borrow().is_none());
3520 assert!(store.session_span_tree("tree").unwrap().is_empty());
3521 assert!(store.span_tree_cache.borrow().is_some());
3522 let mut result = make_event("tree", 1);
3523 result.kind = EventKind::ToolResult;
3524 result.tool_call_id = call.tool_call_id.clone();
3525 store.append_event(&result).unwrap();
3526 assert!(store.span_tree_cache.borrow().is_none());
3527 let first = store.session_span_tree("tree").unwrap();
3528 assert_eq!(first.len(), 1);
3529 assert!(store.span_tree_cache.borrow().is_some());
3530 store.append_event(&make_event("tree", 2)).unwrap();
3531 assert!(store.span_tree_cache.borrow().is_none());
3532 }
3533
3534 #[test]
3535 fn tool_spans_in_window_uses_started_then_ended_fallback() {
3536 let dir = TempDir::new().unwrap();
3537 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3538 store.upsert_session(&make_session("spans")).unwrap();
3539 for (id, started, ended) in [
3540 ("started", Some(200_i64), None),
3541 ("fallback", None, Some(250_i64)),
3542 ("outside", Some(400_i64), None),
3543 ("too_old", None, Some(50_i64)),
3544 ("started_wins", Some(500_i64), Some(200_i64)),
3545 ] {
3546 store
3547 .conn
3548 .execute(
3549 "INSERT INTO tool_spans
3550 (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3551 VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3552 params![id, started, ended],
3553 )
3554 .unwrap();
3555 }
3556 let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3557 let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3558 assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3559 }
3560
3561 #[test]
3562 fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
3563 let dir = TempDir::new().unwrap();
3564 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3565 store.upsert_session(&make_session("s1")).unwrap();
3566 for (id, started, ended) in [
3567 ("inside_started", Some(150_i64), None),
3568 ("inside_ended_only", None, Some(220_i64)),
3569 ("after_window", Some(400_i64), None),
3570 ("before_window", None, Some(50_i64)),
3571 ] {
3572 store
3573 .conn
3574 .execute(
3575 "INSERT INTO tool_spans
3576 (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3577 VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
3578 params![id, started, ended],
3579 )
3580 .unwrap();
3581 }
3582 let rows = store
3583 .tool_spans_sync_rows_in_window("/ws", 100, 300)
3584 .unwrap();
3585 let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
3586 assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
3587 assert!(rows.iter().all(|r| r.session_id == "s1"));
3588 }
3589
3590 #[test]
3591 fn upsert_idempotent() {
3592 let dir = TempDir::new().unwrap();
3593 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3594 let mut s = make_session("s3");
3595 store.upsert_session(&s).unwrap();
3596 s.status = SessionStatus::Running;
3597 store.upsert_session(&s).unwrap();
3598
3599 let got = store.get_session("s3").unwrap().unwrap();
3600 assert_eq!(got.status, SessionStatus::Running);
3601 }
3602
3603 #[test]
3604 fn append_event_indexes_path_from_payload() {
3605 let dir = TempDir::new().unwrap();
3606 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3607 store.upsert_session(&make_session("sx")).unwrap();
3608 let mut ev = make_event("sx", 0);
3609 ev.payload = json!({"input": {"path": "src/lib.rs"}});
3610 store.append_event(&ev).unwrap();
3611 let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
3612 assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
3613 }
3614
3615 #[test]
3616 fn update_session_status_changes_status() {
3617 let dir = TempDir::new().unwrap();
3618 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3619 store.upsert_session(&make_session("s6")).unwrap();
3620 store
3621 .update_session_status("s6", SessionStatus::Running)
3622 .unwrap();
3623 let got = store.get_session("s6").unwrap().unwrap();
3624 assert_eq!(got.status, SessionStatus::Running);
3625 }
3626
3627 #[test]
3628 fn prune_sessions_removes_old_rows_and_keeps_recent() {
3629 let dir = TempDir::new().unwrap();
3630 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3631 let mut old = make_session("old");
3632 old.started_at_ms = 1_000;
3633 let mut new = make_session("new");
3634 new.started_at_ms = 9_000_000_000_000;
3635 store.upsert_session(&old).unwrap();
3636 store.upsert_session(&new).unwrap();
3637 store.append_event(&make_event("old", 0)).unwrap();
3638
3639 let stats = store.prune_sessions_started_before(5_000).unwrap();
3640 assert_eq!(
3641 stats,
3642 PruneStats {
3643 sessions_removed: 1,
3644 events_removed: 1,
3645 }
3646 );
3647 assert!(store.get_session("old").unwrap().is_none());
3648 assert!(store.get_session("new").unwrap().is_some());
3649 let sessions = store.list_sessions("/ws").unwrap();
3650 assert_eq!(sessions.len(), 1);
3651 assert_eq!(sessions[0].id, "new");
3652 }
3653
3654 #[test]
3655 fn append_event_indexes_rules_from_payload() {
3656 let dir = TempDir::new().unwrap();
3657 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3658 store.upsert_session(&make_session("sr")).unwrap();
3659 let mut ev = make_event("sr", 0);
3660 ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
3661 store.append_event(&ev).unwrap();
3662 let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
3663 assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
3664 }
3665
3666 #[test]
3667 fn guidance_report_counts_skill_and_rule_sessions() {
3668 let dir = TempDir::new().unwrap();
3669 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3670 store.upsert_session(&make_session("sx")).unwrap();
3671 let mut ev = make_event("sx", 0);
3672 ev.payload =
3673 json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
3674 ev.cost_usd_e6 = Some(500_000);
3675 store.append_event(&ev).unwrap();
3676
3677 let mut skill_slugs = HashSet::new();
3678 skill_slugs.insert("tdd".into());
3679 let mut rule_slugs = HashSet::new();
3680 rule_slugs.insert("style".into());
3681
3682 let rep = store
3683 .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
3684 .unwrap();
3685 assert_eq!(rep.sessions_in_window, 1);
3686 let tdd = rep
3687 .rows
3688 .iter()
3689 .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
3690 .unwrap();
3691 assert_eq!(tdd.sessions, 1);
3692 assert!(tdd.on_disk);
3693 let style = rep
3694 .rows
3695 .iter()
3696 .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
3697 .unwrap();
3698 assert_eq!(style.sessions, 1);
3699 assert!(style.on_disk);
3700 }
3701
3702 #[test]
3703 fn prune_sessions_removes_rules_used_rows() {
3704 let dir = TempDir::new().unwrap();
3705 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3706 let mut old = make_session("old_r");
3707 old.started_at_ms = 1_000;
3708 store.upsert_session(&old).unwrap();
3709 let mut ev = make_event("old_r", 0);
3710 ev.payload = json!({"path": ".cursor/rules/x.mdc"});
3711 store.append_event(&ev).unwrap();
3712
3713 store.prune_sessions_started_before(5_000).unwrap();
3714 let n: i64 = store
3715 .conn
3716 .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
3717 .unwrap();
3718 assert_eq!(n, 0);
3719 }
3720}