1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
9use crate::store::tool_span_index::{
10 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
11};
12use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
13use crate::sync::context::SyncIngestContext;
14use crate::sync::outbound::outbound_event_from_row;
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::enqueue_tool_spans_for_session;
17use anyhow::{Context, Result};
18use rusqlite::types::Value;
19use rusqlite::{
20 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
21};
22use std::cell::RefCell;
23use std::collections::{HashMap, HashSet};
24use std::path::{Path, PathBuf};
25
/// Timestamp ceiling in milliseconds (10^12).
///
/// NOTE(review): the name suggests timestamps below this value are treated as
/// synthetic; the use site is outside this chunk — confirm.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;

/// Default memory-map size, in megabytes.
///
/// NOTE(review): presumably consumed by `apply_pragmas`; confirm at the use site.
const DEFAULT_MMAP_MB: u64 = 256;
/// Column list shared by every query that materialises a `SessionRecord`.
///
/// The statement deliberately ends after `FROM sessions` so callers can append
/// their own `WHERE` / `ORDER BY` / `LIMIT` clauses. The column order is part of
/// the contract with whatever row mapper reads the result.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
34
/// Idempotent schema statements, executed in order each time the store is opened
/// in read-write mode.
///
/// Every entry uses `IF NOT EXISTS` / `INSERT OR IGNORE`, so re-running the whole
/// list against an up-to-date database is a no-op. Some entries contain several
/// `;`-separated statements and therefore must be run with `execute_batch`.
/// Columns that the rest of the module reads or writes but that are not declared
/// here (for example `events.ts_exact` or `sync_outbox.kind`) are NOTE(review):
/// presumably added by `ensure_schema_columns` — confirm.
const MIGRATIONS: &[&str] = &[
    // --- core session / event tables ---
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // --- sync bookkeeping and experiments ---
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // The event upsert relies on this unique (session_id, seq) key.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    // Unique keys that make `INSERT OR IGNORE` into the derived tables idempotent.
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- tool spans ---
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // --- repository binding and snapshots ---
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // --- session listing indexes ---
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // --- remote pull cache; `remote_pull_state` is a single-row table (id = 1) ---
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- evaluation, feedback, outcomes and samples ---
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    // --- secondary lookup indexes ---
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    // NOTE(review): covers the same column as `session_feedback_session` above.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
277
278#[derive(Clone)]
280pub struct InsightsStats {
281 pub total_sessions: u64,
282 pub running_sessions: u64,
283 pub total_events: u64,
284 pub sessions_by_day: Vec<(String, u64)>,
286 pub recent: Vec<(SessionRecord, u64)>,
288 pub top_tools: Vec<(String, u64)>,
290 pub total_cost_usd_e6: i64,
291 pub sessions_with_cost: u64,
292}
293
/// Point-in-time view of the outbound sync state, as assembled by
/// `Store::sync_status`.
///
/// Derives `Debug` and `Clone` so a snapshot can be logged, compared in tests,
/// and handed to more than one consumer.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Number of outbox rows still waiting to be sent.
    pub pending_outbox: u64,
    /// Value stored under the `last_success_ms` sync-state key, if present and numeric.
    pub last_success_ms: Option<u64>,
    /// Message stored under the `last_error` sync-state key, if any.
    pub last_error: Option<String>,
    /// Value stored under the `consecutive_failures` sync-state key (0 when absent).
    pub consecutive_failures: u32,
}
301
302#[derive(serde::Serialize)]
304pub struct SummaryStats {
305 pub session_count: u64,
306 pub total_cost_usd_e6: i64,
307 pub by_agent: Vec<(String, u64)>,
308 pub by_model: Vec<(String, u64)>,
309 pub top_tools: Vec<(String, u64)>,
310}
311
312#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
314#[serde(rename_all = "lowercase")]
315pub enum GuidanceKind {
316 Skill,
317 Rule,
318}
319
320#[derive(Clone, Debug, serde::Serialize)]
322pub struct GuidancePerfRow {
323 pub kind: GuidanceKind,
324 pub id: String,
325 pub sessions: u64,
326 pub sessions_pct: f64,
327 pub total_cost_usd_e6: i64,
328 pub avg_cost_per_session_usd: Option<f64>,
329 pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
330 pub on_disk: bool,
331}
332
333#[derive(Clone, Debug, serde::Serialize)]
335pub struct GuidanceReport {
336 pub workspace: String,
337 pub window_start_ms: u64,
338 pub window_end_ms: u64,
339 pub sessions_in_window: u64,
340 pub workspace_avg_cost_per_session_usd: Option<f64>,
341 pub rows: Vec<GuidancePerfRow>,
342}
343
/// Counters returned by a prune operation. `Default` yields all zeroes.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PruneStats {
    /// Number of session rows removed.
    pub sessions_removed: u64,
    /// Number of event rows removed.
    pub events_removed: u64,
}
350
/// One row of the `session_outcomes` table: post-hoc measurements for a session.
///
/// Every measurement is optional, so a row can record a partial result.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SessionOutcomeRow {
    /// Session the measurements belong to (primary key of the table).
    pub session_id: String,
    /// Count of passing tests, if measured.
    pub test_passed: Option<i64>,
    /// Count of failing tests, if measured.
    pub test_failed: Option<i64>,
    /// Count of skipped tests, if measured.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded, if measured.
    pub build_ok: Option<bool>,
    /// Number of lint errors, if measured.
    pub lint_errors: Option<i64>,
    /// Lines reverted; the `_14d` suffix suggests a 14-day window.
    pub revert_lines_14d: Option<i64>,
    /// Pull-request-open indicator, if measured.
    /// NOTE(review): stored as an integer; exact meaning not visible here.
    pub pr_open: Option<i64>,
    /// Whether CI succeeded, if measured.
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, in epoch milliseconds.
    pub measured_at_ms: u64,
    /// Error text recorded when measuring failed.
    pub measure_error: Option<String>,
}
366
/// Per-session aggregate over resource samples.
#[derive(Clone, Debug)]
pub struct SessionSampleAgg {
    /// Session the aggregate belongs to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage seen across the samples.
    pub max_cpu_percent: f64,
    /// Highest resident-set size seen across the samples, in bytes.
    pub max_rss_bytes: u64,
}
375
/// `sync_state` key for the last agent scan timestamp (milliseconds, per the name).
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";

/// `sync_state` key for the last automatic prune timestamp (milliseconds, per the name).
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";

/// `sync_state` key stamped with the current time when appending an event to the
/// search index fails, marking the index as needing attention.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
380
/// Flattened tool-span record in the shape used for outbound sync.
///
/// The fields mirror the columns of the `tool_spans` table, with the span's
/// paths expanded into a vector. Derives `Debug` and `Clone` so rows can be
/// logged and duplicated by callers.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    /// Unique span identifier.
    pub span_id: String,
    /// Session that owns the span.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Tool call identifier, when known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Span start, in epoch milliseconds.
    pub started_at_ms: Option<u64>,
    /// Span end, in epoch milliseconds.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds.
    pub lead_time_ms: Option<u64>,
    /// Input tokens attributed to the span.
    pub tokens_in: Option<u32>,
    /// Output tokens attributed to the span.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens attributed to the span.
    pub reasoning_tokens: Option<u32>,
    /// Cost attributed to the span; `_e6` suggests USD scaled by 10^6.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
396
/// How a `Store` connection should be opened.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreOpenMode {
    /// Full access: migrations run and the projector is warmed on open.
    ReadWrite,
    /// Query-only handle: migrations and projector warm-up are skipped on open.
    ReadOnlyQuery,
}
402
403#[derive(Debug, Clone)]
404pub struct SessionStatusRow {
405 pub id: String,
406 pub status: SessionStatus,
407 pub ended_at_ms: Option<u64>,
408}
409
410#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
411pub struct SessionFilter {
412 pub agent_prefix: Option<String>,
413 pub status: Option<SessionStatus>,
414 pub since_ms: Option<u64>,
415}
416
417#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
418pub struct SessionPage {
419 pub rows: Vec<SessionRecord>,
420 pub total: usize,
421 pub next_offset: Option<usize>,
422}
423
424#[derive(Clone)]
425struct SpanTreeCacheEntry {
426 session_id: String,
427 last_event_seq: Option<u64>,
428 nodes: Vec<crate::store::span_tree::SpanNode>,
429}
430
431pub struct Store {
432 conn: Connection,
433 root: PathBuf,
434 hot_log: RefCell<Option<HotLog>>,
435 search_writer: RefCell<Option<crate::search::PendingWriter>>,
436 span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
437 projector: RefCell<Projector>,
438}
439
440impl Store {
441 pub(crate) fn conn(&self) -> &Connection {
442 &self.conn
443 }
444
445 pub fn open(path: &Path) -> Result<Self> {
446 Self::open_with_mode(path, StoreOpenMode::ReadWrite)
447 }
448
449 pub fn open_read_only(path: &Path) -> Result<Self> {
450 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
451 }
452
453 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
454 if let Some(parent) = path.parent() {
455 std::fs::create_dir_all(parent)?;
456 }
457 let conn = match mode {
458 StoreOpenMode::ReadWrite => Connection::open(path),
459 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
460 path,
461 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
462 ),
463 }
464 .with_context(|| format!("open db: {}", path.display()))?;
465 apply_pragmas(&conn, mode)?;
466 if mode == StoreOpenMode::ReadWrite {
467 for sql in MIGRATIONS {
468 conn.execute_batch(sql)?;
469 }
470 ensure_schema_columns(&conn)?;
471 }
472 let store = Self {
473 conn,
474 root: path
475 .parent()
476 .unwrap_or_else(|| Path::new("."))
477 .to_path_buf(),
478 hot_log: RefCell::new(None),
479 search_writer: RefCell::new(None),
480 span_tree_cache: RefCell::new(None),
481 projector: RefCell::new(Projector::default()),
482 };
483 if mode == StoreOpenMode::ReadWrite {
484 store.warm_projector()?;
485 }
486 Ok(store)
487 }
488
489 fn invalidate_span_tree_cache(&self) {
490 *self.span_tree_cache.borrow_mut() = None;
491 }
492
493 fn warm_projector(&self) -> Result<()> {
494 let ids = self.running_session_ids()?;
495 let mut projector = self.projector.borrow_mut();
496 for id in ids {
497 for event in self.list_events_for_session(&id)? {
498 let _ = projector.apply(&event);
499 }
500 }
501 Ok(())
502 }
503
504 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
505 self.conn.execute(
506 "INSERT INTO sessions (
507 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
508 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
509 prompt_fingerprint, parent_session_id, agent_version, os, arch,
510 repo_file_count, repo_total_loc
511 )
512 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
513 ?16, ?17, ?18, ?19, ?20, ?21)
514 ON CONFLICT(id) DO UPDATE SET
515 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
516 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
517 status=excluded.status, trace_path=excluded.trace_path,
518 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
519 branch=excluded.branch, dirty_start=excluded.dirty_start,
520 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
521 prompt_fingerprint=excluded.prompt_fingerprint,
522 parent_session_id=excluded.parent_session_id,
523 agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
524 repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
525 params![
526 s.id,
527 s.agent,
528 s.model,
529 s.workspace,
530 s.started_at_ms as i64,
531 s.ended_at_ms.map(|v| v as i64),
532 format!("{:?}", s.status),
533 s.trace_path,
534 s.start_commit,
535 s.end_commit,
536 s.branch,
537 s.dirty_start.map(bool_to_i64),
538 s.dirty_end.map(bool_to_i64),
539 s.repo_binding_source.clone().unwrap_or_default(),
540 s.prompt_fingerprint.as_deref(),
541 s.parent_session_id.as_deref(),
542 s.agent_version.as_deref(),
543 s.os.as_deref(),
544 s.arch.as_deref(),
545 s.repo_file_count.map(|v| v as i64),
546 s.repo_total_loc.map(|v| v as i64),
547 ],
548 )?;
549 self.conn.execute(
550 "INSERT INTO session_repo_binding (
551 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
552 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
553 ON CONFLICT(session_id) DO UPDATE SET
554 start_commit=excluded.start_commit,
555 end_commit=excluded.end_commit,
556 branch=excluded.branch,
557 dirty_start=excluded.dirty_start,
558 dirty_end=excluded.dirty_end,
559 repo_binding_source=excluded.repo_binding_source",
560 params![
561 s.id,
562 s.start_commit,
563 s.end_commit,
564 s.branch,
565 s.dirty_start.map(bool_to_i64),
566 s.dirty_end.map(bool_to_i64),
567 s.repo_binding_source.clone().unwrap_or_default(),
568 ],
569 )?;
570 Ok(())
571 }
572
573 pub fn ensure_session_stub(
576 &self,
577 id: &str,
578 agent: &str,
579 workspace: &str,
580 started_at_ms: u64,
581 ) -> Result<()> {
582 self.conn.execute(
583 "INSERT OR IGNORE INTO sessions (
584 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
585 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
586 prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
587 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
588 NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
589 params![id, agent, workspace, started_at_ms as i64],
590 )?;
591 Ok(())
592 }
593
594 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
596 let n: i64 = self.conn.query_row(
597 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
598 [session_id],
599 |r| r.get(0),
600 )?;
601 Ok(n as u64)
602 }
603
604 pub fn append_event(&self, e: &Event) -> Result<()> {
605 self.append_event_with_sync(e, None)
606 }
607
608 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
610 let last_before = if projector_legacy_mode() {
611 None
612 } else {
613 self.last_event_seq_for_session(&e.session_id)?
614 };
615 let payload = serde_json::to_string(&e.payload)?;
616 self.conn.execute(
617 "INSERT INTO events (
618 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
619 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
620 stop_reason, latency_ms, ttft_ms, retry_count,
621 context_used_tokens, context_max_tokens,
622 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
623 ) VALUES (
624 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
625 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
626 )
627 ON CONFLICT(session_id, seq) DO UPDATE SET
628 ts_ms = excluded.ts_ms,
629 ts_exact = excluded.ts_exact,
630 kind = excluded.kind,
631 source = excluded.source,
632 tool = excluded.tool,
633 tool_call_id = excluded.tool_call_id,
634 tokens_in = excluded.tokens_in,
635 tokens_out = excluded.tokens_out,
636 reasoning_tokens = excluded.reasoning_tokens,
637 cost_usd_e6 = excluded.cost_usd_e6,
638 payload = excluded.payload,
639 stop_reason = excluded.stop_reason,
640 latency_ms = excluded.latency_ms,
641 ttft_ms = excluded.ttft_ms,
642 retry_count = excluded.retry_count,
643 context_used_tokens = excluded.context_used_tokens,
644 context_max_tokens = excluded.context_max_tokens,
645 cache_creation_tokens = excluded.cache_creation_tokens,
646 cache_read_tokens = excluded.cache_read_tokens,
647 system_prompt_tokens = excluded.system_prompt_tokens",
648 params![
649 e.session_id,
650 e.seq as i64,
651 e.ts_ms as i64,
652 bool_to_i64(e.ts_exact),
653 format!("{:?}", e.kind),
654 format!("{:?}", e.source),
655 e.tool,
656 e.tool_call_id,
657 e.tokens_in.map(|v| v as i64),
658 e.tokens_out.map(|v| v as i64),
659 e.reasoning_tokens.map(|v| v as i64),
660 e.cost_usd_e6,
661 payload,
662 e.stop_reason,
663 e.latency_ms.map(|v| v as i64),
664 e.ttft_ms.map(|v| v as i64),
665 e.retry_count.map(|v| v as i64),
666 e.context_used_tokens.map(|v| v as i64),
667 e.context_max_tokens.map(|v| v as i64),
668 e.cache_creation_tokens.map(|v| v as i64),
669 e.cache_read_tokens.map(|v| v as i64),
670 e.system_prompt_tokens.map(|v| v as i64),
671 ],
672 )?;
673 if self.conn.changes() == 0 {
674 return Ok(());
675 }
676 self.append_hot_event(e)?;
677 if projector_legacy_mode() {
678 index_event_derived(&self.conn, e)?;
679 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
680 self.invalidate_span_tree_cache();
681 } else if last_before.is_some_and(|last| e.seq <= last) {
682 self.replay_projector_session(&e.session_id)?;
683 } else {
684 let deltas = self.projector.borrow_mut().apply(e);
685 self.apply_projector_events(&deltas)?;
686 let expired = self
687 .projector
688 .borrow_mut()
689 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
690 self.apply_projector_events(&expired)?;
691 if is_stop_event(e) {
692 let flushed = self
693 .projector
694 .borrow_mut()
695 .flush_session(&e.session_id, e.ts_ms);
696 self.apply_projector_events(&flushed)?;
697 }
698 self.invalidate_span_tree_cache();
699 }
700 self.append_search_event(e);
701 let Some(ctx) = ctx else {
702 return Ok(());
703 };
704 let sync = &ctx.sync;
705 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
706 return Ok(());
707 }
708 let Some(salt) = try_team_salt(sync) else {
709 tracing::warn!(
710 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
711 );
712 return Ok(());
713 };
714 if sync.sample_rate < 1.0 {
715 let u: f64 = rand::random();
716 if u > sync.sample_rate {
717 return Ok(());
718 }
719 }
720 let Some(session) = self.get_session(&e.session_id)? else {
721 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
722 return Ok(());
723 };
724 let mut outbound = outbound_event_from_row(e, &session, &salt);
725 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
726 let row = serde_json::to_string(&outbound)?;
727 self.outbox()?.append(&e.session_id, "events", &row)?;
728 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
729 Ok(())
730 }
731
732 fn append_hot_event(&self, e: &Event) -> Result<()> {
733 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
734 return Ok(());
735 }
736 let mut slot = self.hot_log.borrow_mut();
737 if slot.is_none() {
738 *slot = Some(HotLog::open(&self.root)?);
739 }
740 if let Some(log) = slot.as_mut() {
741 log.append(e)?;
742 }
743 Ok(())
744 }
745
746 fn append_search_event(&self, e: &Event) {
747 if let Err(err) = self.try_append_search_event(e) {
748 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
749 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
750 }
751 }
752
753 fn try_append_search_event(&self, e: &Event) -> Result<()> {
754 let Some(session) = self.get_session(&e.session_id)? else {
755 return Ok(());
756 };
757 let workspace = PathBuf::from(&session.workspace);
758 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
759 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
760 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
761 return Ok(());
762 };
763 let mut slot = self.search_writer.borrow_mut();
764 if slot.is_none() {
765 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
766 }
767 slot.as_mut().expect("writer").add(&doc)
768 }
769
770 pub fn flush_search(&self) -> Result<()> {
771 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
772 writer.commit()?;
773 }
774 Ok(())
775 }
776
777 fn outbox(&self) -> Result<Outbox> {
778 Outbox::open(&self.root)
779 }
780
781 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
782 if projector_legacy_mode() {
783 rebuild_tool_spans_for_session(&self.conn, session_id)?;
784 self.invalidate_span_tree_cache();
785 return Ok(());
786 }
787 let deltas = self
788 .projector
789 .borrow_mut()
790 .flush_session(session_id, now_ms);
791 if self.apply_projector_events(&deltas)? {
792 self.invalidate_span_tree_cache();
793 }
794 Ok(())
795 }
796
797 fn replay_projector_session(&self, session_id: &str) -> Result<()> {
798 clear_session_spans(&self.conn, session_id)?;
799 self.projector.borrow_mut().reset_session(session_id);
800 let events = self.list_events_for_session(session_id)?;
801 let mut changed = false;
802 for event in &events {
803 let deltas = self.projector.borrow_mut().apply(event);
804 changed |= self.apply_projector_events(&deltas)?;
805 }
806 if self
807 .get_session(session_id)?
808 .is_some_and(|session| session.status == SessionStatus::Done)
809 {
810 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
811 let deltas = self
812 .projector
813 .borrow_mut()
814 .flush_session(session_id, now_ms);
815 changed |= self.apply_projector_events(&deltas)?;
816 }
817 if changed {
818 self.invalidate_span_tree_cache();
819 }
820 Ok(())
821 }
822
823 fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
824 let mut changed = false;
825 for delta in deltas {
826 match delta {
827 ProjectorEvent::SpanClosed(span, sample) => {
828 upsert_tool_span_record(&self.conn, span)?;
829 tracing::debug!(
830 session_id = %sample.session_id,
831 span_id = %sample.span_id,
832 tool = ?sample.tool,
833 lead_time_ms = ?sample.lead_time_ms,
834 tokens_in = ?sample.tokens_in,
835 tokens_out = ?sample.tokens_out,
836 reasoning_tokens = ?sample.reasoning_tokens,
837 cost_usd_e6 = ?sample.cost_usd_e6,
838 paths = ?sample.paths,
839 "tool span closed"
840 );
841 changed = true;
842 }
843 ProjectorEvent::SpanPatched(span) => {
844 upsert_tool_span_record(&self.conn, span)?;
845 changed = true;
846 }
847 ProjectorEvent::FileTouched { session, path } => {
848 self.conn.execute(
849 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
850 params![session, path],
851 )?;
852 changed = true;
853 }
854 ProjectorEvent::SkillUsed { session, skill } => {
855 self.conn.execute(
856 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
857 params![session, skill],
858 )?;
859 changed = true;
860 }
861 ProjectorEvent::RuleUsed { session, rule } => {
862 self.conn.execute(
863 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
864 params![session, rule],
865 )?;
866 changed = true;
867 }
868 }
869 }
870 Ok(changed)
871 }
872
873 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
874 let rows = self.outbox()?.list_pending(limit)?;
875 if !rows.is_empty() {
876 return Ok(rows);
877 }
878 let mut stmt = self.conn.prepare(
879 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
880 )?;
881 let rows = stmt.query_map(params![limit as i64], |row| {
882 Ok((
883 row.get::<_, i64>(0)?,
884 row.get::<_, String>(1)?,
885 row.get::<_, String>(2)?,
886 ))
887 })?;
888 let mut out = Vec::new();
889 for r in rows {
890 out.push(r?);
891 }
892 Ok(out)
893 }
894
895 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
896 self.outbox()?.delete_ids(ids)?;
897 for id in ids {
898 self.conn
899 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
900 }
901 Ok(())
902 }
903
904 pub fn replace_outbox_rows(
905 &self,
906 owner_id: &str,
907 kind: &str,
908 payloads: &[String],
909 ) -> Result<()> {
910 self.outbox()?.replace(owner_id, kind, payloads)?;
911 self.conn.execute(
912 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
913 params![owner_id, kind],
914 )?;
915 for payload in payloads {
916 self.conn.execute(
917 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
918 params![owner_id, kind, payload],
919 )?;
920 }
921 Ok(())
922 }
923
924 pub fn outbox_pending_count(&self) -> Result<u64> {
925 let redb = self.outbox()?.pending_count()?;
926 if redb > 0 {
927 return Ok(redb);
928 }
929 let c: i64 =
930 self.conn
931 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
932 r.get(0)
933 })?;
934 Ok(c as u64)
935 }
936
937 pub fn set_sync_state_ok(&self) -> Result<()> {
938 let now = now_ms().to_string();
939 self.conn.execute(
940 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
941 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
942 params![now],
943 )?;
944 self.conn.execute(
945 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
946 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
947 [],
948 )?;
949 self.conn
950 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
951 Ok(())
952 }
953
954 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
955 let prev: i64 = self
956 .conn
957 .query_row(
958 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
959 [],
960 |r| {
961 let s: String = r.get(0)?;
962 Ok(s.parse::<i64>().unwrap_or(0))
963 },
964 )
965 .optional()?
966 .unwrap_or(0);
967 let next = prev.saturating_add(1);
968 self.conn.execute(
969 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
970 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
971 params![msg],
972 )?;
973 self.conn.execute(
974 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
975 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
976 params![next.to_string()],
977 )?;
978 Ok(())
979 }
980
981 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
982 let pending_outbox = self.outbox_pending_count()?;
983 let last_success_ms = self
984 .conn
985 .query_row(
986 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
987 [],
988 |r| r.get::<_, String>(0),
989 )
990 .optional()?
991 .and_then(|s| s.parse().ok());
992 let last_error = self
993 .conn
994 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
995 r.get::<_, String>(0)
996 })
997 .optional()?;
998 let consecutive_failures = self
999 .conn
1000 .query_row(
1001 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1002 [],
1003 |r| r.get::<_, String>(0),
1004 )
1005 .optional()?
1006 .and_then(|s| s.parse().ok())
1007 .unwrap_or(0);
1008 Ok(SyncStatusSnapshot {
1009 pending_outbox,
1010 last_success_ms,
1011 last_error,
1012 consecutive_failures,
1013 })
1014 }
1015
1016 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1017 let row: Option<String> = self
1018 .conn
1019 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1020 r.get::<_, String>(0)
1021 })
1022 .optional()?;
1023 Ok(row.and_then(|s| s.parse().ok()))
1024 }
1025
1026 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1027 self.conn.execute(
1028 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1029 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1030 params![key, v.to_string()],
1031 )?;
1032 Ok(())
1033 }
1034
1035 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1037 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1038 let old_ids = old_session_ids(&tx, cutoff_ms)?;
1039 let sessions_to_remove: i64 = tx.query_row(
1040 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1041 params![cutoff_ms],
1042 |r| r.get(0),
1043 )?;
1044 let events_to_remove: i64 = tx.query_row(
1045 "SELECT COUNT(*) FROM events WHERE session_id IN \
1046 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1047 params![cutoff_ms],
1048 |r| r.get(0),
1049 )?;
1050
1051 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1052 tx.execute(
1053 &format!(
1054 "DELETE FROM tool_span_paths WHERE span_id IN \
1055 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1056 ),
1057 params![cutoff_ms],
1058 )?;
1059 tx.execute(
1060 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1061 params![cutoff_ms],
1062 )?;
1063 tx.execute(
1064 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1065 params![cutoff_ms],
1066 )?;
1067 tx.execute(
1068 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1069 params![cutoff_ms],
1070 )?;
1071 tx.execute(
1072 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1073 params![cutoff_ms],
1074 )?;
1075 tx.execute(
1076 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1077 params![cutoff_ms],
1078 )?;
1079 tx.execute(
1080 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1081 params![cutoff_ms],
1082 )?;
1083 tx.execute(
1084 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1085 params![cutoff_ms],
1086 )?;
1087 tx.execute(
1088 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1089 params![cutoff_ms],
1090 )?;
1091 tx.execute(
1092 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1093 params![cutoff_ms],
1094 )?;
1095 tx.execute(
1096 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1097 params![cutoff_ms],
1098 )?;
1099 tx.execute(
1100 "DELETE FROM sessions WHERE started_at_ms < ?1",
1101 params![cutoff_ms],
1102 )?;
1103 tx.commit()?;
1104 if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1105 let _ = writer.commit();
1106 }
1107 if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1108 tracing::warn!("search prune skipped: {err:#}");
1109 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1110 }
1111 self.invalidate_span_tree_cache();
1112 Ok(PruneStats {
1113 sessions_removed: sessions_to_remove as u64,
1114 events_removed: events_to_remove as u64,
1115 })
1116 }
1117
1118 pub fn vacuum(&self) -> Result<()> {
1120 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1121 Ok(())
1122 }
1123
1124 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1125 Ok(self
1126 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1127 .rows)
1128 }
1129
1130 pub fn list_sessions_page(
1131 &self,
1132 workspace: &str,
1133 offset: usize,
1134 limit: usize,
1135 filter: SessionFilter,
1136 ) -> Result<SessionPage> {
1137 let (where_sql, args) = session_filter_sql(workspace, &filter);
1138 let total = self.query_session_page_count(&where_sql, &args)?;
1139 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1140 let next = offset.saturating_add(rows.len());
1141 Ok(SessionPage {
1142 rows,
1143 total,
1144 next_offset: (next < total).then_some(next),
1145 })
1146 }
1147
1148 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1149 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1150 let total: i64 = self
1151 .conn
1152 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1153 Ok(total as usize)
1154 }
1155
1156 fn query_session_page_rows(
1157 &self,
1158 where_sql: &str,
1159 args: &[Value],
1160 offset: usize,
1161 limit: usize,
1162 ) -> Result<Vec<SessionRecord>> {
1163 let sql = format!(
1164 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1165 );
1166 let mut values = args.to_vec();
1167 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1168 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1169 let mut stmt = self.conn.prepare(&sql)?;
1170 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1171 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1172 }
1173
1174 pub fn list_sessions_started_after(
1175 &self,
1176 workspace: &str,
1177 after_started_at_ms: u64,
1178 ) -> Result<Vec<SessionRecord>> {
1179 let mut stmt = self.conn.prepare(
1180 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1181 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1182 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1183 repo_file_count, repo_total_loc
1184 FROM sessions
1185 WHERE workspace = ?1 AND started_at_ms > ?2
1186 ORDER BY started_at_ms DESC, id ASC",
1187 )?;
1188 let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1189 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1190 }
1191
1192 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1193 if ids.is_empty() {
1194 return Ok(Vec::new());
1195 }
1196 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1197 let sql =
1198 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1199 let mut stmt = self.conn.prepare(&sql)?;
1200 let params: Vec<&dyn rusqlite::ToSql> =
1201 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1202 let rows = stmt.query_map(params.as_slice(), |r| {
1203 let status: String = r.get(1)?;
1204 Ok(SessionStatusRow {
1205 id: r.get(0)?,
1206 status: status_from_str(&status),
1207 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1208 })
1209 })?;
1210 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1211 }
1212
1213 fn running_session_ids(&self) -> Result<Vec<String>> {
1214 let mut stmt = self
1215 .conn
1216 .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1217 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1218 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1219 }
1220
1221 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1222 let session_count: i64 = self.conn.query_row(
1223 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1224 params![workspace],
1225 |r| r.get(0),
1226 )?;
1227
1228 let total_cost: i64 = self.conn.query_row(
1229 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1230 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1231 params![workspace],
1232 |r| r.get(0),
1233 )?;
1234
1235 let mut stmt = self.conn.prepare(
1236 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1237 )?;
1238 let by_agent: Vec<(String, u64)> = stmt
1239 .query_map(params![workspace], |r| {
1240 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1241 })?
1242 .filter_map(|r| r.ok())
1243 .collect();
1244
1245 let mut stmt = self.conn.prepare(
1246 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1247 )?;
1248 let by_model: Vec<(String, u64)> = stmt
1249 .query_map(params![workspace], |r| {
1250 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1251 })?
1252 .filter_map(|r| r.ok())
1253 .collect();
1254
1255 let mut stmt = self.conn.prepare(
1256 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1257 WHERE s.workspace = ?1 AND tool IS NOT NULL
1258 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1259 )?;
1260 let top_tools: Vec<(String, u64)> = stmt
1261 .query_map(params![workspace], |r| {
1262 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1263 })?
1264 .filter_map(|r| r.ok())
1265 .collect();
1266
1267 Ok(SummaryStats {
1268 session_count: session_count as u64,
1269 total_cost_usd_e6: total_cost,
1270 by_agent,
1271 by_model,
1272 top_tools,
1273 })
1274 }
1275
1276 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1277 self.list_events_page(session_id, 0, i64::MAX as usize)
1278 }
1279
1280 pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1281 let mut stmt = self.conn.prepare(
1282 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1283 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1284 stop_reason, latency_ms, ttft_ms, retry_count,
1285 context_used_tokens, context_max_tokens,
1286 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1287 FROM events WHERE session_id = ?1 AND seq = ?2",
1288 )?;
1289 stmt.query_row(params![session_id, seq as i64], event_row)
1290 .optional()
1291 .map_err(Into::into)
1292 }
1293
1294 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1295 let mut out = Vec::new();
1296 for session in self.list_sessions(workspace)? {
1297 for event in self.list_events_for_session(&session.id)? {
1298 out.push((session.clone(), event));
1299 }
1300 }
1301 out.sort_by(|a, b| {
1302 (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1303 });
1304 Ok(out)
1305 }
1306
1307 pub fn list_events_page(
1308 &self,
1309 session_id: &str,
1310 after_seq: u64,
1311 limit: usize,
1312 ) -> Result<Vec<Event>> {
1313 let mut stmt = self.conn.prepare(
1314 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1315 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1316 stop_reason, latency_ms, ttft_ms, retry_count,
1317 context_used_tokens, context_max_tokens,
1318 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1319 FROM events
1320 WHERE session_id = ?1 AND seq >= ?2
1321 ORDER BY seq ASC LIMIT ?3",
1322 )?;
1323 let rows = stmt.query_map(
1324 params![
1325 session_id,
1326 after_seq as i64,
1327 limit.min(i64::MAX as usize) as i64
1328 ],
1329 event_row,
1330 )?;
1331 let mut events = Vec::new();
1332 for row in rows {
1333 events.push(row?);
1334 }
1335 Ok(events)
1336 }
1337
1338 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1340 self.conn.execute(
1341 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1342 params![format!("{:?}", status), id],
1343 )?;
1344 Ok(())
1345 }
1346
1347 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1349 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1350 Ok(InsightsStats {
1351 total_sessions: count_q(
1352 &self.conn,
1353 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1354 workspace,
1355 )?,
1356 running_sessions: count_q(
1357 &self.conn,
1358 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1359 workspace,
1360 )?,
1361 total_events: count_q(
1362 &self.conn,
1363 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1364 workspace,
1365 )?,
1366 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1367 recent: recent_sessions_3(&self.conn, workspace)?,
1368 top_tools: top_tools_5(&self.conn, workspace)?,
1369 total_cost_usd_e6,
1370 sessions_with_cost,
1371 })
1372 }
1373
1374 pub fn retro_events_in_window(
1376 &self,
1377 workspace: &str,
1378 start_ms: u64,
1379 end_ms: u64,
1380 ) -> Result<Vec<(SessionRecord, Event)>> {
1381 let mut stmt = self.conn.prepare(
1382 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1383 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1384 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1385 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1386 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1387 s.repo_file_count, s.repo_total_loc,
1388 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1389 e.context_used_tokens, e.context_max_tokens,
1390 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1391 FROM events e
1392 JOIN sessions s ON s.id = e.session_id
1393 WHERE s.workspace = ?1
1394 AND (
1395 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1396 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1397 )
1398 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1399 )?;
1400 let rows = stmt.query_map(
1401 params![
1402 workspace,
1403 start_ms as i64,
1404 end_ms as i64,
1405 SYNTHETIC_TS_CEILING_MS,
1406 ],
1407 |row| {
1408 let payload_str: String = row.get(12)?;
1409 let status_str: String = row.get(19)?;
1410 Ok((
1411 SessionRecord {
1412 id: row.get(13)?,
1413 agent: row.get(14)?,
1414 model: row.get(15)?,
1415 workspace: row.get(16)?,
1416 started_at_ms: row.get::<_, i64>(17)? as u64,
1417 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1418 status: status_from_str(&status_str),
1419 trace_path: row.get(20)?,
1420 start_commit: row.get(21)?,
1421 end_commit: row.get(22)?,
1422 branch: row.get(23)?,
1423 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1424 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1425 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1426 prompt_fingerprint: row.get(27)?,
1427 parent_session_id: row.get(28)?,
1428 agent_version: row.get(29)?,
1429 os: row.get(30)?,
1430 arch: row.get(31)?,
1431 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1432 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1433 },
1434 Event {
1435 session_id: row.get(0)?,
1436 seq: row.get::<_, i64>(1)? as u64,
1437 ts_ms: row.get::<_, i64>(2)? as u64,
1438 ts_exact: row.get::<_, i64>(3)? != 0,
1439 kind: kind_from_str(&row.get::<_, String>(4)?),
1440 source: source_from_str(&row.get::<_, String>(5)?),
1441 tool: row.get(6)?,
1442 tool_call_id: row.get(7)?,
1443 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1444 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1445 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1446 cost_usd_e6: row.get(11)?,
1447 payload: serde_json::from_str(&payload_str)
1448 .unwrap_or(serde_json::Value::Null),
1449 stop_reason: row.get(34)?,
1450 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1451 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1452 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1453 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1454 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1455 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1456 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1457 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1458 },
1459 ))
1460 },
1461 )?;
1462
1463 let mut out = Vec::new();
1464 for r in rows {
1465 out.push(r?);
1466 }
1467 Ok(out)
1468 }
1469
1470 pub fn files_touched_in_window(
1472 &self,
1473 workspace: &str,
1474 start_ms: u64,
1475 end_ms: u64,
1476 ) -> Result<Vec<(String, String)>> {
1477 let mut stmt = self.conn.prepare(
1478 "SELECT DISTINCT ft.session_id, ft.path
1479 FROM files_touched ft
1480 JOIN sessions s ON s.id = ft.session_id
1481 WHERE s.workspace = ?1
1482 AND EXISTS (
1483 SELECT 1 FROM events e
1484 JOIN sessions ss ON ss.id = e.session_id
1485 WHERE e.session_id = ft.session_id
1486 AND (
1487 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1488 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1489 )
1490 )
1491 ORDER BY ft.session_id, ft.path",
1492 )?;
1493 let out: Vec<(String, String)> = stmt
1494 .query_map(
1495 params![
1496 workspace,
1497 start_ms as i64,
1498 end_ms as i64,
1499 SYNTHETIC_TS_CEILING_MS,
1500 ],
1501 |r| Ok((r.get(0)?, r.get(1)?)),
1502 )?
1503 .filter_map(|r| r.ok())
1504 .collect();
1505 Ok(out)
1506 }
1507
1508 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1511 let mut stmt = self.conn.prepare(
1512 "SELECT DISTINCT su.skill
1513 FROM skills_used su
1514 JOIN sessions s ON s.id = su.session_id
1515 WHERE s.workspace = ?1
1516 AND EXISTS (
1517 SELECT 1 FROM events e
1518 JOIN sessions ss ON ss.id = e.session_id
1519 WHERE e.session_id = su.session_id
1520 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1521 )
1522 ORDER BY su.skill",
1523 )?;
1524 let out: Vec<String> = stmt
1525 .query_map(
1526 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1527 |r| r.get::<_, String>(0),
1528 )?
1529 .filter_map(|r| r.ok())
1530 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1531 .collect();
1532 Ok(out)
1533 }
1534
1535 pub fn skills_used_in_window(
1537 &self,
1538 workspace: &str,
1539 start_ms: u64,
1540 end_ms: u64,
1541 ) -> Result<Vec<(String, String)>> {
1542 let mut stmt = self.conn.prepare(
1543 "SELECT DISTINCT su.session_id, su.skill
1544 FROM skills_used su
1545 JOIN sessions s ON s.id = su.session_id
1546 WHERE s.workspace = ?1
1547 AND EXISTS (
1548 SELECT 1 FROM events e
1549 JOIN sessions ss ON ss.id = e.session_id
1550 WHERE e.session_id = su.session_id
1551 AND (
1552 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1553 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1554 )
1555 )
1556 ORDER BY su.session_id, su.skill",
1557 )?;
1558 let out: Vec<(String, String)> = stmt
1559 .query_map(
1560 params![
1561 workspace,
1562 start_ms as i64,
1563 end_ms as i64,
1564 SYNTHETIC_TS_CEILING_MS,
1565 ],
1566 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1567 )?
1568 .filter_map(|r| r.ok())
1569 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1570 .collect();
1571 Ok(out)
1572 }
1573
1574 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1576 let mut stmt = self.conn.prepare(
1577 "SELECT DISTINCT ru.rule
1578 FROM rules_used ru
1579 JOIN sessions s ON s.id = ru.session_id
1580 WHERE s.workspace = ?1
1581 AND EXISTS (
1582 SELECT 1 FROM events e
1583 JOIN sessions ss ON ss.id = e.session_id
1584 WHERE e.session_id = ru.session_id
1585 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1586 )
1587 ORDER BY ru.rule",
1588 )?;
1589 let out: Vec<String> = stmt
1590 .query_map(
1591 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1592 |r| r.get::<_, String>(0),
1593 )?
1594 .filter_map(|r| r.ok())
1595 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1596 .collect();
1597 Ok(out)
1598 }
1599
1600 pub fn rules_used_in_window(
1602 &self,
1603 workspace: &str,
1604 start_ms: u64,
1605 end_ms: u64,
1606 ) -> Result<Vec<(String, String)>> {
1607 let mut stmt = self.conn.prepare(
1608 "SELECT DISTINCT ru.session_id, ru.rule
1609 FROM rules_used ru
1610 JOIN sessions s ON s.id = ru.session_id
1611 WHERE s.workspace = ?1
1612 AND EXISTS (
1613 SELECT 1 FROM events e
1614 JOIN sessions ss ON ss.id = e.session_id
1615 WHERE e.session_id = ru.session_id
1616 AND (
1617 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1618 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1619 )
1620 )
1621 ORDER BY ru.session_id, ru.rule",
1622 )?;
1623 let out: Vec<(String, String)> = stmt
1624 .query_map(
1625 params![
1626 workspace,
1627 start_ms as i64,
1628 end_ms as i64,
1629 SYNTHETIC_TS_CEILING_MS,
1630 ],
1631 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1632 )?
1633 .filter_map(|r| r.ok())
1634 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1635 .collect();
1636 Ok(out)
1637 }
1638
1639 pub fn sessions_active_in_window(
1641 &self,
1642 workspace: &str,
1643 start_ms: u64,
1644 end_ms: u64,
1645 ) -> Result<HashSet<String>> {
1646 let mut stmt = self.conn.prepare(
1647 "SELECT DISTINCT s.id
1648 FROM sessions s
1649 WHERE s.workspace = ?1
1650 AND EXISTS (
1651 SELECT 1 FROM events e
1652 WHERE e.session_id = s.id
1653 AND (
1654 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1655 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1656 )
1657 )",
1658 )?;
1659 let out: HashSet<String> = stmt
1660 .query_map(
1661 params![
1662 workspace,
1663 start_ms as i64,
1664 end_ms as i64,
1665 SYNTHETIC_TS_CEILING_MS,
1666 ],
1667 |r| r.get(0),
1668 )?
1669 .filter_map(|r| r.ok())
1670 .collect();
1671 Ok(out)
1672 }
1673
1674 pub fn session_costs_usd_e6_in_window(
1676 &self,
1677 workspace: &str,
1678 start_ms: u64,
1679 end_ms: u64,
1680 ) -> Result<HashMap<String, i64>> {
1681 let mut stmt = self.conn.prepare(
1682 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1683 FROM events e
1684 JOIN sessions s ON s.id = e.session_id
1685 WHERE s.workspace = ?1
1686 AND (
1687 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1688 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1689 )
1690 GROUP BY e.session_id",
1691 )?;
1692 let rows: Vec<(String, i64)> = stmt
1693 .query_map(
1694 params![
1695 workspace,
1696 start_ms as i64,
1697 end_ms as i64,
1698 SYNTHETIC_TS_CEILING_MS,
1699 ],
1700 |r| Ok((r.get(0)?, r.get(1)?)),
1701 )?
1702 .filter_map(|r| r.ok())
1703 .collect();
1704 Ok(rows.into_iter().collect())
1705 }
1706
1707 pub fn guidance_report(
1709 &self,
1710 workspace: &str,
1711 window_start_ms: u64,
1712 window_end_ms: u64,
1713 skill_slugs_on_disk: &HashSet<String>,
1714 rule_slugs_on_disk: &HashSet<String>,
1715 ) -> Result<GuidanceReport> {
1716 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1717 let denom = active.len() as u64;
1718 let costs =
1719 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1720
1721 let workspace_avg_cost_per_session_usd = if denom > 0 {
1722 let total_e6: i64 = active
1723 .iter()
1724 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1725 .sum();
1726 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1727 } else {
1728 None
1729 };
1730
1731 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1732 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1733 skill_sessions.entry(skill).or_default().insert(sid);
1734 }
1735 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1736 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1737 rule_sessions.entry(rule).or_default().insert(sid);
1738 }
1739
1740 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1741
1742 let mut push_row =
1743 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1744 let sessions = sids.len() as u64;
1745 let sessions_pct = if denom > 0 {
1746 sessions as f64 * 100.0 / denom as f64
1747 } else {
1748 0.0
1749 };
1750 let total_cost_usd_e6: i64 = sids
1751 .iter()
1752 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1753 .sum();
1754 let avg_cost_per_session_usd = if sessions > 0 {
1755 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1756 } else {
1757 None
1758 };
1759 let vs_workspace_avg_cost_per_session_usd =
1760 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1761 (Some(avg), Some(w)) => Some(avg - w),
1762 _ => None,
1763 };
1764 rows.push(GuidancePerfRow {
1765 kind,
1766 id,
1767 sessions,
1768 sessions_pct,
1769 total_cost_usd_e6,
1770 avg_cost_per_session_usd,
1771 vs_workspace_avg_cost_per_session_usd,
1772 on_disk,
1773 });
1774 };
1775
1776 let mut seen_skills: HashSet<String> = HashSet::new();
1777 for (id, sids) in &skill_sessions {
1778 seen_skills.insert(id.clone());
1779 push_row(
1780 GuidanceKind::Skill,
1781 id.clone(),
1782 sids,
1783 skill_slugs_on_disk.contains(id),
1784 );
1785 }
1786 for slug in skill_slugs_on_disk {
1787 if seen_skills.contains(slug) {
1788 continue;
1789 }
1790 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1791 }
1792
1793 let mut seen_rules: HashSet<String> = HashSet::new();
1794 for (id, sids) in &rule_sessions {
1795 seen_rules.insert(id.clone());
1796 push_row(
1797 GuidanceKind::Rule,
1798 id.clone(),
1799 sids,
1800 rule_slugs_on_disk.contains(id),
1801 );
1802 }
1803 for slug in rule_slugs_on_disk {
1804 if seen_rules.contains(slug) {
1805 continue;
1806 }
1807 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1808 }
1809
1810 rows.sort_by(|a, b| {
1811 b.sessions
1812 .cmp(&a.sessions)
1813 .then_with(|| a.kind.cmp(&b.kind))
1814 .then_with(|| a.id.cmp(&b.id))
1815 });
1816
1817 Ok(GuidanceReport {
1818 workspace: workspace.to_string(),
1819 window_start_ms,
1820 window_end_ms,
1821 sessions_in_window: denom,
1822 workspace_avg_cost_per_session_usd,
1823 rows,
1824 })
1825 }
1826
1827 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1828 let mut stmt = self.conn.prepare(
1829 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1830 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1831 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1832 repo_file_count, repo_total_loc
1833 FROM sessions WHERE id = ?1",
1834 )?;
1835 let mut rows = stmt.query_map(params![id], |row| {
1836 Ok((
1837 row.get::<_, String>(0)?,
1838 row.get::<_, String>(1)?,
1839 row.get::<_, Option<String>>(2)?,
1840 row.get::<_, String>(3)?,
1841 row.get::<_, i64>(4)?,
1842 row.get::<_, Option<i64>>(5)?,
1843 row.get::<_, String>(6)?,
1844 row.get::<_, String>(7)?,
1845 row.get::<_, Option<String>>(8)?,
1846 row.get::<_, Option<String>>(9)?,
1847 row.get::<_, Option<String>>(10)?,
1848 row.get::<_, Option<i64>>(11)?,
1849 row.get::<_, Option<i64>>(12)?,
1850 row.get::<_, String>(13)?,
1851 row.get::<_, Option<String>>(14)?,
1852 row.get::<_, Option<String>>(15)?,
1853 row.get::<_, Option<String>>(16)?,
1854 row.get::<_, Option<String>>(17)?,
1855 row.get::<_, Option<String>>(18)?,
1856 row.get::<_, Option<i64>>(19)?,
1857 row.get::<_, Option<i64>>(20)?,
1858 ))
1859 })?;
1860
1861 if let Some(row) = rows.next() {
1862 let (
1863 id,
1864 agent,
1865 model,
1866 workspace,
1867 started,
1868 ended,
1869 status_str,
1870 trace,
1871 start_commit,
1872 end_commit,
1873 branch,
1874 dirty_start,
1875 dirty_end,
1876 source,
1877 prompt_fingerprint,
1878 parent_session_id,
1879 agent_version,
1880 os,
1881 arch,
1882 repo_file_count,
1883 repo_total_loc,
1884 ) = row?;
1885 Ok(Some(SessionRecord {
1886 id,
1887 agent,
1888 model,
1889 workspace,
1890 started_at_ms: started as u64,
1891 ended_at_ms: ended.map(|v| v as u64),
1892 status: status_from_str(&status_str),
1893 trace_path: trace,
1894 start_commit,
1895 end_commit,
1896 branch,
1897 dirty_start: dirty_start.map(i64_to_bool),
1898 dirty_end: dirty_end.map(i64_to_bool),
1899 repo_binding_source: empty_to_none(source),
1900 prompt_fingerprint,
1901 parent_session_id,
1902 agent_version,
1903 os,
1904 arch,
1905 repo_file_count: repo_file_count.map(|v| v as u32),
1906 repo_total_loc: repo_total_loc.map(|v| v as u64),
1907 }))
1908 } else {
1909 Ok(None)
1910 }
1911 }
1912
1913 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1914 let mut stmt = self.conn.prepare(
1915 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1916 indexed_at_ms, dirty, graph_path
1917 FROM repo_snapshots WHERE workspace = ?1
1918 ORDER BY indexed_at_ms DESC LIMIT 1",
1919 )?;
1920 let mut rows = stmt.query_map(params![workspace], |row| {
1921 Ok(RepoSnapshotRecord {
1922 id: row.get(0)?,
1923 workspace: row.get(1)?,
1924 head_commit: row.get(2)?,
1925 dirty_fingerprint: row.get(3)?,
1926 analyzer_version: row.get(4)?,
1927 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1928 dirty: row.get::<_, i64>(6)? != 0,
1929 graph_path: row.get(7)?,
1930 })
1931 })?;
1932 Ok(rows.next().transpose()?)
1933 }
1934
1935 pub fn save_repo_snapshot(
1936 &self,
1937 snapshot: &RepoSnapshotRecord,
1938 facts: &[FileFact],
1939 edges: &[RepoEdge],
1940 ) -> Result<()> {
1941 self.conn.execute(
1942 "INSERT INTO repo_snapshots (
1943 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1944 indexed_at_ms, dirty, graph_path
1945 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1946 ON CONFLICT(id) DO UPDATE SET
1947 workspace=excluded.workspace,
1948 head_commit=excluded.head_commit,
1949 dirty_fingerprint=excluded.dirty_fingerprint,
1950 analyzer_version=excluded.analyzer_version,
1951 indexed_at_ms=excluded.indexed_at_ms,
1952 dirty=excluded.dirty,
1953 graph_path=excluded.graph_path",
1954 params![
1955 snapshot.id,
1956 snapshot.workspace,
1957 snapshot.head_commit,
1958 snapshot.dirty_fingerprint,
1959 snapshot.analyzer_version,
1960 snapshot.indexed_at_ms as i64,
1961 bool_to_i64(snapshot.dirty),
1962 snapshot.graph_path,
1963 ],
1964 )?;
1965 self.conn.execute(
1966 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1967 params![snapshot.id],
1968 )?;
1969 self.conn.execute(
1970 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1971 params![snapshot.id],
1972 )?;
1973 for fact in facts {
1974 self.conn.execute(
1975 "INSERT INTO file_facts (
1976 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1977 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1978 churn_30d, churn_90d, authors_90d, last_changed_ms
1979 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1980 params![
1981 fact.snapshot_id,
1982 fact.path,
1983 fact.language,
1984 fact.bytes as i64,
1985 fact.loc as i64,
1986 fact.sloc as i64,
1987 fact.complexity_total as i64,
1988 fact.max_fn_complexity as i64,
1989 fact.symbol_count as i64,
1990 fact.import_count as i64,
1991 fact.fan_in as i64,
1992 fact.fan_out as i64,
1993 fact.churn_30d as i64,
1994 fact.churn_90d as i64,
1995 fact.authors_90d as i64,
1996 fact.last_changed_ms.map(|v| v as i64),
1997 ],
1998 )?;
1999 }
2000 for edge in edges {
2001 self.conn.execute(
2002 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2003 VALUES (?1, ?2, ?3, ?4, ?5)
2004 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2005 DO UPDATE SET weight = weight + excluded.weight",
2006 params![
2007 snapshot.id,
2008 edge.from_path,
2009 edge.to_path,
2010 edge.kind,
2011 edge.weight as i64,
2012 ],
2013 )?;
2014 }
2015 Ok(())
2016 }
2017
2018 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2019 let mut stmt = self.conn.prepare(
2020 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2021 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2022 churn_30d, churn_90d, authors_90d, last_changed_ms
2023 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2024 )?;
2025 let rows = stmt.query_map(params![snapshot_id], |row| {
2026 Ok(FileFact {
2027 snapshot_id: row.get(0)?,
2028 path: row.get(1)?,
2029 language: row.get(2)?,
2030 bytes: row.get::<_, i64>(3)? as u64,
2031 loc: row.get::<_, i64>(4)? as u32,
2032 sloc: row.get::<_, i64>(5)? as u32,
2033 complexity_total: row.get::<_, i64>(6)? as u32,
2034 max_fn_complexity: row.get::<_, i64>(7)? as u32,
2035 symbol_count: row.get::<_, i64>(8)? as u32,
2036 import_count: row.get::<_, i64>(9)? as u32,
2037 fan_in: row.get::<_, i64>(10)? as u32,
2038 fan_out: row.get::<_, i64>(11)? as u32,
2039 churn_30d: row.get::<_, i64>(12)? as u32,
2040 churn_90d: row.get::<_, i64>(13)? as u32,
2041 authors_90d: row.get::<_, i64>(14)? as u32,
2042 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2043 })
2044 })?;
2045 Ok(rows.filter_map(|row| row.ok()).collect())
2046 }
2047
2048 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2049 let mut stmt = self.conn.prepare(
2050 "SELECT from_id, to_id, kind, weight
2051 FROM repo_edges WHERE snapshot_id = ?1
2052 ORDER BY kind, from_id, to_id",
2053 )?;
2054 let rows = stmt.query_map(params![snapshot_id], |row| {
2055 Ok(RepoEdge {
2056 from_path: row.get(0)?,
2057 to_path: row.get(1)?,
2058 kind: row.get(2)?,
2059 weight: row.get::<_, i64>(3)? as u32,
2060 })
2061 })?;
2062 Ok(rows.filter_map(|row| row.ok()).collect())
2063 }
2064
2065 pub fn tool_spans_in_window(
2066 &self,
2067 workspace: &str,
2068 start_ms: u64,
2069 end_ms: u64,
2070 ) -> Result<Vec<ToolSpanView>> {
2071 let mut stmt = self.conn.prepare(
2072 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2073 reasoning_tokens, cost_usd_e6, paths_json,
2074 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2075 FROM (
2076 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2077 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2078 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2079 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2080 ts.started_at_ms AS sort_ms
2081 FROM tool_spans ts
2082 JOIN sessions s ON s.id = ts.session_id
2083 WHERE s.workspace = ?1
2084 AND ts.started_at_ms >= ?2
2085 AND ts.started_at_ms <= ?3
2086 UNION ALL
2087 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2088 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2089 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2090 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2091 ts.ended_at_ms AS sort_ms
2092 FROM tool_spans ts
2093 JOIN sessions s ON s.id = ts.session_id
2094 WHERE s.workspace = ?1
2095 AND ts.started_at_ms IS NULL
2096 AND ts.ended_at_ms >= ?2
2097 AND ts.ended_at_ms <= ?3
2098 )
2099 ORDER BY sort_ms DESC",
2100 )?;
2101 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2102 let paths_json: String = row.get(8)?;
2103 Ok(ToolSpanView {
2104 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2105 tool: row
2106 .get::<_, Option<String>>(1)?
2107 .unwrap_or_else(|| "unknown".into()),
2108 status: row.get(2)?,
2109 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2110 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2111 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2112 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2113 cost_usd_e6: row.get(7)?,
2114 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2115 parent_span_id: row.get(9)?,
2116 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2117 subtree_cost_usd_e6: row.get(11)?,
2118 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2119 })
2120 })?;
2121 Ok(rows.filter_map(|row| row.ok()).collect())
2122 }
2123
2124 pub fn session_span_tree(
2125 &self,
2126 session_id: &str,
2127 ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2128 let last_event_seq = self.last_event_seq_for_session(session_id)?;
2129 if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2130 && entry.session_id == session_id
2131 && entry.last_event_seq == last_event_seq
2132 {
2133 return Ok(entry.nodes.clone());
2134 }
2135 let mut stmt = self.conn.prepare(
2136 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2137 reasoning_tokens, cost_usd_e6, paths_json,
2138 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2139 FROM tool_spans
2140 WHERE session_id = ?1
2141 ORDER BY depth ASC, started_at_ms ASC",
2142 )?;
2143 let rows = stmt.query_map(params![session_id], |row| {
2144 let paths_json: String = row.get(8)?;
2145 Ok(crate::metrics::types::ToolSpanView {
2146 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2147 tool: row
2148 .get::<_, Option<String>>(1)?
2149 .unwrap_or_else(|| "unknown".into()),
2150 status: row.get(2)?,
2151 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2152 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2153 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2154 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2155 cost_usd_e6: row.get(7)?,
2156 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2157 parent_span_id: row.get(9)?,
2158 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2159 subtree_cost_usd_e6: row.get(11)?,
2160 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2161 })
2162 })?;
2163 let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2164 let nodes = crate::store::span_tree::build_tree(spans);
2165 *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2166 session_id: session_id.to_string(),
2167 last_event_seq,
2168 nodes: nodes.clone(),
2169 });
2170 Ok(nodes)
2171 }
2172
2173 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2174 let seq = self
2175 .conn
2176 .query_row(
2177 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2178 params![session_id],
2179 |r| r.get::<_, Option<i64>>(0),
2180 )?
2181 .map(|v| v as u64);
2182 Ok(seq)
2183 }
2184
2185 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2186 let mut stmt = self.conn.prepare(
2187 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2188 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2189 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2190 )?;
2191 let rows = stmt.query_map(params![session_id], |row| {
2192 let paths_json: String = row.get(12)?;
2193 Ok(ToolSpanSyncRow {
2194 span_id: row.get(0)?,
2195 session_id: row.get(1)?,
2196 tool: row.get(2)?,
2197 tool_call_id: row.get(3)?,
2198 status: row.get(4)?,
2199 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2200 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2201 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2202 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2203 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2204 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2205 cost_usd_e6: row.get(11)?,
2206 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2207 })
2208 })?;
2209 Ok(rows.filter_map(|row| row.ok()).collect())
2210 }
2211
2212 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2213 self.conn.execute(
2214 "INSERT OR REPLACE INTO session_evals
2215 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2216 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2217 rusqlite::params![
2218 eval.id,
2219 eval.session_id,
2220 eval.judge_model,
2221 eval.rubric_id,
2222 eval.score,
2223 eval.rationale,
2224 eval.flagged as i64,
2225 eval.created_at_ms as i64,
2226 ],
2227 )?;
2228 Ok(())
2229 }
2230
2231 pub fn list_evals_in_window(
2232 &self,
2233 start_ms: u64,
2234 end_ms: u64,
2235 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2236 let mut stmt = self.conn.prepare(
2237 "SELECT id, session_id, judge_model, rubric_id, score,
2238 rationale, flagged, created_at_ms
2239 FROM session_evals
2240 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2241 ORDER BY created_at_ms ASC",
2242 )?;
2243 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2244 Ok(crate::eval::types::EvalRow {
2245 id: r.get(0)?,
2246 session_id: r.get(1)?,
2247 judge_model: r.get(2)?,
2248 rubric_id: r.get(3)?,
2249 score: r.get(4)?,
2250 rationale: r.get(5)?,
2251 flagged: r.get::<_, i64>(6)? != 0,
2252 created_at_ms: r.get::<_, i64>(7)? as u64,
2253 })
2254 })?;
2255 rows.collect()
2256 }
2257
2258 pub fn list_evals_for_session(
2259 &self,
2260 session_id: &str,
2261 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2262 let mut stmt = self.conn.prepare(
2263 "SELECT id, session_id, judge_model, rubric_id, score,
2264 rationale, flagged, created_at_ms
2265 FROM session_evals
2266 WHERE session_id = ?1
2267 ORDER BY created_at_ms DESC",
2268 )?;
2269 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2270 Ok(crate::eval::types::EvalRow {
2271 id: r.get(0)?,
2272 session_id: r.get(1)?,
2273 judge_model: r.get(2)?,
2274 rubric_id: r.get(3)?,
2275 score: r.get(4)?,
2276 rationale: r.get(5)?,
2277 flagged: r.get::<_, i64>(6)? != 0,
2278 created_at_ms: r.get::<_, i64>(7)? as u64,
2279 })
2280 })?;
2281 rows.collect()
2282 }
2283
2284 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2285 use crate::feedback::types::FeedbackLabel;
2286 self.conn.execute(
2287 "INSERT OR REPLACE INTO session_feedback
2288 (id, session_id, score, label, note, created_at_ms)
2289 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2290 rusqlite::params![
2291 r.id,
2292 r.session_id,
2293 r.score.as_ref().map(|s| s.0 as i64),
2294 r.label.as_ref().map(FeedbackLabel::to_db_str),
2295 r.note,
2296 r.created_at_ms as i64,
2297 ],
2298 )?;
2299 let payload = serde_json::to_string(r).unwrap_or_default();
2300 self.conn.execute(
2301 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2302 VALUES (?1, 'session_feedback', ?2, 0)",
2303 rusqlite::params![r.session_id, payload],
2304 )?;
2305 Ok(())
2306 }
2307
2308 pub fn list_feedback_in_window(
2309 &self,
2310 start_ms: u64,
2311 end_ms: u64,
2312 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2313 let mut stmt = self.conn.prepare(
2314 "SELECT id, session_id, score, label, note, created_at_ms
2315 FROM session_feedback
2316 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2317 ORDER BY created_at_ms ASC",
2318 )?;
2319 let rows = stmt.query_map(
2320 rusqlite::params![start_ms as i64, end_ms as i64],
2321 feedback_row,
2322 )?;
2323 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2324 }
2325
2326 pub fn feedback_for_sessions(
2327 &self,
2328 ids: &[String],
2329 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2330 if ids.is_empty() {
2331 return Ok(std::collections::HashMap::new());
2332 }
2333 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2334 let sql = format!(
2335 "SELECT id, session_id, score, label, note, created_at_ms
2336 FROM session_feedback WHERE session_id IN ({placeholders})
2337 ORDER BY created_at_ms DESC"
2338 );
2339 let mut stmt = self.conn.prepare(&sql)?;
2340 let params: Vec<&dyn rusqlite::ToSql> =
2341 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2342 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2343 let mut map = std::collections::HashMap::new();
2344 for row in rows {
2345 let r = row?;
2346 map.entry(r.session_id.clone()).or_insert(r);
2347 }
2348 Ok(map)
2349 }
2350
2351 pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2352 self.conn.execute(
2353 "INSERT INTO session_outcomes (
2354 session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2355 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2356 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2357 ON CONFLICT(session_id) DO UPDATE SET
2358 test_passed=excluded.test_passed,
2359 test_failed=excluded.test_failed,
2360 test_skipped=excluded.test_skipped,
2361 build_ok=excluded.build_ok,
2362 lint_errors=excluded.lint_errors,
2363 revert_lines_14d=excluded.revert_lines_14d,
2364 pr_open=excluded.pr_open,
2365 ci_ok=excluded.ci_ok,
2366 measured_at_ms=excluded.measured_at_ms,
2367 measure_error=excluded.measure_error",
2368 params![
2369 row.session_id,
2370 row.test_passed,
2371 row.test_failed,
2372 row.test_skipped,
2373 row.build_ok.map(bool_to_i64),
2374 row.lint_errors,
2375 row.revert_lines_14d,
2376 row.pr_open,
2377 row.ci_ok.map(bool_to_i64),
2378 row.measured_at_ms as i64,
2379 row.measure_error.as_deref(),
2380 ],
2381 )?;
2382 Ok(())
2383 }
2384
2385 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2386 let mut stmt = self.conn.prepare(
2387 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2388 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2389 FROM session_outcomes WHERE session_id = ?1",
2390 )?;
2391 let row = stmt
2392 .query_row(params![session_id], outcome_row)
2393 .optional()?;
2394 Ok(row)
2395 }
2396
2397 pub fn list_session_outcomes_in_window(
2399 &self,
2400 workspace: &str,
2401 start_ms: u64,
2402 end_ms: u64,
2403 ) -> Result<Vec<SessionOutcomeRow>> {
2404 let mut stmt = self.conn.prepare(
2405 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2406 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2407 FROM session_outcomes o
2408 JOIN sessions s ON s.id = o.session_id
2409 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2410 ORDER BY o.measured_at_ms ASC",
2411 )?;
2412 let rows = stmt.query_map(
2413 params![workspace, start_ms as i64, end_ms as i64],
2414 outcome_row,
2415 )?;
2416 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2417 }
2418
2419 pub fn append_session_sample(
2420 &self,
2421 session_id: &str,
2422 ts_ms: u64,
2423 pid: u32,
2424 cpu_percent: Option<f64>,
2425 rss_bytes: Option<u64>,
2426 ) -> Result<()> {
2427 self.conn.execute(
2428 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2429 VALUES (?1, ?2, ?3, ?4, ?5)",
2430 params![
2431 session_id,
2432 ts_ms as i64,
2433 pid as i64,
2434 cpu_percent,
2435 rss_bytes.map(|b| b as i64)
2436 ],
2437 )?;
2438 Ok(())
2439 }
2440
2441 pub fn list_session_sample_aggs_in_window(
2443 &self,
2444 workspace: &str,
2445 start_ms: u64,
2446 end_ms: u64,
2447 ) -> Result<Vec<SessionSampleAgg>> {
2448 let mut stmt = self.conn.prepare(
2449 "SELECT ss.session_id, COUNT(*) AS n,
2450 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2451 FROM session_samples ss
2452 JOIN sessions s ON s.id = ss.session_id
2453 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2454 GROUP BY ss.session_id",
2455 )?;
2456 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2457 let sid: String = r.get(0)?;
2458 let n: i64 = r.get(1)?;
2459 let max_cpu: Option<f64> = r.get(2)?;
2460 let max_rss: Option<i64> = r.get(3)?;
2461 Ok(SessionSampleAgg {
2462 session_id: sid,
2463 sample_count: n as u64,
2464 max_cpu_percent: max_cpu.unwrap_or(0.0),
2465 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2466 })
2467 })?;
2468 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2469 }
2470
2471 pub fn list_sessions_for_eval(
2472 &self,
2473 since_ms: u64,
2474 min_cost_usd: f64,
2475 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2476 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2477 let mut stmt = self.conn.prepare(
2478 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2479 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2480 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2481 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2482 FROM sessions s
2483 WHERE s.started_at_ms >= ?1
2484 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2485 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2486 ORDER BY s.started_at_ms DESC",
2487 )?;
2488 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2489 Ok((
2490 r.get::<_, String>(0)?,
2491 r.get::<_, String>(1)?,
2492 r.get::<_, Option<String>>(2)?,
2493 r.get::<_, String>(3)?,
2494 r.get::<_, i64>(4)?,
2495 r.get::<_, Option<i64>>(5)?,
2496 r.get::<_, String>(6)?,
2497 r.get::<_, String>(7)?,
2498 r.get::<_, Option<String>>(8)?,
2499 r.get::<_, Option<String>>(9)?,
2500 r.get::<_, Option<String>>(10)?,
2501 r.get::<_, Option<i64>>(11)?,
2502 r.get::<_, Option<i64>>(12)?,
2503 r.get::<_, Option<String>>(13)?,
2504 r.get::<_, Option<String>>(14)?,
2505 r.get::<_, Option<String>>(15)?,
2506 r.get::<_, Option<String>>(16)?,
2507 r.get::<_, Option<String>>(17)?,
2508 r.get::<_, Option<String>>(18)?,
2509 r.get::<_, Option<i64>>(19)?,
2510 r.get::<_, Option<i64>>(20)?,
2511 ))
2512 })?;
2513 let mut out = Vec::new();
2514 for row in rows {
2515 let (
2516 id,
2517 agent,
2518 model,
2519 workspace,
2520 started,
2521 ended,
2522 status_str,
2523 trace,
2524 start_commit,
2525 end_commit,
2526 branch,
2527 dirty_start,
2528 dirty_end,
2529 source,
2530 prompt_fingerprint,
2531 parent_session_id,
2532 agent_version,
2533 os,
2534 arch,
2535 repo_file_count,
2536 repo_total_loc,
2537 ) = row?;
2538 out.push(crate::core::event::SessionRecord {
2539 id,
2540 agent,
2541 model,
2542 workspace,
2543 started_at_ms: started as u64,
2544 ended_at_ms: ended.map(|v| v as u64),
2545 status: status_from_str(&status_str),
2546 trace_path: trace,
2547 start_commit,
2548 end_commit,
2549 branch,
2550 dirty_start: dirty_start.map(i64_to_bool),
2551 dirty_end: dirty_end.map(i64_to_bool),
2552 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2553 prompt_fingerprint,
2554 parent_session_id,
2555 agent_version,
2556 os,
2557 arch,
2558 repo_file_count: repo_file_count.map(|v| v as u32),
2559 repo_total_loc: repo_total_loc.map(|v| v as u64),
2560 });
2561 }
2562 Ok(out)
2563 }
2564
2565 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2566 self.conn.execute(
2567 "INSERT OR IGNORE INTO prompt_snapshots
2568 (fingerprint, captured_at_ms, files_json, total_bytes)
2569 VALUES (?1, ?2, ?3, ?4)",
2570 params![
2571 snap.fingerprint,
2572 snap.captured_at_ms as i64,
2573 snap.files_json,
2574 snap.total_bytes as i64
2575 ],
2576 )?;
2577 Ok(())
2578 }
2579
2580 pub fn get_prompt_snapshot(
2581 &self,
2582 fingerprint: &str,
2583 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2584 self.conn
2585 .query_row(
2586 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2587 FROM prompt_snapshots WHERE fingerprint = ?1",
2588 params![fingerprint],
2589 |r| {
2590 Ok(crate::prompt::PromptSnapshot {
2591 fingerprint: r.get(0)?,
2592 captured_at_ms: r.get::<_, i64>(1)? as u64,
2593 files_json: r.get(2)?,
2594 total_bytes: r.get::<_, i64>(3)? as u64,
2595 })
2596 },
2597 )
2598 .optional()
2599 .map_err(Into::into)
2600 }
2601
2602 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2603 let mut stmt = self.conn.prepare(
2604 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2605 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2606 )?;
2607 let rows = stmt.query_map([], |r| {
2608 Ok(crate::prompt::PromptSnapshot {
2609 fingerprint: r.get(0)?,
2610 captured_at_ms: r.get::<_, i64>(1)? as u64,
2611 files_json: r.get(2)?,
2612 total_bytes: r.get::<_, i64>(3)? as u64,
2613 })
2614 })?;
2615 Ok(rows.filter_map(|r| r.ok()).collect())
2616 }
2617
2618 pub fn sessions_with_prompt_fingerprint(
2620 &self,
2621 workspace: &str,
2622 start_ms: u64,
2623 end_ms: u64,
2624 ) -> Result<Vec<(String, String)>> {
2625 let mut stmt = self.conn.prepare(
2626 "SELECT id, prompt_fingerprint FROM sessions
2627 WHERE workspace = ?1
2628 AND started_at_ms >= ?2 AND started_at_ms < ?3
2629 AND prompt_fingerprint IS NOT NULL",
2630 )?;
2631 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2632 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2633 })?;
2634 Ok(rows.filter_map(|r| r.ok()).collect())
2635 }
2636}
2637
2638impl Drop for Store {
2639 fn drop(&mut self) {
2640 if let Some(writer) = self.search_writer.get_mut().as_mut() {
2641 let _ = writer.commit();
2642 }
2643 }
2644}
2645
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
2652
2653fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
2654 let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
2655 let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
2656 Ok(rows.filter_map(|r| r.ok()).collect())
2657}
2658
2659fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
2660 raw.and_then(|s| s.trim().parse::<u64>().ok())
2661 .unwrap_or(DEFAULT_MMAP_MB)
2662 .saturating_mul(1024)
2663 .saturating_mul(1024)
2664 .min(i64::MAX as u64) as i64
2665}
2666
2667fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
2668 let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
2669 conn.execute_batch(&format!(
2670 "
2671 PRAGMA journal_mode=WAL;
2672 PRAGMA busy_timeout=5000;
2673 PRAGMA synchronous=NORMAL;
2674 PRAGMA cache_size=-65536;
2675 PRAGMA mmap_size={mmap_size};
2676 PRAGMA temp_store=MEMORY;
2677 PRAGMA wal_autocheckpoint=1000;
2678 "
2679 ))?;
2680 if mode == StoreOpenMode::ReadOnlyQuery {
2681 conn.execute_batch("PRAGMA query_only=ON;")?;
2682 }
2683 Ok(())
2684}
2685
2686fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2687 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2688}
2689
2690fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
2691 let status_str: String = row.get(6)?;
2692 Ok(SessionRecord {
2693 id: row.get(0)?,
2694 agent: row.get(1)?,
2695 model: row.get(2)?,
2696 workspace: row.get(3)?,
2697 started_at_ms: row.get::<_, i64>(4)? as u64,
2698 ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2699 status: status_from_str(&status_str),
2700 trace_path: row.get(7)?,
2701 start_commit: row.get(8)?,
2702 end_commit: row.get(9)?,
2703 branch: row.get(10)?,
2704 dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2705 dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2706 repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
2707 prompt_fingerprint: row.get(14)?,
2708 parent_session_id: row.get(15)?,
2709 agent_version: row.get(16)?,
2710 os: row.get(17)?,
2711 arch: row.get(18)?,
2712 repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2713 repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2714 })
2715}
2716
2717fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
2718 let payload_str: String = row.get(12)?;
2719 Ok(Event {
2720 session_id: row.get(0)?,
2721 seq: row.get::<_, i64>(1)? as u64,
2722 ts_ms: row.get::<_, i64>(2)? as u64,
2723 ts_exact: row.get::<_, i64>(3)? != 0,
2724 kind: kind_from_str(&row.get::<_, String>(4)?),
2725 source: source_from_str(&row.get::<_, String>(5)?),
2726 tool: row.get(6)?,
2727 tool_call_id: row.get(7)?,
2728 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2729 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2730 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2731 cost_usd_e6: row.get(11)?,
2732 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
2733 stop_reason: row.get(13)?,
2734 latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
2735 ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
2736 retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
2737 context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
2738 context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
2739 cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2740 cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
2741 system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
2742 })
2743}
2744
2745fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
2746 let mut clauses = vec!["workspace = ?".to_string()];
2747 let mut args = vec![Value::Text(workspace.to_string())];
2748 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
2749 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
2750 args.push(Value::Text(format!("{}%", escape_like(prefix))));
2751 }
2752 if let Some(status) = &filter.status {
2753 clauses.push("status = ?".to_string());
2754 args.push(Value::Text(format!("{status:?}")));
2755 }
2756 if let Some(since_ms) = filter.since_ms {
2757 clauses.push("started_at_ms >= ?".to_string());
2758 args.push(Value::Integer(since_ms as i64));
2759 }
2760 (format!("WHERE {}", clauses.join(" AND ")), args)
2761}
2762
/// Lowercases `raw` and backslash-escapes the `LIKE` metacharacters
/// (`\`, `%`, `_`) so the result matches literally under `ESCAPE '\'`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2769
2770fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2771 let cost: i64 = conn.query_row(
2772 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2773 params![workspace], |r| r.get(0),
2774 )?;
2775 let with_cost: i64 = conn.query_row(
2776 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2777 params![workspace], |r| r.get(0),
2778 )?;
2779 Ok((cost, with_cost as u64))
2780}
2781
2782fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2783 let build_raw: Option<i64> = r.get(4)?;
2784 let ci_raw: Option<i64> = r.get(8)?;
2785 Ok(SessionOutcomeRow {
2786 session_id: r.get(0)?,
2787 test_passed: r.get(1)?,
2788 test_failed: r.get(2)?,
2789 test_skipped: r.get(3)?,
2790 build_ok: build_raw.map(|v| v != 0),
2791 lint_errors: r.get(5)?,
2792 revert_lines_14d: r.get(6)?,
2793 pr_open: r.get(7)?,
2794 ci_ok: ci_raw.map(|v| v != 0),
2795 measured_at_ms: r.get::<_, i64>(9)? as u64,
2796 measure_error: r.get(10)?,
2797 })
2798}
2799
2800fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2801 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2802 let score = r
2803 .get::<_, Option<i64>>(2)?
2804 .and_then(|v| FeedbackScore::new(v as u8));
2805 let label = r
2806 .get::<_, Option<String>>(3)?
2807 .and_then(|s| FeedbackLabel::from_str_opt(&s));
2808 Ok(FeedbackRecord {
2809 id: r.get(0)?,
2810 session_id: r.get(1)?,
2811 score,
2812 label,
2813 note: r.get(4)?,
2814 created_at_ms: r.get::<_, i64>(5)? as u64,
2815 })
2816}
2817
/// Three-letter weekday name for a day index counted from the Unix epoch.
///
/// Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into a
/// Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const WEEKDAYS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    let weekday = (day_idx + 4) % 7;
    WEEKDAYS[weekday as usize]
}
2821
2822fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2823 let week_ago = now.saturating_sub(7 * 86_400_000);
2824 let mut stmt = conn
2825 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2826 let days: Vec<u64> = stmt
2827 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2828 .filter_map(|r| r.ok())
2829 .map(|v| v as u64 / 86_400_000)
2830 .collect();
2831 let today = now / 86_400_000;
2832 Ok((0u64..7)
2833 .map(|i| {
2834 let d = today.saturating_sub(6 - i);
2835 (
2836 day_label(d).to_string(),
2837 days.iter().filter(|&&x| x == d).count() as u64,
2838 )
2839 })
2840 .collect())
2841}
2842
2843fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2844 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2845 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2846 s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2847 s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2848 COUNT(e.id) FROM sessions s \
2849 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2850 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2851 let mut stmt = conn.prepare(sql)?;
2852 let out: Vec<(SessionRecord, u64)> = stmt
2853 .query_map(params![workspace], |r| {
2854 let st: String = r.get(6)?;
2855 Ok((
2856 SessionRecord {
2857 id: r.get(0)?,
2858 agent: r.get(1)?,
2859 model: r.get(2)?,
2860 workspace: r.get(3)?,
2861 started_at_ms: r.get::<_, i64>(4)? as u64,
2862 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2863 status: status_from_str(&st),
2864 trace_path: r.get(7)?,
2865 start_commit: r.get(8)?,
2866 end_commit: r.get(9)?,
2867 branch: r.get(10)?,
2868 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2869 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2870 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2871 prompt_fingerprint: r.get(14)?,
2872 parent_session_id: r.get(15)?,
2873 agent_version: r.get(16)?,
2874 os: r.get(17)?,
2875 arch: r.get(18)?,
2876 repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2877 repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2878 },
2879 r.get::<_, i64>(21)? as u64,
2880 ))
2881 })?
2882 .filter_map(|r| r.ok())
2883 .collect();
2884 Ok(out)
2885}
2886
2887fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2888 let mut stmt = conn.prepare(
2889 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2890 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2891 )?;
2892 let out: Vec<(String, u64)> = stmt
2893 .query_map(params![workspace], |r| {
2894 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2895 })?
2896 .filter_map(|r| r.ok())
2897 .collect();
2898 Ok(out)
2899}
2900
2901fn status_from_str(s: &str) -> SessionStatus {
2902 match s {
2903 "Running" => SessionStatus::Running,
2904 "Waiting" => SessionStatus::Waiting,
2905 "Idle" => SessionStatus::Idle,
2906 _ => SessionStatus::Done,
2907 }
2908}
2909
/// Reports whether the `KAIZEN_PROJECTOR` environment variable is set to
/// exactly `legacy`. An unset or non-Unicode value counts as `false`.
fn projector_legacy_mode() -> bool {
    matches!(std::env::var("KAIZEN_PROJECTOR").as_deref(), Ok("legacy"))
}
2913
2914fn is_stop_event(e: &Event) -> bool {
2915 if !matches!(e.kind, EventKind::Hook) {
2916 return false;
2917 }
2918 e.payload
2919 .get("event")
2920 .and_then(|v| v.as_str())
2921 .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
2922 == Some("Stop")
2923}
2924
2925fn kind_from_str(s: &str) -> EventKind {
2926 match s {
2927 "ToolCall" => EventKind::ToolCall,
2928 "ToolResult" => EventKind::ToolResult,
2929 "Message" => EventKind::Message,
2930 "Error" => EventKind::Error,
2931 "Cost" => EventKind::Cost,
2932 "Hook" => EventKind::Hook,
2933 "Lifecycle" => EventKind::Lifecycle,
2934 _ => EventKind::Hook,
2935 }
2936}
2937
2938fn source_from_str(s: &str) -> EventSource {
2939 match s {
2940 "Tail" => EventSource::Tail,
2941 "Hook" => EventSource::Hook,
2942 _ => EventSource::Proxy,
2943 }
2944}
2945
2946fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2947 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2948 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2949 ensure_column(conn, "sessions", "branch", "TEXT")?;
2950 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2951 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2952 ensure_column(
2953 conn,
2954 "sessions",
2955 "repo_binding_source",
2956 "TEXT NOT NULL DEFAULT ''",
2957 )?;
2958 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2959 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2960 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2961 ensure_column(conn, "events", "stop_reason", "TEXT")?;
2962 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
2963 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
2964 ensure_column(conn, "events", "retry_count", "INTEGER")?;
2965 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
2966 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
2967 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
2968 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
2969 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
2970 ensure_column(
2971 conn,
2972 "sync_outbox",
2973 "kind",
2974 "TEXT NOT NULL DEFAULT 'events'",
2975 )?;
2976 ensure_column(
2977 conn,
2978 "experiments",
2979 "state",
2980 "TEXT NOT NULL DEFAULT 'Draft'",
2981 )?;
2982 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2983 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2984 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
2985 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
2986 ensure_column(conn, "sessions", "os", "TEXT")?;
2987 ensure_column(conn, "sessions", "arch", "TEXT")?;
2988 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
2989 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
2990 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
2991 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
2992 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
2993 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
2994 conn.execute_batch(
2995 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
2996 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
2997 )?;
2998 Ok(())
2999}
3000
3001fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3002 if has_column(conn, table, column)? {
3003 return Ok(());
3004 }
3005 conn.execute(
3006 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
3007 [],
3008 )?;
3009 Ok(())
3010}
3011
3012fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3013 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3014 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3015 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3016}
3017
/// Encodes a boolean as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3021
/// Decodes an integer flag: `0` is `false`, every other value (including
/// negatives) is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
3025
/// Converts an empty string into `None`; any non-empty string (whitespace
/// included) is returned unchanged inside `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3029
// Unit tests for the store. Every test opens a fresh database file inside its
// own temporary directory, so tests do not share state.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Builds a `SessionRecord` fixture with the given id, agent `"cursor"`,
    /// workspace `"/ws"`, start time 1000 ms, status `Done`, and every optional
    /// field left as `None`.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Builds a `ToolCall` event fixture from the `Tail` source for tool
    /// `read_file`. The timestamp is `1000 + seq * 100` and the tool call id is
    /// `call_{seq}`; all metric fields are `None` and the payload is `{}`.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// A freshly opened store reports `wal` as its journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    /// Opening a store leaves the connection with the expected values for
    /// `synchronous`, `cache_size`, `temp_store` and `wal_autocheckpoint`, and
    /// `mmap_size_bytes_from_mb` turns `"64"` into 64 * 1024 * 1024 bytes.
    #[test]
    fn open_applies_phase0_pragmas() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let synchronous: i64 = store
            .conn
            .query_row("PRAGMA synchronous", [], |r| r.get(0))
            .unwrap();
        let cache_size: i64 = store
            .conn
            .query_row("PRAGMA cache_size", [], |r| r.get(0))
            .unwrap();
        let temp_store: i64 = store
            .conn
            .query_row("PRAGMA temp_store", [], |r| r.get(0))
            .unwrap();
        let wal_autocheckpoint: i64 = store
            .conn
            .query_row("PRAGMA wal_autocheckpoint", [], |r| r.get(0))
            .unwrap();
        assert_eq!(synchronous, 1);
        assert_eq!(cache_size, -65_536);
        assert_eq!(temp_store, 2);
        assert_eq!(wal_autocheckpoint, 1_000);
        assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
    }

    /// A store opened with `open_read_only` has `PRAGMA query_only` set to 1.
    #[test]
    fn read_only_open_sets_query_only() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("kaizen.db");
        // Open once in the regular mode first so the database file exists.
        Store::open(&db).unwrap();
        let store = Store::open_read_only(&db).unwrap();
        let query_only: i64 = store
            .conn
            .query_row("PRAGMA query_only", [], |r| r.get(0))
            .unwrap();
        assert_eq!(query_only, 1);
    }

    /// Each of the listed index names appears exactly once in `sqlite_master`
    /// after the store is opened.
    #[test]
    fn phase0_indexes_exist() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        for name in [
            "tool_spans_session_idx",
            "tool_spans_started_idx",
            "session_samples_ts_idx",
            "events_ts_idx",
            "feedback_session_idx",
        ] {
            let found: i64 = store
                .conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                    params![name],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(found, 1, "{name}");
        }
    }

    /// A session written with `upsert_session` can be read back by id with its
    /// status intact.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    /// Appending two events to a session succeeds and the session is still
    /// listed once for its workspace.
    ///
    /// NOTE(review): despite the name, this test never reads the events back;
    /// it only asserts on `list_sessions`. Event read-back is exercised by
    /// `list_events_for_session_round_trip`.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    /// Paging reports the total row count and the next offset, and both the
    /// paged and unpaged listings return sessions in a stable order.
    ///
    /// Sessions `a` and `b` share a start time and `c` is older; they are
    /// inserted as c, b, a, and the expected order is a, b, c. That is
    /// consistent with newest-first ordering and an ascending-id tie-break
    /// (presumably; the query itself is not visible here).
    #[test]
    fn list_sessions_page_orders_and_counts() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut a = make_session("a");
        a.started_at_ms = 2_000;
        let mut b = make_session("b");
        b.started_at_ms = 2_000;
        let mut c = make_session("c");
        c.started_at_ms = 1_000;
        store.upsert_session(&c).unwrap();
        store.upsert_session(&b).unwrap();
        store.upsert_session(&a).unwrap();

        let page = store
            .list_sessions_page("/ws", 0, 2, SessionFilter::default())
            .unwrap();
        assert_eq!(page.total, 3);
        assert_eq!(page.next_offset, Some(2));
        assert_eq!(
            page.rows.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let all = store.list_sessions("/ws").unwrap();
        assert_eq!(
            all.iter().map(|s| s.id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    /// A filter combining agent prefix, status and `since_ms` selects only the
    /// matching session, and `total` reflects the filtered count.
    ///
    /// The stored agent is `"Cursor"` while the prefix is `"cur"`, so the test
    /// expects prefix matching to ignore case.
    #[test]
    fn list_sessions_page_filters_in_sql_shape() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut cursor = make_session("cursor");
        cursor.agent = "Cursor".into();
        cursor.started_at_ms = 2_000;
        cursor.status = SessionStatus::Running;
        let mut claude = make_session("claude");
        claude.agent = "claude".into();
        claude.started_at_ms = 3_000;
        store.upsert_session(&cursor).unwrap();
        store.upsert_session(&claude).unwrap();

        let page = store
            .list_sessions_page(
                "/ws",
                0,
                10,
                SessionFilter {
                    agent_prefix: Some("cur".into()),
                    status: Some(SessionStatus::Running),
                    since_ms: Some(1_500),
                },
            )
            .unwrap();
        assert_eq!(page.total, 1);
        assert_eq!(page.rows[0].id, "cursor");
    }

    /// `list_sessions_started_after` returns only sessions newer than the
    /// cutoff, and `session_statuses` reflects a status change made through
    /// `update_session_status`.
    #[test]
    fn incremental_session_helpers_find_new_rows_and_statuses() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 2_000;
        new.status = SessionStatus::Running;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();

        let rows = store.list_sessions_started_after("/ws", 1_500).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "new");

        store
            .update_session_status("new", SessionStatus::Done)
            .unwrap();
        let statuses = store.session_statuses(&["new".to_string()]).unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].status, SessionStatus::Done);
    }

    /// Summary stats on an empty store report zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    /// Summary stats count sessions and group them by agent: two `cursor`
    /// sessions yield a single `("cursor", 2)` entry.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    /// Events appended to a session are returned by
    /// `list_events_for_session` in ascending `seq` order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// `list_events_page` treats its sequence argument as an inclusive lower
    /// bound: asking from `last_seq + 1` yields the next events without gaps
    /// or repeats.
    #[test]
    fn list_events_page_uses_inclusive_seq_cursor() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("paged")).unwrap();
        for seq in 0..5 {
            store.append_event(&make_event("paged", seq)).unwrap();
        }
        let first = store.list_events_page("paged", 0, 2).unwrap();
        assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
        let second = store
            .list_events_page("paged", first[1].seq + 1, 2)
            .unwrap();
        assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
    }

    /// Appending the same event (same session and `seq`) twice stores it once.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    /// The span-tree cache is populated by a lookup (even one that returns an
    /// empty tree) and cleared by every `append_event`.
    #[test]
    fn span_tree_cache_hits_empty_and_invalidates_on_append() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        // Looking up an unknown session still fills the cache.
        assert!(store.session_span_tree("missing").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());

        store.upsert_session(&make_session("tree")).unwrap();
        let call = make_event("tree", 0);
        store.append_event(&call).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        // With only the tool call recorded, the test expects an empty tree.
        assert!(store.session_span_tree("tree").unwrap().is_empty());
        assert!(store.span_tree_cache.borrow().is_some());
        // A result carrying the same tool_call_id makes one span appear.
        let mut result = make_event("tree", 1);
        result.kind = EventKind::ToolResult;
        result.tool_call_id = call.tool_call_id.clone();
        store.append_event(&result).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
        let first = store.session_span_tree("tree").unwrap();
        assert_eq!(first.len(), 1);
        assert!(store.span_tree_cache.borrow().is_some());
        store.append_event(&make_event("tree", 2)).unwrap();
        assert!(store.span_tree_cache.borrow().is_none());
    }

    /// `tool_spans_in_window` places a span by its start time when one is
    /// present and falls back to its end time otherwise.
    ///
    /// `started_wins` ends inside the queried window but starts outside it and
    /// is expected to be excluded, which shows the start time takes
    /// precedence.
    #[test]
    fn tool_spans_in_window_uses_started_then_ended_fallback() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("spans")).unwrap();
        // Rows are inserted directly so start/end can be NULL independently.
        for (id, started, ended) in [
            ("started", Some(200_i64), None),
            ("fallback", None, Some(250_i64)),
            ("outside", Some(400_i64), None),
            ("too_old", None, Some(50_i64)),
            ("started_wins", Some(500_i64), Some(200_i64)),
        ] {
            store
                .conn
                .execute(
                    "INSERT INTO tool_spans
                     (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
                     VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
                    params![id, started, ended],
                )
                .unwrap();
        }
        let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
        let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
        assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
    }

    /// Upserting a session id a second time updates the stored row (here its
    /// status) instead of failing.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// A path found at `input.path` in an event payload is recorded and then
    /// returned by `files_touched_in_window`.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` changes the status returned by `get_session`.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// Pruning removes sessions that started before the cutoff together with
    /// their events, reports both counts, and leaves newer sessions in place.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    /// A `.cursor/rules/<name>.mdc` path in an event payload is recorded as
    /// rule `<name>` and returned by `rules_used_in_window`.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// The guidance report attributes one session each to a skill and a rule
    /// mentioned in free payload text, and marks both as on disk because their
    /// slugs are in the sets passed to `guidance_report`.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    /// Pruning a session also removes the `rules_used` rows derived from its
    /// events.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}