1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8 FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
/// Timestamp ceiling of 10^12, in milliseconds per the `_MS` suffix.
// NOTE(review): the name suggests timestamps below this value are treated as
// synthetic; the use sites are outside this view — confirm before relying on it.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size, in megabytes per the `_MB` suffix.
// NOTE(review): presumably consumed by `apply_pragmas` for SQLite's mmap
// setting — confirm against that helper.
const DEFAULT_MMAP_MB: u64 = 256;
/// Column list and `FROM` clause for reading rows of the `sessions` table.
///
/// The 21 columns appear in the same order as the column list written by
/// `Store::upsert_session`. The text ends right after `FROM sessions` with no
/// trailing space, so any clause appended to it must start with whitespace.
const SESSION_SELECT: &str = "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Ranks the files of one repo snapshot by `touch count * complexity_total`.
///
/// Parameters: `?1` snapshot id, `?2` workspace, `?3`/`?4` inclusive time
/// window bounds. A tool span is in the window by `started_at_ms`, or by
/// `ended_at_ms` when `started_at_ms` is NULL. Because every join is a LEFT
/// JOIN, files with no matching span still appear, with a `value` of 0.
/// `COUNT(s.id)` counts joined span rows whose session belongs to the
/// workspace, not distinct sessions. Returns at most 10 rows ordered by
/// `value` descending, then path ascending.
const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
        AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
          OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool usage rollup for one workspace and time window.
///
/// Parameters: `?1` workspace, `?2`/`?3` inclusive window bounds (matched on
/// `started_at_ms`, or on `ended_at_ms` when the start is NULL).
///
/// CTE pipeline:
/// - `scoped`: spans in scope; a NULL tool becomes `'unknown'`, and
///   `total_tokens` is in + out + reasoning tokens with NULLs counted as 0.
/// - `agg`: call count and token sums per tool.
/// - `lat`: spans with a non-NULL `lead_time_ms`, ranked per tool.
/// - `pct`: p50/p95 picked at rank `((n - 1) * p) / 100 + 1` (integer
///   division), so no interpolation between samples.
///
/// Output columns: tool, calls, p50_ms, p95_ms, total_tokens,
/// total_reasoning_tokens. Percentiles are NULL for a tool with no lead times
/// (LEFT JOIN). The query has no ORDER BY, so row order is unspecified.
const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
        SELECT COALESCE(ts.tool, 'unknown') AS tool,
               ts.lead_time_ms,
               COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
               COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
        FROM tool_spans ts
        JOIN sessions s ON s.id = ts.session_id
        WHERE s.workspace = ?1
          AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
            OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
        SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
               SUM(reasoning_tokens) AS total_reasoning_tokens
        FROM scoped GROUP BY tool
    ),
    lat AS (
        SELECT tool, lead_time_ms,
               ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
               COUNT(*) OVER (PARTITION BY tool) AS n
        FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
        SELECT tool,
               MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
               MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
        FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
85
/// Schema statements applied, in order, every time a store is opened in
/// read-write mode (`Store::open_with_mode` runs each entry through
/// `execute_batch`).
///
/// Every entry is written to be re-runnable: tables and indexes use
/// `IF NOT EXISTS` and the single seed row uses `INSERT OR IGNORE`. Some
/// entries hold several `;`-separated statements, which is why they are run as
/// batches rather than as single statements.
///
/// Other code in this file reads or writes columns that these `CREATE TABLE`
/// statements do not declare (for example `sessions.start_commit`,
/// `events.ts_exact`, `sync_outbox.kind`).
// NOTE(review): those columns are presumably added by `ensure_schema_columns`,
// which runs right after this list — confirm against that helper.
const MIGRATIONS: &[&str] = &[
    // Core local tables: sessions, their events, and derived usage sets.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // This unique index is the conflict target of the event upsert in
    // `append_event_with_sync` (`ON CONFLICT(session_id, seq)`).
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    // Unique indexes that make the `INSERT OR IGNORE` writes in
    // `apply_projector_events` idempotent per (session, value).
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // Span tables.
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS trace_spans (
        span_id TEXT PRIMARY KEY,
        trace_id TEXT NOT NULL,
        parent_span_id TEXT,
        session_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        name TEXT NOT NULL,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        duration_ms INTEGER,
        model TEXT,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        context_used_tokens INTEGER,
        context_max_tokens INTEGER,
        payload TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE INDEX IF NOT EXISTS trace_spans_session_idx ON trace_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_trace_idx ON trace_spans(trace_id)",
    "CREATE INDEX IF NOT EXISTS trace_spans_started_idx ON trace_spans(started_at_ms)",
    // Written alongside `sessions` by `Store::upsert_session`.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repository snapshot tables.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_desc_idx
        ON sessions(workspace, started_at_ms DESC, id ASC)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_agent_lower_idx
        ON sessions(workspace, lower(agent), started_at_ms DESC, id ASC)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Single-row table: the CHECK pins `id` to 1 and the INSERT OR IGNORE
    // below seeds that row once.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // `remote_*` tables: JSON blobs keyed by team, workspace hash and a
    // per-kind hashed id.
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Per-session annotation tables. Several declare
    // `REFERENCES sessions(id) ON DELETE CASCADE`.
    // NOTE(review): SQLite only enforces that when foreign keys are switched
    // on for the connection — confirm `apply_pragmas` does so.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE TABLE IF NOT EXISTS cases (
        id TEXT PRIMARY KEY,
        source_key TEXT NOT NULL UNIQUE,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        reason TEXT NOT NULL,
        label TEXT,
        status TEXT NOT NULL CHECK(status IN ('open','archived')),
        prompt_fingerprint TEXT,
        metadata_json TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS case_refs (
        case_id TEXT NOT NULL REFERENCES cases(id) ON DELETE CASCADE,
        ref_kind TEXT NOT NULL,
        ref_key TEXT NOT NULL,
        PRIMARY KEY (case_id, ref_kind, ref_key)
    )",
    "CREATE TABLE IF NOT EXISTS rules (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        filter TEXT NOT NULL,
        action_json TEXT NOT NULL,
        enabled INTEGER NOT NULL DEFAULT 1,
        created_at_ms INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS review_items (
        id TEXT PRIMARY KEY,
        source_key TEXT NOT NULL UNIQUE,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        title TEXT NOT NULL,
        status TEXT NOT NULL CHECK(status IN ('open','resolved','dismissed')),
        created_at_ms INTEGER NOT NULL,
        resolved_at_ms INTEGER
    )",
    "CREATE TABLE IF NOT EXISTS alert_events (
        id TEXT PRIMARY KEY,
        source_key TEXT NOT NULL UNIQUE,
        name TEXT NOT NULL,
        severity TEXT NOT NULL CHECK(severity IN ('info','warning','critical')),
        message TEXT NOT NULL,
        session_id TEXT REFERENCES sessions(id) ON DELETE SET NULL,
        created_at_ms INTEGER NOT NULL
    )",
    // Secondary indexes.
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
    "CREATE INDEX IF NOT EXISTS cases_status_idx ON cases(status, created_at_ms)",
    "CREATE INDEX IF NOT EXISTS review_items_status_idx ON review_items(status, created_at_ms)",
    "CREATE INDEX IF NOT EXISTS alert_events_name_idx ON alert_events(name, created_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_idx ON tool_spans(session_id)",
    "CREATE INDEX IF NOT EXISTS tool_spans_started_idx ON tool_spans(started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_ended_idx ON tool_spans(ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS session_samples_ts_idx ON session_samples(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_idx ON events(ts_ms)",
    "CREATE INDEX IF NOT EXISTS events_ts_session_seq_idx ON events(ts_ms, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS events_session_ts_seq_idx ON events(session_id, ts_ms, seq)",
    "CREATE INDEX IF NOT EXISTS events_tool_ts_session_seq_idx ON events(tool, ts_ms DESC, session_id, seq)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_started_idx ON tool_spans(session_id, started_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_spans_session_ended_idx ON tool_spans(session_id, ended_at_ms)",
    "CREATE INDEX IF NOT EXISTS tool_span_paths_path_idx ON tool_span_paths(path, span_id)",
    // NOTE(review): indexes the same single column as `session_feedback_session`
    // created above; kept because dropping a shipped migration is not free.
    "CREATE INDEX IF NOT EXISTS feedback_session_idx ON session_feedback(session_id)",
];
404
/// Aggregate figures for an insights/overview display.
///
/// Nothing in the visible part of this file fills the struct, so the field
/// notes below are read off the names and types; treat any "presumably" as
/// something to confirm against the query that populates it.
#[derive(Clone)]
pub struct InsightsStats {
    /// Count of sessions.
    pub total_sessions: u64,
    /// Count of sessions — presumably those whose status is still running.
    pub running_sessions: u64,
    /// Count of events.
    pub total_events: u64,
    /// Presumably `(day label, session count)` pairs — confirm the label format.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Session records, each paired with a count — presumably its event count.
    pub recent: Vec<(SessionRecord, u64)>,
    /// Presumably `(tool name, call count)` pairs.
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost; the `_usd_e6` suffix suggests USD scaled by 10^6.
    pub total_cost_usd_e6: i64,
    /// Count of sessions — presumably those with a recorded cost.
    pub sessions_with_cost: u64,
}
420
/// Point-in-time view of sync health.
///
/// Derives `Debug` and `Clone` so the snapshot can be logged and passed around
/// like the other public row types in this file; every field is a plain
/// standard-library type, so both derives are free.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Number of outbox entries not yet sent.
    pub pending_outbox: u64,
    /// Time of the last successful sync, in milliseconds per the `_ms` suffix;
    /// `None` when there has been none.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent failure, if any.
    pub last_error: Option<String>,
    /// Number of failures in a row.
    pub consecutive_failures: u32,
}
428
/// Serializable summary of a set of sessions.
///
/// The populating query is outside this view; notes marked "presumably" are
/// read off the field names and should be confirmed there.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions summarized.
    pub session_count: u64,
    /// Summed cost; the `_usd_e6` suffix suggests USD scaled by 10^6.
    pub total_cost_usd_e6: i64,
    /// Presumably `(agent, session count)` pairs.
    pub by_agent: Vec<(String, u64)>,
    /// Presumably `(model, session count)` pairs.
    pub by_model: Vec<(String, u64)>,
    /// Presumably `(tool, call count)` pairs.
    pub top_tools: Vec<(String, u64)>,
}
438
/// Kind of guidance artifact a [`GuidancePerfRow`] describes.
///
/// Serialized in lowercase (`"skill"` / `"rule"`) because of
/// `rename_all = "lowercase"`. The derived ordering follows declaration
/// order, so `Skill` sorts before `Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill.
    Skill,
    /// A rule.
    Rule,
}
446
/// One row of a [`GuidanceReport`]: usage and cost figures for a single skill
/// or rule.
///
/// The code that computes these values is outside this view; notes marked
/// "presumably" are read off the field names.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether the row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    /// Number of sessions — presumably those that used this item.
    pub sessions: u64,
    /// Presumably the share of in-window sessions that used this item —
    /// confirm whether the scale is 0–1 or 0–100.
    pub sessions_pct: f64,
    /// Summed cost; the `_usd_e6` suffix suggests USD scaled by 10^6.
    pub total_cost_usd_e6: i64,
    /// Average cost per session in USD; `None` when it cannot be computed.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Presumably this row's average compared with the workspace-wide
    /// average — confirm whether it is a difference or a ratio.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Presumably whether the skill/rule file currently exists on disk.
    pub on_disk: bool,
}
459
/// Guidance (skills and rules) performance report for one workspace over a
/// time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Window start, in milliseconds per the `_ms` suffix.
    pub window_start_ms: u64,
    /// Window end, in milliseconds per the `_ms` suffix.
    pub window_end_ms: u64,
    /// Number of sessions in the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD; `None` when it cannot
    /// be computed.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
470
/// Counters reported by a prune operation; both default to zero.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of sessions removed.
    pub sessions_removed: u64,
    /// Number of events removed.
    pub events_removed: u64,
}
477
/// In-memory form of a `session_outcomes` row; the field names match that
/// table's columns one for one.
///
/// Every measurement is optional, matching the nullable columns. `build_ok`
/// and `ci_ok` are booleans here while the table stores INTEGER.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session the outcome belongs to (the table's primary key).
    pub session_id: String,
    /// Count of passed tests, if measured.
    pub test_passed: Option<i64>,
    /// Count of failed tests, if measured.
    pub test_failed: Option<i64>,
    /// Count of skipped tests, if measured.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded, if measured.
    pub build_ok: Option<bool>,
    /// Count of lint errors, if measured.
    pub lint_errors: Option<i64>,
    /// Presumably lines reverted within 14 days of the session — confirm.
    pub revert_lines_14d: Option<i64>,
    // NOTE(review): stored as an integer rather than a bool; confirm whether
    // this is a flag or a count.
    pub pr_open: Option<i64>,
    /// Whether CI succeeded, if measured.
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, in milliseconds per the `_ms` suffix.
    pub measured_at_ms: u64,
    /// Error text recorded when measuring failed, if any.
    pub measure_error: Option<String>,
}
493
/// Per-session aggregate of resource samples.
// NOTE(review): presumably computed over the `session_samples` table
// (`cpu_percent`, `rss_bytes`) — the aggregating query is outside this view.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the aggregate belongs to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Largest CPU percentage among the samples.
    pub max_cpu_percent: f64,
    /// Largest resident set size among the samples, in bytes.
    pub max_rss_bytes: u64,
}
502
// Key strings for small pieces of persisted state.
// NOTE(review): presumably keys of the `sync_state` (k, v) table — the
// accessors are outside this view.

/// Key for the time of the last agent scan, in milliseconds per the `_ms` suffix.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key for the time of the last automatic prune, in milliseconds per the `_ms` suffix.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// Key stamped with the current time by `Store::append_search_event` when
/// adding an event to the search index fails.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
507
/// Tool span in the shape used when handing spans to sync.
///
/// Field names follow the `tool_spans` table columns, with `paths` holding a
/// list where the table has `paths_json`. Derives `Debug` and `Clone` so rows
/// can be logged and copied; every field is a plain standard-library type.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    /// Span identifier.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Tool call identifier, when known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Start time in milliseconds, when known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, when known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, when known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, when known.
    pub tokens_in: Option<u32>,
    /// Output token count, when known.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, when known.
    pub reasoning_tokens: Option<u32>,
    /// Cost, when known; the `_usd_e6` suffix suggests USD scaled by 10^6.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
523
/// Crate-internal row of per-record capture-quality flags.
// NOTE(review): presumably one row per event, with `source` holding the
// event's source label — the producing query is outside this view.
pub(crate) struct CaptureQualityRow {
    /// Source label of the record.
    pub source: String,
    /// Whether token counts were captured.
    pub has_tokens: bool,
    /// Whether latency was captured.
    pub has_latency: bool,
    /// Whether context figures were captured.
    pub has_context: bool,
}
530
/// Crate-internal row describing one trace span for quality reporting.
pub(crate) struct TraceSpanQualityRow {
    /// Span kind label.
    pub kind: String,
    // NOTE(review): presumably true when the span's parent is missing —
    // confirm against the query that fills this row.
    pub is_orphan: bool,
}
535
/// How [`Store::open_with_mode`] opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Opens with `Connection::open`, then runs `MIGRATIONS`,
    /// `ensure_schema_columns` and warms the projector.
    ReadWrite,
    /// Opens with the `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX` flags and
    /// skips migrations, schema fix-ups and projector warm-up.
    ReadOnlyQuery,
}
541
/// Narrow projection of a session: its id, status and end time.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session identifier.
    pub id: String,
    /// Current status of the session.
    pub status: SessionStatus,
    /// End time in milliseconds, when the session has one.
    pub ended_at_ms: Option<u64>,
}
548
/// Optional criteria for listing sessions; the default value has every
/// criterion unset.
// NOTE(review): the matching semantics (prefix match, case folding, inclusive
// bound) live in the listing query outside this view — confirm there.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Agent name prefix to match, if any.
    pub agent_prefix: Option<String>,
    /// Status to match, if any.
    pub status: Option<SessionStatus>,
    /// Lower time bound in milliseconds, if any.
    pub since_ms: Option<u64>,
}
555
/// One page of a session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total count — presumably of all sessions matching the filter, not
    /// just this page.
    pub total: usize,
    /// Offset to request for the following page; `None` presumably means
    /// there is no further page.
    pub next_offset: Option<usize>,
}
562
/// Contents of the store's span-tree cache. `Store` keeps at most one entry
/// (an `Option` of this type).
// NOTE(review): `session_id` and `last_event_seq` presumably form the validity
// key checked before reusing `nodes` — the lookup is outside this view.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached tree was built for.
    session_id: String,
    /// Last event sequence number recorded with the cached tree, if any.
    last_event_seq: Option<u64>,
    /// The cached span nodes.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
569
/// Handle to the SQLite-backed store plus the side resources that live next
/// to the database file.
///
/// Mutable state sits behind `RefCell`, so the methods take `&self`.
pub struct Store {
    /// SQLite connection opened by `Store::open_with_mode`.
    conn: Connection,
    /// Parent directory of the database path; passed to `HotLog::open`,
    /// `PendingWriter::open` and `Outbox::open`.
    root: PathBuf,
    /// Hot log, opened on the first event append that needs it.
    hot_log: RefCell<Option<HotLog>>,
    /// Search index writer, opened on the first indexed event and committed
    /// by `flush_search`.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// At most one cached span tree; cleared by `invalidate_span_tree_cache`.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector whose output is persisted by
    /// `apply_projector_events`.
    projector: RefCell<Projector>,
}
578
579impl Store {
    /// Borrows the underlying SQLite connection for use elsewhere in the crate.
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
583
    /// Opens the store at `path` in [`StoreOpenMode::ReadWrite`] mode.
    ///
    /// # Errors
    /// Propagates any error from [`Store::open_with_mode`].
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
    }
587
    /// Opens the store at `path` in [`StoreOpenMode::ReadOnlyQuery`] mode.
    ///
    /// Currently identical to [`Store::open_query`].
    ///
    /// # Errors
    /// Propagates any error from [`Store::open_with_mode`].
    pub fn open_read_only(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
591
    /// Opens the store at `path` in [`StoreOpenMode::ReadOnlyQuery`] mode.
    ///
    /// Currently identical to [`Store::open_read_only`].
    ///
    /// # Errors
    /// Propagates any error from [`Store::open_with_mode`].
    pub fn open_query(path: &Path) -> Result<Self> {
        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
    }
595
596 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
597 if let Some(parent) = path.parent() {
598 std::fs::create_dir_all(parent)?;
599 }
600 let conn = match mode {
601 StoreOpenMode::ReadWrite => Connection::open(path),
602 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
603 path,
604 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
605 ),
606 }
607 .with_context(|| format!("open db: {}", path.display()))?;
608 apply_pragmas(&conn, mode)?;
609 if mode == StoreOpenMode::ReadWrite {
610 for sql in MIGRATIONS {
611 conn.execute_batch(sql)?;
612 }
613 ensure_schema_columns(&conn)?;
614 }
615 let store = Self {
616 conn,
617 root: path
618 .parent()
619 .unwrap_or_else(|| Path::new("."))
620 .to_path_buf(),
621 hot_log: RefCell::new(None),
622 search_writer: RefCell::new(None),
623 span_tree_cache: RefCell::new(None),
624 projector: RefCell::new(Projector::default()),
625 };
626 if mode == StoreOpenMode::ReadWrite {
627 store.warm_projector()?;
628 }
629 Ok(store)
630 }
631
632 fn invalidate_span_tree_cache(&self) {
633 *self.span_tree_cache.borrow_mut() = None;
634 }
635
636 fn warm_projector(&self) -> Result<()> {
637 let ids = self.running_session_ids()?;
638 let mut projector = self.projector.borrow_mut();
639 for id in ids {
640 for event in self.list_events_for_session(&id)? {
641 let _ = projector.apply(&event);
642 }
643 }
644 Ok(())
645 }
646
647 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
648 self.conn.execute(
649 "INSERT INTO sessions (
650 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
651 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
652 prompt_fingerprint, parent_session_id, agent_version, os, arch,
653 repo_file_count, repo_total_loc
654 )
655 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
656 ?16, ?17, ?18, ?19, ?20, ?21)
657 ON CONFLICT(id) DO UPDATE SET
658 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
659 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
660 status=excluded.status, trace_path=excluded.trace_path,
661 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
662 branch=excluded.branch, dirty_start=excluded.dirty_start,
663 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
664 prompt_fingerprint=excluded.prompt_fingerprint,
665 parent_session_id=excluded.parent_session_id,
666 agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
667 repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
668 params![
669 s.id,
670 s.agent,
671 s.model,
672 s.workspace,
673 s.started_at_ms as i64,
674 s.ended_at_ms.map(|v| v as i64),
675 format!("{:?}", s.status),
676 s.trace_path,
677 s.start_commit,
678 s.end_commit,
679 s.branch,
680 s.dirty_start.map(bool_to_i64),
681 s.dirty_end.map(bool_to_i64),
682 s.repo_binding_source.clone().unwrap_or_default(),
683 s.prompt_fingerprint.as_deref(),
684 s.parent_session_id.as_deref(),
685 s.agent_version.as_deref(),
686 s.os.as_deref(),
687 s.arch.as_deref(),
688 s.repo_file_count.map(|v| v as i64),
689 s.repo_total_loc.map(|v| v as i64),
690 ],
691 )?;
692 self.conn.execute(
693 "INSERT INTO session_repo_binding (
694 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
695 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
696 ON CONFLICT(session_id) DO UPDATE SET
697 start_commit=excluded.start_commit,
698 end_commit=excluded.end_commit,
699 branch=excluded.branch,
700 dirty_start=excluded.dirty_start,
701 dirty_end=excluded.dirty_end,
702 repo_binding_source=excluded.repo_binding_source",
703 params![
704 s.id,
705 s.start_commit,
706 s.end_commit,
707 s.branch,
708 s.dirty_start.map(bool_to_i64),
709 s.dirty_end.map(bool_to_i64),
710 s.repo_binding_source.clone().unwrap_or_default(),
711 ],
712 )?;
713 Ok(())
714 }
715
    /// Creates a minimal placeholder row for session `id` if none exists yet.
    ///
    /// Uses `INSERT OR IGNORE`, so an existing row with the same id is left
    /// untouched. The stub has status `'Running'`, an empty `trace_path` and
    /// `repo_binding_source`, and NULL for every other optional column. Only
    /// the `sessions` table is written; `session_repo_binding` is not.
    ///
    /// # Errors
    /// Returns the database error if the insert fails.
    pub fn ensure_session_stub(
        &self,
        id: &str,
        agent: &str,
        workspace: &str,
        started_at_ms: u64,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT OR IGNORE INTO sessions (
                id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
                start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
                prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
            ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
                NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
            params![id, agent, workspace, started_at_ms as i64],
        )?;
        Ok(())
    }
736
737 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
739 let n: i64 = self.conn.query_row(
740 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
741 [session_id],
742 |r| r.get(0),
743 )?;
744 Ok(n as u64)
745 }
746
    /// Stores `e` without a sync context, so nothing is queued in the outbox.
    ///
    /// # Errors
    /// Propagates any error from [`Store::append_event_with_sync`].
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
750
    /// Stores `e`, updates everything derived from it, and — when `ctx` is
    /// given and sync is configured — queues a redacted copy in the outbox.
    ///
    /// Steps, in order:
    /// 1. Upsert the event row keyed on `(session_id, seq)`.
    /// 2. Append to the hot log (unless disabled by environment variable).
    /// 3. Update derived data: full rebuild in legacy mode, a session replay
    ///    when the event is not newer than the last stored one, otherwise an
    ///    incremental projector update.
    /// 4. Add the event to the search index (best effort; failures are only
    ///    logged by `append_search_event`).
    /// 5. If sync applies, build the outbound row, redact it, and enqueue it
    ///    together with the session's tool spans.
    ///
    /// # Errors
    /// Returns the first error from serialization, the database, the hot log,
    /// the projector persistence, or the outbox. Skipped sync (no context,
    /// incomplete config, missing salt, sampled out, unknown session) is not
    /// an error.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        // Highest stored seq BEFORE this insert; used below to detect an
        // out-of-order or rewritten event. Not needed in legacy mode.
        let last_before = if projector_legacy_mode() {
            None
        } else {
            self.last_event_seq_for_session(&e.session_id)?
        };
        let payload = serde_json::to_string(&e.payload)?;
        // Upsert: an event with an already-stored (session_id, seq) overwrites
        // every column of the existing row.
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
            )
            ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // Kind and source are stored as their `Debug` renderings.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // NOTE(review): with the DO UPDATE clause a conflicting row is
        // rewritten rather than skipped, so this guard looks defensive —
        // confirm whether it can actually trigger.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        self.append_hot_event(e)?;
        if projector_legacy_mode() {
            // Legacy path: index the event and rebuild all spans of the session.
            index_event_derived(&self.conn, e)?;
            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
            self.invalidate_span_tree_cache();
        } else if last_before.is_some_and(|last| e.seq <= last) {
            // The event is not newer than what was already stored, so the
            // incremental projector state cannot be trusted; replay the whole
            // session. Cache invalidation for this branch is left to
            // `replay_projector_session`.
            self.replay_projector_session(&e.session_id)?;
        } else {
            // Incremental path: apply this event, then expire orphans using
            // this event's timestamp as "now".
            let deltas = self.projector.borrow_mut().apply(e);
            self.apply_projector_events(&deltas)?;
            let expired = self
                .projector
                .borrow_mut()
                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
            self.apply_projector_events(&expired)?;
            // A stop event flushes whatever the projector still holds for
            // the session.
            if is_stop_event(e) {
                let flushed = self
                    .projector
                    .borrow_mut()
                    .flush_session(&e.session_id, e.ts_ms);
                self.apply_projector_events(&flushed)?;
            }
            self.invalidate_span_tree_cache();
        }
        self.append_search_event(e);
        // Everything below is sync-only.
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync counts as unconfigured unless endpoint, token and team id are all set.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Per-event sampling: the event is dropped from sync when the random
        // draw exceeds the configured rate.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Build the outbound form, redact its payload, then queue it under
        // the "events" kind.
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.outbox()?.append(&e.session_id, "events", &row)?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
874
875 fn append_hot_event(&self, e: &Event) -> Result<()> {
876 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
877 return Ok(());
878 }
879 let mut slot = self.hot_log.borrow_mut();
880 if slot.is_none() {
881 *slot = Some(HotLog::open(&self.root)?);
882 }
883 if let Some(log) = slot.as_mut() {
884 log.append(e)?;
885 }
886 Ok(())
887 }
888
889 fn append_search_event(&self, e: &Event) {
890 if let Err(err) = self.try_append_search_event(e) {
891 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
892 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
893 }
894 }
895
896 fn try_append_search_event(&self, e: &Event) -> Result<()> {
897 let Some(session) = self.get_session(&e.session_id)? else {
898 return Ok(());
899 };
900 let workspace = PathBuf::from(&session.workspace);
901 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
902 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
903 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
904 return Ok(());
905 };
906 let mut slot = self.search_writer.borrow_mut();
907 if slot.is_none() {
908 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
909 }
910 slot.as_mut().expect("writer").add(&doc)
911 }
912
913 pub fn flush_search(&self) -> Result<()> {
914 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
915 writer.commit()?;
916 }
917 Ok(())
918 }
919
    /// Opens the outbox stored under this store's root directory.
    ///
    /// A fresh `Outbox` is opened on every call; nothing is cached here.
    ///
    /// # Errors
    /// Propagates the error from `Outbox::open`.
    fn outbox(&self) -> Result<Outbox> {
        Outbox::open(&self.root)
    }
923
924 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
925 if projector_legacy_mode() {
926 rebuild_tool_spans_for_session(&self.conn, session_id)?;
927 self.invalidate_span_tree_cache();
928 return Ok(());
929 }
930 let deltas = self
931 .projector
932 .borrow_mut()
933 .flush_session(session_id, now_ms);
934 if self.apply_projector_events(&deltas)? {
935 self.invalidate_span_tree_cache();
936 }
937 Ok(())
938 }
939
940 fn replay_projector_session(&self, session_id: &str) -> Result<()> {
941 clear_session_spans(&self.conn, session_id)?;
942 self.projector.borrow_mut().reset_session(session_id);
943 let events = self.list_events_for_session(session_id)?;
944 let mut changed = false;
945 for event in &events {
946 let deltas = self.projector.borrow_mut().apply(event);
947 changed |= self.apply_projector_events(&deltas)?;
948 }
949 if self
950 .get_session(session_id)?
951 .is_some_and(|session| session.status == SessionStatus::Done)
952 {
953 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
954 let deltas = self
955 .projector
956 .borrow_mut()
957 .flush_session(session_id, now_ms);
958 changed |= self.apply_projector_events(&deltas)?;
959 }
960 if changed {
961 self.invalidate_span_tree_cache();
962 }
963 Ok(())
964 }
965
966 fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
967 let mut changed = false;
968 for delta in deltas {
969 match delta {
970 ProjectorEvent::SpanClosed(span, sample) => {
971 upsert_tool_span_record(&self.conn, span)?;
972 tracing::debug!(
973 session_id = %sample.session_id,
974 span_id = %sample.span_id,
975 tool = ?sample.tool,
976 lead_time_ms = ?sample.lead_time_ms,
977 tokens_in = ?sample.tokens_in,
978 tokens_out = ?sample.tokens_out,
979 reasoning_tokens = ?sample.reasoning_tokens,
980 cost_usd_e6 = ?sample.cost_usd_e6,
981 paths = ?sample.paths,
982 "tool span closed"
983 );
984 changed = true;
985 }
986 ProjectorEvent::SpanPatched(span) => {
987 upsert_tool_span_record(&self.conn, span)?;
988 changed = true;
989 }
990 ProjectorEvent::FileTouched { session, path } => {
991 self.conn.execute(
992 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
993 params![session, path],
994 )?;
995 changed = true;
996 }
997 ProjectorEvent::SkillUsed { session, skill } => {
998 self.conn.execute(
999 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
1000 params![session, skill],
1001 )?;
1002 changed = true;
1003 }
1004 ProjectorEvent::RuleUsed { session, rule } => {
1005 self.conn.execute(
1006 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
1007 params![session, rule],
1008 )?;
1009 changed = true;
1010 }
1011 }
1012 }
1013 Ok(changed)
1014 }
1015
1016 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
1017 let rows = self.outbox()?.list_pending(limit)?;
1018 if !rows.is_empty() {
1019 return Ok(rows);
1020 }
1021 let mut stmt = self.conn.prepare(
1022 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
1023 )?;
1024 let rows = stmt.query_map(params![limit as i64], |row| {
1025 Ok((
1026 row.get::<_, i64>(0)?,
1027 row.get::<_, String>(1)?,
1028 row.get::<_, String>(2)?,
1029 ))
1030 })?;
1031 let mut out = Vec::new();
1032 for r in rows {
1033 out.push(r?);
1034 }
1035 Ok(out)
1036 }
1037
1038 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
1039 self.outbox()?.delete_ids(ids)?;
1040 for id in ids {
1041 self.conn
1042 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
1043 }
1044 Ok(())
1045 }
1046
1047 pub fn replace_outbox_rows(
1048 &self,
1049 owner_id: &str,
1050 kind: &str,
1051 payloads: &[String],
1052 ) -> Result<()> {
1053 self.outbox()?.replace(owner_id, kind, payloads)?;
1054 self.conn.execute(
1055 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
1056 params![owner_id, kind],
1057 )?;
1058 for payload in payloads {
1059 self.conn.execute(
1060 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
1061 params![owner_id, kind, payload],
1062 )?;
1063 }
1064 Ok(())
1065 }
1066
1067 pub fn outbox_pending_count(&self) -> Result<u64> {
1068 let redb = self.outbox()?.pending_count()?;
1069 if redb > 0 {
1070 return Ok(redb);
1071 }
1072 let c: i64 =
1073 self.conn
1074 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
1075 r.get(0)
1076 })?;
1077 Ok(c as u64)
1078 }
1079
1080 pub fn set_sync_state_ok(&self) -> Result<()> {
1081 let now = now_ms().to_string();
1082 self.conn.execute(
1083 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
1084 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1085 params![now],
1086 )?;
1087 self.conn.execute(
1088 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
1089 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1090 [],
1091 )?;
1092 self.conn
1093 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
1094 Ok(())
1095 }
1096
1097 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
1098 let prev: i64 = self
1099 .conn
1100 .query_row(
1101 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1102 [],
1103 |r| {
1104 let s: String = r.get(0)?;
1105 Ok(s.parse::<i64>().unwrap_or(0))
1106 },
1107 )
1108 .optional()?
1109 .unwrap_or(0);
1110 let next = prev.saturating_add(1);
1111 self.conn.execute(
1112 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
1113 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1114 params![msg],
1115 )?;
1116 self.conn.execute(
1117 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
1118 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1119 params![next.to_string()],
1120 )?;
1121 Ok(())
1122 }
1123
1124 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
1125 let pending_outbox = self.outbox_pending_count()?;
1126 let last_success_ms = self
1127 .conn
1128 .query_row(
1129 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
1130 [],
1131 |r| r.get::<_, String>(0),
1132 )
1133 .optional()?
1134 .and_then(|s| s.parse().ok());
1135 let last_error = self
1136 .conn
1137 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
1138 r.get::<_, String>(0)
1139 })
1140 .optional()?;
1141 let consecutive_failures = self
1142 .conn
1143 .query_row(
1144 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
1145 [],
1146 |r| r.get::<_, String>(0),
1147 )
1148 .optional()?
1149 .and_then(|s| s.parse().ok())
1150 .unwrap_or(0);
1151 Ok(SyncStatusSnapshot {
1152 pending_outbox,
1153 last_success_ms,
1154 last_error,
1155 consecutive_failures,
1156 })
1157 }
1158
1159 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
1160 let row: Option<String> = self
1161 .conn
1162 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
1163 r.get::<_, String>(0)
1164 })
1165 .optional()?;
1166 Ok(row.and_then(|s| s.parse().ok()))
1167 }
1168
1169 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
1170 self.conn.execute(
1171 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
1172 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1173 params![key, v.to_string()],
1174 )?;
1175 Ok(())
1176 }
1177
1178 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
1180 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
1181 let old_ids = old_session_ids(&tx, cutoff_ms)?;
1182 let sessions_to_remove: i64 = tx.query_row(
1183 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
1184 params![cutoff_ms],
1185 |r| r.get(0),
1186 )?;
1187 let events_to_remove: i64 = tx.query_row(
1188 "SELECT COUNT(*) FROM events WHERE session_id IN \
1189 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
1190 params![cutoff_ms],
1191 |r| r.get(0),
1192 )?;
1193
1194 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
1195 tx.execute(
1196 &format!(
1197 "DELETE FROM tool_span_paths WHERE span_id IN \
1198 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
1199 ),
1200 params![cutoff_ms],
1201 )?;
1202 tx.execute(
1203 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
1204 params![cutoff_ms],
1205 )?;
1206 tx.execute(
1207 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
1208 params![cutoff_ms],
1209 )?;
1210 tx.execute(
1211 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
1212 params![cutoff_ms],
1213 )?;
1214 tx.execute(
1215 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
1216 params![cutoff_ms],
1217 )?;
1218 tx.execute(
1219 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
1220 params![cutoff_ms],
1221 )?;
1222 tx.execute(
1223 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
1224 params![cutoff_ms],
1225 )?;
1226 tx.execute(
1227 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
1228 params![cutoff_ms],
1229 )?;
1230 tx.execute(
1231 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
1232 params![cutoff_ms],
1233 )?;
1234 tx.execute(
1235 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
1236 params![cutoff_ms],
1237 )?;
1238 tx.execute(
1239 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
1240 params![cutoff_ms],
1241 )?;
1242 tx.execute(
1243 "DELETE FROM sessions WHERE started_at_ms < ?1",
1244 params![cutoff_ms],
1245 )?;
1246 tx.commit()?;
1247 if let Some(mut writer) = self.search_writer.borrow_mut().take() {
1248 let _ = writer.commit();
1249 }
1250 if let Err(err) = crate::search::delete_sessions(&self.root, &old_ids) {
1251 tracing::warn!("search prune skipped: {err:#}");
1252 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
1253 }
1254 self.invalidate_span_tree_cache();
1255 Ok(PruneStats {
1256 sessions_removed: sessions_to_remove as u64,
1257 events_removed: events_to_remove as u64,
1258 })
1259 }
1260
1261 pub fn vacuum(&self) -> Result<()> {
1263 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
1264 Ok(())
1265 }
1266
1267 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
1268 Ok(self
1269 .list_sessions_page(workspace, 0, i64::MAX as usize, SessionFilter::default())?
1270 .rows)
1271 }
1272
1273 pub fn list_sessions_page(
1274 &self,
1275 workspace: &str,
1276 offset: usize,
1277 limit: usize,
1278 filter: SessionFilter,
1279 ) -> Result<SessionPage> {
1280 let (where_sql, args) = session_filter_sql(workspace, &filter);
1281 let total = self.query_session_page_count(&where_sql, &args)?;
1282 let rows = self.query_session_page_rows(&where_sql, &args, offset, limit)?;
1283 let next = offset.saturating_add(rows.len());
1284 Ok(SessionPage {
1285 rows,
1286 total,
1287 next_offset: (next < total).then_some(next),
1288 })
1289 }
1290
1291 fn query_session_page_count(&self, where_sql: &str, args: &[Value]) -> Result<usize> {
1292 let sql = format!("SELECT COUNT(*) FROM sessions {where_sql}");
1293 let total: i64 = self
1294 .conn
1295 .query_row(&sql, params_from_iter(args.iter()), |r| r.get(0))?;
1296 Ok(total as usize)
1297 }
1298
1299 fn query_session_page_rows(
1300 &self,
1301 where_sql: &str,
1302 args: &[Value],
1303 offset: usize,
1304 limit: usize,
1305 ) -> Result<Vec<SessionRecord>> {
1306 let sql = format!(
1307 "{SESSION_SELECT} {where_sql} ORDER BY started_at_ms DESC, id ASC LIMIT ? OFFSET ?"
1308 );
1309 let mut values = args.to_vec();
1310 values.push(Value::Integer(limit.min(i64::MAX as usize) as i64));
1311 values.push(Value::Integer(offset.min(i64::MAX as usize) as i64));
1312 let mut stmt = self.conn.prepare(&sql)?;
1313 let rows = stmt.query_map(params_from_iter(values.iter()), session_row)?;
1314 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1315 }
1316
1317 pub fn list_sessions_started_after(
1318 &self,
1319 workspace: &str,
1320 after_started_at_ms: u64,
1321 ) -> Result<Vec<SessionRecord>> {
1322 let mut stmt = self.conn.prepare(
1323 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1324 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1325 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1326 repo_file_count, repo_total_loc
1327 FROM sessions
1328 WHERE workspace = ?1 AND started_at_ms > ?2
1329 ORDER BY started_at_ms DESC, id ASC",
1330 )?;
1331 let rows = stmt.query_map(params![workspace, after_started_at_ms as i64], session_row)?;
1332 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1333 }
1334
1335 pub fn session_statuses(&self, ids: &[String]) -> Result<Vec<SessionStatusRow>> {
1336 if ids.is_empty() {
1337 return Ok(Vec::new());
1338 }
1339 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1340 let sql =
1341 format!("SELECT id, status, ended_at_ms FROM sessions WHERE id IN ({placeholders})");
1342 let mut stmt = self.conn.prepare(&sql)?;
1343 let params: Vec<&dyn rusqlite::ToSql> =
1344 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1345 let rows = stmt.query_map(params.as_slice(), |r| {
1346 let status: String = r.get(1)?;
1347 Ok(SessionStatusRow {
1348 id: r.get(0)?,
1349 status: status_from_str(&status),
1350 ended_at_ms: r.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1351 })
1352 })?;
1353 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1354 }
1355
1356 fn running_session_ids(&self) -> Result<Vec<String>> {
1357 let mut stmt = self
1358 .conn
1359 .prepare("SELECT id FROM sessions WHERE status != 'Done' ORDER BY started_at_ms ASC")?;
1360 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1361 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1362 }
1363
1364 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
1365 let session_count: i64 = self.conn.query_row(
1366 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
1367 params![workspace],
1368 |r| r.get(0),
1369 )?;
1370
1371 let total_cost: i64 = self.conn.query_row(
1372 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
1373 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
1374 params![workspace],
1375 |r| r.get(0),
1376 )?;
1377
1378 let mut stmt = self.conn.prepare(
1379 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
1380 )?;
1381 let by_agent: Vec<(String, u64)> = stmt
1382 .query_map(params![workspace], |r| {
1383 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1384 })?
1385 .filter_map(|r| r.ok())
1386 .collect();
1387
1388 let mut stmt = self.conn.prepare(
1389 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
1390 )?;
1391 let by_model: Vec<(String, u64)> = stmt
1392 .query_map(params![workspace], |r| {
1393 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1394 })?
1395 .filter_map(|r| r.ok())
1396 .collect();
1397
1398 let mut stmt = self.conn.prepare(
1399 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
1400 WHERE s.workspace = ?1 AND tool IS NOT NULL
1401 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
1402 )?;
1403 let top_tools: Vec<(String, u64)> = stmt
1404 .query_map(params![workspace], |r| {
1405 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1406 })?
1407 .filter_map(|r| r.ok())
1408 .collect();
1409
1410 Ok(SummaryStats {
1411 session_count: session_count as u64,
1412 total_cost_usd_e6: total_cost,
1413 by_agent,
1414 by_model,
1415 top_tools,
1416 })
1417 }
1418
1419 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
1420 self.list_events_page(session_id, 0, i64::MAX as usize)
1421 }
1422
1423 pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
1424 let mut stmt = self.conn.prepare(
1425 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1426 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1427 stop_reason, latency_ms, ttft_ms, retry_count,
1428 context_used_tokens, context_max_tokens,
1429 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1430 FROM events WHERE session_id = ?1 AND seq = ?2",
1431 )?;
1432 stmt.query_row(params![session_id, seq as i64], event_row)
1433 .optional()
1434 .map_err(Into::into)
1435 }
1436
1437 pub fn search_tool_events(
1438 &self,
1439 workspace: &str,
1440 tool: &str,
1441 since_ms: Option<u64>,
1442 agent: Option<&str>,
1443 limit: usize,
1444 ) -> Result<Vec<(String, Event)>> {
1445 let mut stmt = self.conn.prepare(
1446 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1447 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1448 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1449 e.context_used_tokens, e.context_max_tokens,
1450 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
1451 s.agent
1452 FROM events e JOIN sessions s ON s.id = e.session_id
1453 WHERE e.tool = ?2
1454 AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
1455 AND (?3 IS NULL OR e.ts_ms >= ?3)
1456 AND (?4 IS NULL OR s.agent = ?4)
1457 ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
1458 LIMIT ?5",
1459 )?;
1460 let since = since_ms.map(|v| v as i64);
1461 let rows = stmt.query_map(
1462 params![workspace, tool, since, agent, limit as i64],
1463 search_tool_event_row,
1464 )?;
1465 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1466 }
1467
1468 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
1469 let mut out = Vec::new();
1470 for session in self.list_sessions(workspace)? {
1471 for event in self.list_events_for_session(&session.id)? {
1472 out.push((session.clone(), event));
1473 }
1474 }
1475 out.sort_by(|a, b| {
1476 (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
1477 });
1478 Ok(out)
1479 }
1480
1481 pub fn list_events_page(
1482 &self,
1483 session_id: &str,
1484 after_seq: u64,
1485 limit: usize,
1486 ) -> Result<Vec<Event>> {
1487 let mut stmt = self.conn.prepare(
1488 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
1489 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
1490 stop_reason, latency_ms, ttft_ms, retry_count,
1491 context_used_tokens, context_max_tokens,
1492 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
1493 FROM events
1494 WHERE session_id = ?1 AND seq >= ?2
1495 ORDER BY seq ASC LIMIT ?3",
1496 )?;
1497 let rows = stmt.query_map(
1498 params![
1499 session_id,
1500 after_seq as i64,
1501 limit.min(i64::MAX as usize) as i64
1502 ],
1503 event_row,
1504 )?;
1505 let mut events = Vec::new();
1506 for row in rows {
1507 events.push(row?);
1508 }
1509 Ok(events)
1510 }
1511
1512 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1514 self.conn.execute(
1515 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1516 params![format!("{:?}", status), id],
1517 )?;
1518 Ok(())
1519 }
1520
1521 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1523 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1524 Ok(InsightsStats {
1525 total_sessions: count_q(
1526 &self.conn,
1527 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1528 workspace,
1529 )?,
1530 running_sessions: count_q(
1531 &self.conn,
1532 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1533 workspace,
1534 )?,
1535 total_events: count_q(
1536 &self.conn,
1537 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1538 workspace,
1539 )?,
1540 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1541 recent: recent_sessions_3(&self.conn, workspace)?,
1542 top_tools: top_tools_5(&self.conn, workspace)?,
1543 total_cost_usd_e6,
1544 sessions_with_cost,
1545 })
1546 }
1547
1548 pub fn retro_events_in_window(
1550 &self,
1551 workspace: &str,
1552 start_ms: u64,
1553 end_ms: u64,
1554 ) -> Result<Vec<(SessionRecord, Event)>> {
1555 let mut stmt = self.conn.prepare(
1556 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1557 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1558 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1559 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1560 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1561 s.repo_file_count, s.repo_total_loc,
1562 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1563 e.context_used_tokens, e.context_max_tokens,
1564 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1565 FROM events e
1566 JOIN sessions s ON s.id = e.session_id
1567 WHERE s.workspace = ?1
1568 AND (
1569 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1570 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1571 )
1572 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1573 )?;
1574 let rows = stmt.query_map(
1575 params![
1576 workspace,
1577 start_ms as i64,
1578 end_ms as i64,
1579 SYNTHETIC_TS_CEILING_MS,
1580 ],
1581 |row| {
1582 let payload_str: String = row.get(12)?;
1583 let status_str: String = row.get(19)?;
1584 Ok((
1585 SessionRecord {
1586 id: row.get(13)?,
1587 agent: row.get(14)?,
1588 model: row.get(15)?,
1589 workspace: row.get(16)?,
1590 started_at_ms: row.get::<_, i64>(17)? as u64,
1591 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1592 status: status_from_str(&status_str),
1593 trace_path: row.get(20)?,
1594 start_commit: row.get(21)?,
1595 end_commit: row.get(22)?,
1596 branch: row.get(23)?,
1597 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1598 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1599 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1600 prompt_fingerprint: row.get(27)?,
1601 parent_session_id: row.get(28)?,
1602 agent_version: row.get(29)?,
1603 os: row.get(30)?,
1604 arch: row.get(31)?,
1605 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1606 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1607 },
1608 Event {
1609 session_id: row.get(0)?,
1610 seq: row.get::<_, i64>(1)? as u64,
1611 ts_ms: row.get::<_, i64>(2)? as u64,
1612 ts_exact: row.get::<_, i64>(3)? != 0,
1613 kind: kind_from_str(&row.get::<_, String>(4)?),
1614 source: source_from_str(&row.get::<_, String>(5)?),
1615 tool: row.get(6)?,
1616 tool_call_id: row.get(7)?,
1617 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1618 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1619 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1620 cost_usd_e6: row.get(11)?,
1621 payload: serde_json::from_str(&payload_str)
1622 .unwrap_or(serde_json::Value::Null),
1623 stop_reason: row.get(34)?,
1624 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1625 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1626 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1627 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1628 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1629 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1630 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1631 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1632 },
1633 ))
1634 },
1635 )?;
1636
1637 let mut out = Vec::new();
1638 for r in rows {
1639 out.push(r?);
1640 }
1641 Ok(out)
1642 }
1643
1644 pub fn experiment_metric_values_in_window(
1645 &self,
1646 workspace: &str,
1647 start_ms: u64,
1648 end_ms: u64,
1649 metric: crate::experiment::types::Metric,
1650 ) -> Result<Vec<(SessionRecord, f64)>> {
1651 use crate::experiment::types::Metric;
1652 let session_cols = "s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1653 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start,
1654 s.dirty_end, s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id,
1655 s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc";
1656 let window = "s.workspace = ?1 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1657 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))";
1658 let sql = match metric {
1659 Metric::TokensPerSession => format!(
1660 "SELECT {session_cols},
1661 SUM(COALESCE(e.tokens_in,0)+COALESCE(e.tokens_out,0)+COALESCE(e.reasoning_tokens,0)) AS value
1662 FROM sessions s JOIN events e ON e.session_id = s.id
1663 WHERE {window}
1664 GROUP BY s.id"
1665 ),
1666 Metric::CostPerSession => format!(
1667 "SELECT {session_cols}, SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 AS value
1668 FROM sessions s JOIN events e ON e.session_id = s.id
1669 WHERE {window}
1670 GROUP BY s.id"
1671 ),
1672 Metric::SuccessRate => format!(
1673 "SELECT {session_cols},
1674 CASE WHEN SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END) > 0 THEN 0.0 ELSE 1.0 END AS value
1675 FROM sessions s JOIN events e ON e.session_id = s.id
1676 WHERE {window}
1677 GROUP BY s.id"
1678 ),
1679 Metric::DurationMinutes => format!(
1680 "SELECT {session_cols},
1681 (s.ended_at_ms - s.started_at_ms) / 60000.0 AS value
1682 FROM sessions s
1683 WHERE s.workspace = ?1
1684 AND s.ended_at_ms IS NOT NULL
1685 AND EXISTS (
1686 SELECT 1 FROM events e
1687 WHERE e.session_id = s.id
1688 AND ((e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1689 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3))
1690 )"
1691 ),
1692 Metric::FilesPerSession => format!(
1693 "SELECT {session_cols}, COUNT(DISTINCT ft.path) AS value
1694 FROM sessions s
1695 JOIN events e ON e.session_id = s.id
1696 LEFT JOIN files_touched ft ON ft.session_id = s.id
1697 WHERE {window}
1698 GROUP BY s.id"
1699 ),
1700 Metric::SuccessRateByPrompt => format!(
1701 "SELECT {session_cols},
1702 1.0 - (MIN(
1703 SUM(CASE WHEN e.kind='Error' THEN 1 ELSE 0 END),
1704 SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)
1705 ) * 1.0 / SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END)) AS value
1706 FROM sessions s JOIN events e ON e.session_id = s.id
1707 WHERE {window}
1708 GROUP BY s.id
1709 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1710 ),
1711 Metric::CostByPrompt => format!(
1712 "SELECT {session_cols},
1713 SUM(COALESCE(e.cost_usd_e6,0)) / 1000000.0 /
1714 SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) AS value
1715 FROM sessions s JOIN events e ON e.session_id = s.id
1716 WHERE {window}
1717 GROUP BY s.id
1718 HAVING SUM(CASE WHEN e.kind='Message' THEN 1 ELSE 0 END) > 0"
1719 ),
1720 Metric::ToolLoops => format!(
1721 "WITH calls AS (
1722 SELECT e.session_id, e.tool,
1723 LAG(e.tool) OVER (PARTITION BY e.session_id ORDER BY e.ts_ms, e.seq) AS prev_tool
1724 FROM events e JOIN sessions s ON s.id = e.session_id
1725 WHERE {window} AND e.kind='ToolCall' AND e.tool IS NOT NULL
1726 )
1727 SELECT {session_cols},
1728 SUM(CASE WHEN calls.tool = calls.prev_tool THEN 1 ELSE 0 END) AS value
1729 FROM sessions s JOIN calls ON calls.session_id = s.id
1730 GROUP BY s.id"
1731 ),
1732 };
1733 let mut stmt = self.conn.prepare(&sql)?;
1734 let rows = stmt.query_map(
1735 params![
1736 workspace,
1737 start_ms as i64,
1738 end_ms as i64,
1739 SYNTHETIC_TS_CEILING_MS,
1740 ],
1741 |row| Ok((session_row(row)?, row.get::<_, f64>(21)?)),
1742 )?;
1743 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1744 }
1745
1746 pub fn files_touched_in_window(
1748 &self,
1749 workspace: &str,
1750 start_ms: u64,
1751 end_ms: u64,
1752 ) -> Result<Vec<(String, String)>> {
1753 let mut stmt = self.conn.prepare(
1754 "SELECT DISTINCT ft.session_id, ft.path
1755 FROM files_touched ft
1756 JOIN sessions s ON s.id = ft.session_id
1757 WHERE s.workspace = ?1
1758 AND EXISTS (
1759 SELECT 1 FROM events e
1760 JOIN sessions ss ON ss.id = e.session_id
1761 WHERE e.session_id = ft.session_id
1762 AND (
1763 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1764 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1765 )
1766 )
1767 ORDER BY ft.session_id, ft.path",
1768 )?;
1769 let out: Vec<(String, String)> = stmt
1770 .query_map(
1771 params![
1772 workspace,
1773 start_ms as i64,
1774 end_ms as i64,
1775 SYNTHETIC_TS_CEILING_MS,
1776 ],
1777 |r| Ok((r.get(0)?, r.get(1)?)),
1778 )?
1779 .filter_map(|r| r.ok())
1780 .collect();
1781 Ok(out)
1782 }
1783
1784 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1787 let mut stmt = self.conn.prepare(
1788 "SELECT DISTINCT su.skill
1789 FROM skills_used su
1790 JOIN sessions s ON s.id = su.session_id
1791 WHERE s.workspace = ?1
1792 AND EXISTS (
1793 SELECT 1 FROM events e
1794 JOIN sessions ss ON ss.id = e.session_id
1795 WHERE e.session_id = su.session_id
1796 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1797 )
1798 ORDER BY su.skill",
1799 )?;
1800 let out: Vec<String> = stmt
1801 .query_map(
1802 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1803 |r| r.get::<_, String>(0),
1804 )?
1805 .filter_map(|r| r.ok())
1806 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1807 .collect();
1808 Ok(out)
1809 }
1810
1811 pub fn skills_used_in_window(
1813 &self,
1814 workspace: &str,
1815 start_ms: u64,
1816 end_ms: u64,
1817 ) -> Result<Vec<(String, String)>> {
1818 let mut stmt = self.conn.prepare(
1819 "SELECT DISTINCT su.session_id, su.skill
1820 FROM skills_used su
1821 JOIN sessions s ON s.id = su.session_id
1822 WHERE s.workspace = ?1
1823 AND EXISTS (
1824 SELECT 1 FROM events e
1825 JOIN sessions ss ON ss.id = e.session_id
1826 WHERE e.session_id = su.session_id
1827 AND (
1828 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1829 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1830 )
1831 )
1832 ORDER BY su.session_id, su.skill",
1833 )?;
1834 let out: Vec<(String, String)> = stmt
1835 .query_map(
1836 params![
1837 workspace,
1838 start_ms as i64,
1839 end_ms as i64,
1840 SYNTHETIC_TS_CEILING_MS,
1841 ],
1842 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1843 )?
1844 .filter_map(|r| r.ok())
1845 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1846 .collect();
1847 Ok(out)
1848 }
1849
1850 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1852 let mut stmt = self.conn.prepare(
1853 "SELECT DISTINCT ru.rule
1854 FROM rules_used ru
1855 JOIN sessions s ON s.id = ru.session_id
1856 WHERE s.workspace = ?1
1857 AND EXISTS (
1858 SELECT 1 FROM events e
1859 JOIN sessions ss ON ss.id = e.session_id
1860 WHERE e.session_id = ru.session_id
1861 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1862 )
1863 ORDER BY ru.rule",
1864 )?;
1865 let out: Vec<String> = stmt
1866 .query_map(
1867 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1868 |r| r.get::<_, String>(0),
1869 )?
1870 .filter_map(|r| r.ok())
1871 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1872 .collect();
1873 Ok(out)
1874 }
1875
1876 pub fn rules_used_in_window(
1878 &self,
1879 workspace: &str,
1880 start_ms: u64,
1881 end_ms: u64,
1882 ) -> Result<Vec<(String, String)>> {
1883 let mut stmt = self.conn.prepare(
1884 "SELECT DISTINCT ru.session_id, ru.rule
1885 FROM rules_used ru
1886 JOIN sessions s ON s.id = ru.session_id
1887 WHERE s.workspace = ?1
1888 AND EXISTS (
1889 SELECT 1 FROM events e
1890 JOIN sessions ss ON ss.id = e.session_id
1891 WHERE e.session_id = ru.session_id
1892 AND (
1893 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1894 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1895 )
1896 )
1897 ORDER BY ru.session_id, ru.rule",
1898 )?;
1899 let out: Vec<(String, String)> = stmt
1900 .query_map(
1901 params![
1902 workspace,
1903 start_ms as i64,
1904 end_ms as i64,
1905 SYNTHETIC_TS_CEILING_MS,
1906 ],
1907 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1908 )?
1909 .filter_map(|r| r.ok())
1910 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1911 .collect();
1912 Ok(out)
1913 }
1914
1915 pub fn sessions_active_in_window(
1917 &self,
1918 workspace: &str,
1919 start_ms: u64,
1920 end_ms: u64,
1921 ) -> Result<HashSet<String>> {
1922 let mut stmt = self.conn.prepare(
1923 "SELECT DISTINCT s.id
1924 FROM sessions s
1925 WHERE s.workspace = ?1
1926 AND EXISTS (
1927 SELECT 1 FROM events e
1928 WHERE e.session_id = s.id
1929 AND (
1930 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1931 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1932 )
1933 )",
1934 )?;
1935 let out: HashSet<String> = stmt
1936 .query_map(
1937 params![
1938 workspace,
1939 start_ms as i64,
1940 end_ms as i64,
1941 SYNTHETIC_TS_CEILING_MS,
1942 ],
1943 |r| r.get(0),
1944 )?
1945 .filter_map(|r| r.ok())
1946 .collect();
1947 Ok(out)
1948 }
1949
1950 pub fn session_costs_usd_e6_in_window(
1952 &self,
1953 workspace: &str,
1954 start_ms: u64,
1955 end_ms: u64,
1956 ) -> Result<HashMap<String, i64>> {
1957 let mut stmt = self.conn.prepare(
1958 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1959 FROM events e
1960 JOIN sessions s ON s.id = e.session_id
1961 WHERE s.workspace = ?1
1962 AND (
1963 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1964 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1965 )
1966 GROUP BY e.session_id",
1967 )?;
1968 let rows: Vec<(String, i64)> = stmt
1969 .query_map(
1970 params![
1971 workspace,
1972 start_ms as i64,
1973 end_ms as i64,
1974 SYNTHETIC_TS_CEILING_MS,
1975 ],
1976 |r| Ok((r.get(0)?, r.get(1)?)),
1977 )?
1978 .filter_map(|r| r.ok())
1979 .collect();
1980 Ok(rows.into_iter().collect())
1981 }
1982
1983 pub fn guidance_report(
1985 &self,
1986 workspace: &str,
1987 window_start_ms: u64,
1988 window_end_ms: u64,
1989 skill_slugs_on_disk: &HashSet<String>,
1990 rule_slugs_on_disk: &HashSet<String>,
1991 ) -> Result<GuidanceReport> {
1992 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1993 let denom = active.len() as u64;
1994 let costs =
1995 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1996
1997 let workspace_avg_cost_per_session_usd = if denom > 0 {
1998 let total_e6: i64 = active
1999 .iter()
2000 .map(|sid| costs.get(sid).copied().unwrap_or(0))
2001 .sum();
2002 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
2003 } else {
2004 None
2005 };
2006
2007 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
2008 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
2009 skill_sessions.entry(skill).or_default().insert(sid);
2010 }
2011 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
2012 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
2013 rule_sessions.entry(rule).or_default().insert(sid);
2014 }
2015
2016 let mut rows: Vec<GuidancePerfRow> = Vec::new();
2017
2018 let mut push_row =
2019 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
2020 let sessions = sids.len() as u64;
2021 let sessions_pct = if denom > 0 {
2022 sessions as f64 * 100.0 / denom as f64
2023 } else {
2024 0.0
2025 };
2026 let total_cost_usd_e6: i64 = sids
2027 .iter()
2028 .map(|sid| costs.get(sid).copied().unwrap_or(0))
2029 .sum();
2030 let avg_cost_per_session_usd = if sessions > 0 {
2031 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
2032 } else {
2033 None
2034 };
2035 let vs_workspace_avg_cost_per_session_usd =
2036 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
2037 (Some(avg), Some(w)) => Some(avg - w),
2038 _ => None,
2039 };
2040 rows.push(GuidancePerfRow {
2041 kind,
2042 id,
2043 sessions,
2044 sessions_pct,
2045 total_cost_usd_e6,
2046 avg_cost_per_session_usd,
2047 vs_workspace_avg_cost_per_session_usd,
2048 on_disk,
2049 });
2050 };
2051
2052 let mut seen_skills: HashSet<String> = HashSet::new();
2053 for (id, sids) in &skill_sessions {
2054 seen_skills.insert(id.clone());
2055 push_row(
2056 GuidanceKind::Skill,
2057 id.clone(),
2058 sids,
2059 skill_slugs_on_disk.contains(id),
2060 );
2061 }
2062 for slug in skill_slugs_on_disk {
2063 if seen_skills.contains(slug) {
2064 continue;
2065 }
2066 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
2067 }
2068
2069 let mut seen_rules: HashSet<String> = HashSet::new();
2070 for (id, sids) in &rule_sessions {
2071 seen_rules.insert(id.clone());
2072 push_row(
2073 GuidanceKind::Rule,
2074 id.clone(),
2075 sids,
2076 rule_slugs_on_disk.contains(id),
2077 );
2078 }
2079 for slug in rule_slugs_on_disk {
2080 if seen_rules.contains(slug) {
2081 continue;
2082 }
2083 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
2084 }
2085
2086 rows.sort_by(|a, b| {
2087 b.sessions
2088 .cmp(&a.sessions)
2089 .then_with(|| a.kind.cmp(&b.kind))
2090 .then_with(|| a.id.cmp(&b.id))
2091 });
2092
2093 Ok(GuidanceReport {
2094 workspace: workspace.to_string(),
2095 window_start_ms,
2096 window_end_ms,
2097 sessions_in_window: denom,
2098 workspace_avg_cost_per_session_usd,
2099 rows,
2100 })
2101 }
2102
2103 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
2104 let mut stmt = self.conn.prepare(
2105 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
2106 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
2107 prompt_fingerprint, parent_session_id, agent_version, os, arch,
2108 repo_file_count, repo_total_loc
2109 FROM sessions WHERE id = ?1",
2110 )?;
2111 let mut rows = stmt.query_map(params![id], |row| {
2112 Ok((
2113 row.get::<_, String>(0)?,
2114 row.get::<_, String>(1)?,
2115 row.get::<_, Option<String>>(2)?,
2116 row.get::<_, String>(3)?,
2117 row.get::<_, i64>(4)?,
2118 row.get::<_, Option<i64>>(5)?,
2119 row.get::<_, String>(6)?,
2120 row.get::<_, String>(7)?,
2121 row.get::<_, Option<String>>(8)?,
2122 row.get::<_, Option<String>>(9)?,
2123 row.get::<_, Option<String>>(10)?,
2124 row.get::<_, Option<i64>>(11)?,
2125 row.get::<_, Option<i64>>(12)?,
2126 row.get::<_, String>(13)?,
2127 row.get::<_, Option<String>>(14)?,
2128 row.get::<_, Option<String>>(15)?,
2129 row.get::<_, Option<String>>(16)?,
2130 row.get::<_, Option<String>>(17)?,
2131 row.get::<_, Option<String>>(18)?,
2132 row.get::<_, Option<i64>>(19)?,
2133 row.get::<_, Option<i64>>(20)?,
2134 ))
2135 })?;
2136
2137 if let Some(row) = rows.next() {
2138 let (
2139 id,
2140 agent,
2141 model,
2142 workspace,
2143 started,
2144 ended,
2145 status_str,
2146 trace,
2147 start_commit,
2148 end_commit,
2149 branch,
2150 dirty_start,
2151 dirty_end,
2152 source,
2153 prompt_fingerprint,
2154 parent_session_id,
2155 agent_version,
2156 os,
2157 arch,
2158 repo_file_count,
2159 repo_total_loc,
2160 ) = row?;
2161 Ok(Some(SessionRecord {
2162 id,
2163 agent,
2164 model,
2165 workspace,
2166 started_at_ms: started as u64,
2167 ended_at_ms: ended.map(|v| v as u64),
2168 status: status_from_str(&status_str),
2169 trace_path: trace,
2170 start_commit,
2171 end_commit,
2172 branch,
2173 dirty_start: dirty_start.map(i64_to_bool),
2174 dirty_end: dirty_end.map(i64_to_bool),
2175 repo_binding_source: empty_to_none(source),
2176 prompt_fingerprint,
2177 parent_session_id,
2178 agent_version,
2179 os,
2180 arch,
2181 repo_file_count: repo_file_count.map(|v| v as u32),
2182 repo_total_loc: repo_total_loc.map(|v| v as u64),
2183 }))
2184 } else {
2185 Ok(None)
2186 }
2187 }
2188
2189 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
2190 let mut stmt = self.conn.prepare(
2191 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2192 indexed_at_ms, dirty, graph_path
2193 FROM repo_snapshots WHERE workspace = ?1
2194 ORDER BY indexed_at_ms DESC LIMIT 1",
2195 )?;
2196 let mut rows = stmt.query_map(params![workspace], |row| {
2197 Ok(RepoSnapshotRecord {
2198 id: row.get(0)?,
2199 workspace: row.get(1)?,
2200 head_commit: row.get(2)?,
2201 dirty_fingerprint: row.get(3)?,
2202 analyzer_version: row.get(4)?,
2203 indexed_at_ms: row.get::<_, i64>(5)? as u64,
2204 dirty: row.get::<_, i64>(6)? != 0,
2205 graph_path: row.get(7)?,
2206 })
2207 })?;
2208 Ok(rows.next().transpose()?)
2209 }
2210
2211 pub fn save_repo_snapshot(
2212 &self,
2213 snapshot: &RepoSnapshotRecord,
2214 facts: &[FileFact],
2215 edges: &[RepoEdge],
2216 ) -> Result<()> {
2217 self.conn.execute(
2218 "INSERT INTO repo_snapshots (
2219 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
2220 indexed_at_ms, dirty, graph_path
2221 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
2222 ON CONFLICT(id) DO UPDATE SET
2223 workspace=excluded.workspace,
2224 head_commit=excluded.head_commit,
2225 dirty_fingerprint=excluded.dirty_fingerprint,
2226 analyzer_version=excluded.analyzer_version,
2227 indexed_at_ms=excluded.indexed_at_ms,
2228 dirty=excluded.dirty,
2229 graph_path=excluded.graph_path",
2230 params![
2231 snapshot.id,
2232 snapshot.workspace,
2233 snapshot.head_commit,
2234 snapshot.dirty_fingerprint,
2235 snapshot.analyzer_version,
2236 snapshot.indexed_at_ms as i64,
2237 bool_to_i64(snapshot.dirty),
2238 snapshot.graph_path,
2239 ],
2240 )?;
2241 self.conn.execute(
2242 "DELETE FROM file_facts WHERE snapshot_id = ?1",
2243 params![snapshot.id],
2244 )?;
2245 self.conn.execute(
2246 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
2247 params![snapshot.id],
2248 )?;
2249 for fact in facts {
2250 self.conn.execute(
2251 "INSERT INTO file_facts (
2252 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2253 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2254 churn_30d, churn_90d, authors_90d, last_changed_ms
2255 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
2256 params![
2257 fact.snapshot_id,
2258 fact.path,
2259 fact.language,
2260 fact.bytes as i64,
2261 fact.loc as i64,
2262 fact.sloc as i64,
2263 fact.complexity_total as i64,
2264 fact.max_fn_complexity as i64,
2265 fact.symbol_count as i64,
2266 fact.import_count as i64,
2267 fact.fan_in as i64,
2268 fact.fan_out as i64,
2269 fact.churn_30d as i64,
2270 fact.churn_90d as i64,
2271 fact.authors_90d as i64,
2272 fact.last_changed_ms.map(|v| v as i64),
2273 ],
2274 )?;
2275 }
2276 for edge in edges {
2277 self.conn.execute(
2278 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
2279 VALUES (?1, ?2, ?3, ?4, ?5)
2280 ON CONFLICT(snapshot_id, from_id, to_id, kind)
2281 DO UPDATE SET weight = weight + excluded.weight",
2282 params![
2283 snapshot.id,
2284 edge.from_path,
2285 edge.to_path,
2286 edge.kind,
2287 edge.weight as i64,
2288 ],
2289 )?;
2290 }
2291 Ok(())
2292 }
2293
2294 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
2295 let mut stmt = self.conn.prepare(
2296 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
2297 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
2298 churn_30d, churn_90d, authors_90d, last_changed_ms
2299 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
2300 )?;
2301 let rows = stmt.query_map(params![snapshot_id], |row| {
2302 Ok(FileFact {
2303 snapshot_id: row.get(0)?,
2304 path: row.get(1)?,
2305 language: row.get(2)?,
2306 bytes: row.get::<_, i64>(3)? as u64,
2307 loc: row.get::<_, i64>(4)? as u32,
2308 sloc: row.get::<_, i64>(5)? as u32,
2309 complexity_total: row.get::<_, i64>(6)? as u32,
2310 max_fn_complexity: row.get::<_, i64>(7)? as u32,
2311 symbol_count: row.get::<_, i64>(8)? as u32,
2312 import_count: row.get::<_, i64>(9)? as u32,
2313 fan_in: row.get::<_, i64>(10)? as u32,
2314 fan_out: row.get::<_, i64>(11)? as u32,
2315 churn_30d: row.get::<_, i64>(12)? as u32,
2316 churn_90d: row.get::<_, i64>(13)? as u32,
2317 authors_90d: row.get::<_, i64>(14)? as u32,
2318 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
2319 })
2320 })?;
2321 Ok(rows.filter_map(|row| row.ok()).collect())
2322 }
2323
2324 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
2325 let mut stmt = self.conn.prepare(
2326 "SELECT from_id, to_id, kind, weight
2327 FROM repo_edges WHERE snapshot_id = ?1
2328 ORDER BY kind, from_id, to_id",
2329 )?;
2330 let rows = stmt.query_map(params![snapshot_id], |row| {
2331 Ok(RepoEdge {
2332 from_path: row.get(0)?,
2333 to_path: row.get(1)?,
2334 kind: row.get(2)?,
2335 weight: row.get::<_, i64>(3)? as u32,
2336 })
2337 })?;
2338 Ok(rows.filter_map(|row| row.ok()).collect())
2339 }
2340
2341 pub fn hottest_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2342 self.ranked_files_for_snapshot(snapshot_id, "churn_30d * complexity_total")
2343 }
2344
2345 pub fn most_changed_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2346 self.ranked_files_for_snapshot(snapshot_id, "churn_30d")
2347 }
2348
2349 pub fn most_complex_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2350 self.ranked_files_for_snapshot(snapshot_id, "complexity_total")
2351 }
2352
2353 pub fn highest_risk_files_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RankedFile>> {
2354 self.ranked_files_for_snapshot(snapshot_id, "churn_30d * authors_90d * complexity_total")
2355 }
2356
2357 fn ranked_files_for_snapshot(
2358 &self,
2359 snapshot_id: &str,
2360 value_sql: &str,
2361 ) -> Result<Vec<RankedFile>> {
2362 let sql = format!(
2363 "SELECT path, {value_sql}, complexity_total, churn_30d
2364 FROM file_facts WHERE snapshot_id = ?1
2365 ORDER BY {value_sql} DESC, path ASC LIMIT 10"
2366 );
2367 let mut stmt = self.conn.prepare(&sql)?;
2368 let rows = stmt.query_map(params![snapshot_id], ranked_file_row)?;
2369 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2370 }
2371
2372 pub fn pain_hotspots_for_snapshot(
2373 &self,
2374 snapshot_id: &str,
2375 workspace: &str,
2376 start_ms: u64,
2377 end_ms: u64,
2378 ) -> Result<Vec<RankedFile>> {
2379 let mut stmt = self.conn.prepare(PAIN_HOTSPOTS_SQL)?;
2380 let rows = stmt.query_map(
2381 params![snapshot_id, workspace, start_ms as i64, end_ms as i64],
2382 ranked_file_row,
2383 )?;
2384 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2385 }
2386
2387 pub fn tool_rank_rows_in_window(
2388 &self,
2389 workspace: &str,
2390 start_ms: u64,
2391 end_ms: u64,
2392 ) -> Result<Vec<RankedTool>> {
2393 let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
2394 let rows = stmt.query_map(
2395 params![workspace, start_ms as i64, end_ms as i64],
2396 ranked_tool_row,
2397 )?;
2398 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2399 }
2400
2401 pub fn tool_spans_in_window(
2402 &self,
2403 workspace: &str,
2404 start_ms: u64,
2405 end_ms: u64,
2406 ) -> Result<Vec<ToolSpanView>> {
2407 let mut stmt = self.conn.prepare(
2408 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2409 reasoning_tokens, cost_usd_e6, paths_json,
2410 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2411 FROM (
2412 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2413 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2414 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2415 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2416 ts.started_at_ms AS sort_ms
2417 FROM tool_spans ts
2418 JOIN sessions s ON s.id = ts.session_id
2419 WHERE s.workspace = ?1
2420 AND ts.started_at_ms >= ?2
2421 AND ts.started_at_ms <= ?3
2422 UNION ALL
2423 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
2424 ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
2425 ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
2426 ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
2427 ts.ended_at_ms AS sort_ms
2428 FROM tool_spans ts
2429 JOIN sessions s ON s.id = ts.session_id
2430 WHERE s.workspace = ?1
2431 AND ts.started_at_ms IS NULL
2432 AND ts.ended_at_ms >= ?2
2433 AND ts.ended_at_ms <= ?3
2434 )
2435 ORDER BY sort_ms DESC",
2436 )?;
2437 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2438 let paths_json: String = row.get(8)?;
2439 Ok(ToolSpanView {
2440 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2441 tool: row
2442 .get::<_, Option<String>>(1)?
2443 .unwrap_or_else(|| "unknown".into()),
2444 status: row.get(2)?,
2445 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2446 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2447 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2448 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2449 cost_usd_e6: row.get(7)?,
2450 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2451 parent_span_id: row.get(9)?,
2452 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2453 subtree_cost_usd_e6: row.get(11)?,
2454 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2455 })
2456 })?;
2457 Ok(rows.filter_map(|row| row.ok()).collect())
2458 }
2459
2460 pub fn session_span_tree(
2461 &self,
2462 session_id: &str,
2463 ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
2464 let last_event_seq = self.last_event_seq_for_session(session_id)?;
2465 if let Some(entry) = self.span_tree_cache.borrow().as_ref()
2466 && entry.session_id == session_id
2467 && entry.last_event_seq == last_event_seq
2468 {
2469 return Ok(entry.nodes.clone());
2470 }
2471 let mut stmt = self.conn.prepare(
2472 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
2473 reasoning_tokens, cost_usd_e6, paths_json,
2474 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
2475 FROM tool_spans
2476 WHERE session_id = ?1
2477 ORDER BY depth ASC, started_at_ms ASC",
2478 )?;
2479 let rows = stmt.query_map(params![session_id], |row| {
2480 let paths_json: String = row.get(8)?;
2481 Ok(crate::metrics::types::ToolSpanView {
2482 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
2483 tool: row
2484 .get::<_, Option<String>>(1)?
2485 .unwrap_or_else(|| "unknown".into()),
2486 status: row.get(2)?,
2487 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
2488 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
2489 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
2490 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
2491 cost_usd_e6: row.get(7)?,
2492 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2493 parent_span_id: row.get(9)?,
2494 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
2495 subtree_cost_usd_e6: row.get(11)?,
2496 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
2497 })
2498 })?;
2499 let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
2500 let nodes = crate::store::span_tree::build_tree(spans);
2501 *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
2502 session_id: session_id.to_string(),
2503 last_event_seq,
2504 nodes: nodes.clone(),
2505 });
2506 Ok(nodes)
2507 }
2508
2509 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
2510 let seq = self
2511 .conn
2512 .query_row(
2513 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
2514 params![session_id],
2515 |r| r.get::<_, Option<i64>>(0),
2516 )?
2517 .map(|v| v as u64);
2518 Ok(seq)
2519 }
2520
2521 pub fn tool_spans_sync_rows_in_window(
2527 &self,
2528 workspace: &str,
2529 start_ms: u64,
2530 end_ms: u64,
2531 ) -> Result<Vec<ToolSpanSyncRow>> {
2532 let mut stmt = self.conn.prepare(
2533 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
2534 lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2535 FROM (
2536 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2537 ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2538 ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2539 ts.started_at_ms AS sort_ms
2540 FROM tool_spans ts
2541 JOIN sessions s ON s.id = ts.session_id
2542 WHERE s.workspace = ?1
2543 AND ts.started_at_ms IS NOT NULL
2544 AND ts.started_at_ms >= ?2
2545 AND ts.started_at_ms <= ?3
2546 UNION ALL
2547 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
2548 ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
2549 ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
2550 ts.ended_at_ms AS sort_ms
2551 FROM tool_spans ts
2552 JOIN sessions s ON s.id = ts.session_id
2553 WHERE s.workspace = ?1
2554 AND ts.started_at_ms IS NULL
2555 AND ts.ended_at_ms IS NOT NULL
2556 AND ts.ended_at_ms >= ?2
2557 AND ts.ended_at_ms <= ?3
2558 )
2559 ORDER BY sort_ms ASC, span_id ASC",
2560 )?;
2561 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2562 let paths_json: String = row.get(12)?;
2563 Ok(ToolSpanSyncRow {
2564 span_id: row.get(0)?,
2565 session_id: row.get(1)?,
2566 tool: row.get(2)?,
2567 tool_call_id: row.get(3)?,
2568 status: row.get(4)?,
2569 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2570 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2571 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2572 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2573 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2574 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2575 cost_usd_e6: row.get(11)?,
2576 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2577 })
2578 })?;
2579 Ok(rows.filter_map(|row| row.ok()).collect())
2580 }
2581
2582 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
2583 let mut stmt = self.conn.prepare(
2584 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
2585 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
2586 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
2587 )?;
2588 let rows = stmt.query_map(params![session_id], |row| {
2589 let paths_json: String = row.get(12)?;
2590 Ok(ToolSpanSyncRow {
2591 span_id: row.get(0)?,
2592 session_id: row.get(1)?,
2593 tool: row.get(2)?,
2594 tool_call_id: row.get(3)?,
2595 status: row.get(4)?,
2596 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2597 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
2598 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
2599 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
2600 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
2601 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
2602 cost_usd_e6: row.get(11)?,
2603 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
2604 })
2605 })?;
2606 Ok(rows.filter_map(|row| row.ok()).collect())
2607 }
2608
2609 pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
2610 self.conn.execute(
2611 "INSERT INTO trace_spans (
2612 span_id, trace_id, parent_span_id, session_id, kind, name, status,
2613 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2614 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2615 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
2616 ON CONFLICT(span_id) DO UPDATE SET
2617 trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
2618 session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
2619 status=excluded.status, started_at_ms=excluded.started_at_ms,
2620 ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
2621 model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
2622 tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
2623 cost_usd_e6=excluded.cost_usd_e6,
2624 context_used_tokens=excluded.context_used_tokens,
2625 context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
2626 params![
2627 span.span_id.as_str(),
2628 span.trace_id.as_str(),
2629 span.parent_span_id.as_deref(),
2630 span.session_id.as_str(),
2631 span.kind.as_str(),
2632 span.name.as_str(),
2633 span.status.as_str(),
2634 span.started_at_ms.map(|v| v as i64),
2635 span.ended_at_ms.map(|v| v as i64),
2636 span.duration_ms.map(|v| v as i64),
2637 span.model.as_deref(),
2638 span.tool.as_deref(),
2639 span.tokens_in.map(|v| v as i64),
2640 span.tokens_out.map(|v| v as i64),
2641 span.reasoning_tokens.map(|v| v as i64),
2642 span.cost_usd_e6,
2643 span.context_used_tokens.map(|v| v as i64),
2644 span.context_max_tokens.map(|v| v as i64),
2645 serde_json::to_string(&span.payload)?,
2646 ],
2647 )?;
2648 Ok(())
2649 }
2650
2651 pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
2652 let mut stmt = self.conn.prepare(
2653 "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
2654 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
2655 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
2656 FROM trace_spans WHERE session_id = ?1
2657 ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
2658 )?;
2659 let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
2660 Ok(rows.filter_map(|row| row.ok()).collect())
2661 }
2662
2663 pub(crate) fn capture_quality_rows(
2664 &self,
2665 workspace: &str,
2666 start_ms: u64,
2667 end_ms: u64,
2668 ) -> Result<Vec<CaptureQualityRow>> {
2669 let mut stmt = self.conn.prepare(
2670 "SELECT e.source,
2671 e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
2672 e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
2673 e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL
2674 FROM events e JOIN sessions s ON s.id = e.session_id
2675 WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
2676 )?;
2677 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2678 Ok(CaptureQualityRow {
2679 source: row.get(0)?,
2680 has_tokens: row.get::<_, i64>(1)? != 0,
2681 has_latency: row.get::<_, i64>(2)? != 0,
2682 has_context: row.get::<_, i64>(3)? != 0,
2683 })
2684 })?;
2685 Ok(rows.filter_map(|row| row.ok()).collect())
2686 }
2687
2688 pub(crate) fn trace_span_quality_rows(
2689 &self,
2690 workspace: &str,
2691 start_ms: u64,
2692 end_ms: u64,
2693 ) -> Result<Vec<TraceSpanQualityRow>> {
2694 let mut stmt = self.conn.prepare(
2695 "SELECT ts.kind,
2696 ts.parent_span_id IS NOT NULL
2697 AND parent.span_id IS NULL
2698 AND ts.kind NOT IN ('session', 'agent')
2699 FROM trace_spans ts
2700 JOIN sessions s ON s.id = ts.session_id
2701 LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
2702 WHERE s.workspace = ?1
2703 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
2704 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
2705 )?;
2706 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
2707 Ok(TraceSpanQualityRow {
2708 kind: row.get(0)?,
2709 is_orphan: row.get::<_, i64>(1)? != 0,
2710 })
2711 })?;
2712 Ok(rows.filter_map(|row| row.ok()).collect())
2713 }
2714
2715 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
2716 self.conn.execute(
2717 "INSERT OR REPLACE INTO session_evals
2718 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
2719 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2720 rusqlite::params![
2721 eval.id,
2722 eval.session_id,
2723 eval.judge_model,
2724 eval.rubric_id,
2725 eval.score,
2726 eval.rationale,
2727 eval.flagged as i64,
2728 eval.created_at_ms as i64,
2729 ],
2730 )?;
2731 Ok(())
2732 }
2733
2734 pub fn list_evals_in_window(
2735 &self,
2736 start_ms: u64,
2737 end_ms: u64,
2738 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2739 let mut stmt = self.conn.prepare(
2740 "SELECT id, session_id, judge_model, rubric_id, score,
2741 rationale, flagged, created_at_ms
2742 FROM session_evals
2743 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2744 ORDER BY created_at_ms ASC",
2745 )?;
2746 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
2747 Ok(crate::eval::types::EvalRow {
2748 id: r.get(0)?,
2749 session_id: r.get(1)?,
2750 judge_model: r.get(2)?,
2751 rubric_id: r.get(3)?,
2752 score: r.get(4)?,
2753 rationale: r.get(5)?,
2754 flagged: r.get::<_, i64>(6)? != 0,
2755 created_at_ms: r.get::<_, i64>(7)? as u64,
2756 })
2757 })?;
2758 rows.collect()
2759 }
2760
2761 pub fn list_evals_for_session(
2762 &self,
2763 session_id: &str,
2764 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
2765 let mut stmt = self.conn.prepare(
2766 "SELECT id, session_id, judge_model, rubric_id, score,
2767 rationale, flagged, created_at_ms
2768 FROM session_evals
2769 WHERE session_id = ?1
2770 ORDER BY created_at_ms DESC",
2771 )?;
2772 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
2773 Ok(crate::eval::types::EvalRow {
2774 id: r.get(0)?,
2775 session_id: r.get(1)?,
2776 judge_model: r.get(2)?,
2777 rubric_id: r.get(3)?,
2778 score: r.get(4)?,
2779 rationale: r.get(5)?,
2780 flagged: r.get::<_, i64>(6)? != 0,
2781 created_at_ms: r.get::<_, i64>(7)? as u64,
2782 })
2783 })?;
2784 rows.collect()
2785 }
2786
2787 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
2788 use crate::feedback::types::FeedbackLabel;
2789 self.conn.execute(
2790 "INSERT OR REPLACE INTO session_feedback
2791 (id, session_id, score, label, note, created_at_ms)
2792 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
2793 rusqlite::params![
2794 r.id,
2795 r.session_id,
2796 r.score.as_ref().map(|s| s.0 as i64),
2797 r.label.as_ref().map(FeedbackLabel::to_db_str),
2798 r.note,
2799 r.created_at_ms as i64,
2800 ],
2801 )?;
2802 let payload = serde_json::to_string(r).unwrap_or_default();
2803 self.conn.execute(
2804 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
2805 VALUES (?1, 'session_feedback', ?2, 0)",
2806 rusqlite::params![r.session_id, payload],
2807 )?;
2808 Ok(())
2809 }
2810
2811 pub fn list_feedback_in_window(
2812 &self,
2813 start_ms: u64,
2814 end_ms: u64,
2815 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
2816 let mut stmt = self.conn.prepare(
2817 "SELECT id, session_id, score, label, note, created_at_ms
2818 FROM session_feedback
2819 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
2820 ORDER BY created_at_ms ASC",
2821 )?;
2822 let rows = stmt.query_map(
2823 rusqlite::params![start_ms as i64, end_ms as i64],
2824 feedback_row,
2825 )?;
2826 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2827 }
2828
2829 pub fn feedback_for_sessions(
2830 &self,
2831 ids: &[String],
2832 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
2833 if ids.is_empty() {
2834 return Ok(std::collections::HashMap::new());
2835 }
2836 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
2837 let sql = format!(
2838 "SELECT id, session_id, score, label, note, created_at_ms
2839 FROM session_feedback WHERE session_id IN ({placeholders})
2840 ORDER BY created_at_ms DESC"
2841 );
2842 let mut stmt = self.conn.prepare(&sql)?;
2843 let params: Vec<&dyn rusqlite::ToSql> =
2844 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2845 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
2846 let mut map = std::collections::HashMap::new();
2847 for row in rows {
2848 let r = row?;
2849 map.entry(r.session_id.clone()).or_insert(r);
2850 }
2851 Ok(map)
2852 }
2853
2854 pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
2855 self.conn.execute(
2856 "INSERT INTO session_outcomes (
2857 session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2858 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2859 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
2860 ON CONFLICT(session_id) DO UPDATE SET
2861 test_passed=excluded.test_passed,
2862 test_failed=excluded.test_failed,
2863 test_skipped=excluded.test_skipped,
2864 build_ok=excluded.build_ok,
2865 lint_errors=excluded.lint_errors,
2866 revert_lines_14d=excluded.revert_lines_14d,
2867 pr_open=excluded.pr_open,
2868 ci_ok=excluded.ci_ok,
2869 measured_at_ms=excluded.measured_at_ms,
2870 measure_error=excluded.measure_error",
2871 params![
2872 row.session_id,
2873 row.test_passed,
2874 row.test_failed,
2875 row.test_skipped,
2876 row.build_ok.map(bool_to_i64),
2877 row.lint_errors,
2878 row.revert_lines_14d,
2879 row.pr_open,
2880 row.ci_ok.map(bool_to_i64),
2881 row.measured_at_ms as i64,
2882 row.measure_error.as_deref(),
2883 ],
2884 )?;
2885 Ok(())
2886 }
2887
2888 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2889 let mut stmt = self.conn.prepare(
2890 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2891 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2892 FROM session_outcomes WHERE session_id = ?1",
2893 )?;
2894 let row = stmt
2895 .query_row(params![session_id], outcome_row)
2896 .optional()?;
2897 Ok(row)
2898 }
2899
2900 pub fn list_session_outcomes_in_window(
2902 &self,
2903 workspace: &str,
2904 start_ms: u64,
2905 end_ms: u64,
2906 ) -> Result<Vec<SessionOutcomeRow>> {
2907 let mut stmt = self.conn.prepare(
2908 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2909 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2910 FROM session_outcomes o
2911 JOIN sessions s ON s.id = o.session_id
2912 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2913 ORDER BY o.measured_at_ms ASC",
2914 )?;
2915 let rows = stmt.query_map(
2916 params![workspace, start_ms as i64, end_ms as i64],
2917 outcome_row,
2918 )?;
2919 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2920 }
2921
2922 pub fn append_session_sample(
2923 &self,
2924 session_id: &str,
2925 ts_ms: u64,
2926 pid: u32,
2927 cpu_percent: Option<f64>,
2928 rss_bytes: Option<u64>,
2929 ) -> Result<()> {
2930 self.conn.execute(
2931 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2932 VALUES (?1, ?2, ?3, ?4, ?5)",
2933 params![
2934 session_id,
2935 ts_ms as i64,
2936 pid as i64,
2937 cpu_percent,
2938 rss_bytes.map(|b| b as i64)
2939 ],
2940 )?;
2941 Ok(())
2942 }
2943
2944 pub fn list_session_sample_aggs_in_window(
2946 &self,
2947 workspace: &str,
2948 start_ms: u64,
2949 end_ms: u64,
2950 ) -> Result<Vec<SessionSampleAgg>> {
2951 let mut stmt = self.conn.prepare(
2952 "SELECT ss.session_id, COUNT(*) AS n,
2953 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2954 FROM session_samples ss
2955 JOIN sessions s ON s.id = ss.session_id
2956 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2957 GROUP BY ss.session_id",
2958 )?;
2959 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2960 let sid: String = r.get(0)?;
2961 let n: i64 = r.get(1)?;
2962 let max_cpu: Option<f64> = r.get(2)?;
2963 let max_rss: Option<i64> = r.get(3)?;
2964 Ok(SessionSampleAgg {
2965 session_id: sid,
2966 sample_count: n as u64,
2967 max_cpu_percent: max_cpu.unwrap_or(0.0),
2968 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2969 })
2970 })?;
2971 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2972 }
2973
2974 pub fn list_sessions_for_eval(
2975 &self,
2976 since_ms: u64,
2977 min_cost_usd: f64,
2978 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2979 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2980 let mut stmt = self.conn.prepare(
2981 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2982 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2983 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2984 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2985 FROM sessions s
2986 WHERE s.started_at_ms >= ?1
2987 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2988 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2989 ORDER BY s.started_at_ms DESC",
2990 )?;
2991 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2992 Ok((
2993 r.get::<_, String>(0)?,
2994 r.get::<_, String>(1)?,
2995 r.get::<_, Option<String>>(2)?,
2996 r.get::<_, String>(3)?,
2997 r.get::<_, i64>(4)?,
2998 r.get::<_, Option<i64>>(5)?,
2999 r.get::<_, String>(6)?,
3000 r.get::<_, String>(7)?,
3001 r.get::<_, Option<String>>(8)?,
3002 r.get::<_, Option<String>>(9)?,
3003 r.get::<_, Option<String>>(10)?,
3004 r.get::<_, Option<i64>>(11)?,
3005 r.get::<_, Option<i64>>(12)?,
3006 r.get::<_, Option<String>>(13)?,
3007 r.get::<_, Option<String>>(14)?,
3008 r.get::<_, Option<String>>(15)?,
3009 r.get::<_, Option<String>>(16)?,
3010 r.get::<_, Option<String>>(17)?,
3011 r.get::<_, Option<String>>(18)?,
3012 r.get::<_, Option<i64>>(19)?,
3013 r.get::<_, Option<i64>>(20)?,
3014 ))
3015 })?;
3016 let mut out = Vec::new();
3017 for row in rows {
3018 let (
3019 id,
3020 agent,
3021 model,
3022 workspace,
3023 started,
3024 ended,
3025 status_str,
3026 trace,
3027 start_commit,
3028 end_commit,
3029 branch,
3030 dirty_start,
3031 dirty_end,
3032 source,
3033 prompt_fingerprint,
3034 parent_session_id,
3035 agent_version,
3036 os,
3037 arch,
3038 repo_file_count,
3039 repo_total_loc,
3040 ) = row?;
3041 out.push(crate::core::event::SessionRecord {
3042 id,
3043 agent,
3044 model,
3045 workspace,
3046 started_at_ms: started as u64,
3047 ended_at_ms: ended.map(|v| v as u64),
3048 status: status_from_str(&status_str),
3049 trace_path: trace,
3050 start_commit,
3051 end_commit,
3052 branch,
3053 dirty_start: dirty_start.map(i64_to_bool),
3054 dirty_end: dirty_end.map(i64_to_bool),
3055 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
3056 prompt_fingerprint,
3057 parent_session_id,
3058 agent_version,
3059 os,
3060 arch,
3061 repo_file_count: repo_file_count.map(|v| v as u32),
3062 repo_total_loc: repo_total_loc.map(|v| v as u64),
3063 });
3064 }
3065 Ok(out)
3066 }
3067
3068 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
3069 self.conn.execute(
3070 "INSERT OR IGNORE INTO prompt_snapshots
3071 (fingerprint, captured_at_ms, files_json, total_bytes)
3072 VALUES (?1, ?2, ?3, ?4)",
3073 params![
3074 snap.fingerprint,
3075 snap.captured_at_ms as i64,
3076 snap.files_json,
3077 snap.total_bytes as i64
3078 ],
3079 )?;
3080 Ok(())
3081 }
3082
3083 pub fn get_prompt_snapshot(
3084 &self,
3085 fingerprint: &str,
3086 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
3087 self.conn
3088 .query_row(
3089 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3090 FROM prompt_snapshots WHERE fingerprint = ?1",
3091 params![fingerprint],
3092 |r| {
3093 Ok(crate::prompt::PromptSnapshot {
3094 fingerprint: r.get(0)?,
3095 captured_at_ms: r.get::<_, i64>(1)? as u64,
3096 files_json: r.get(2)?,
3097 total_bytes: r.get::<_, i64>(3)? as u64,
3098 })
3099 },
3100 )
3101 .optional()
3102 .map_err(Into::into)
3103 }
3104
3105 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
3106 let mut stmt = self.conn.prepare(
3107 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
3108 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
3109 )?;
3110 let rows = stmt.query_map([], |r| {
3111 Ok(crate::prompt::PromptSnapshot {
3112 fingerprint: r.get(0)?,
3113 captured_at_ms: r.get::<_, i64>(1)? as u64,
3114 files_json: r.get(2)?,
3115 total_bytes: r.get::<_, i64>(3)? as u64,
3116 })
3117 })?;
3118 Ok(rows.filter_map(|r| r.ok()).collect())
3119 }
3120
3121 pub fn sessions_with_prompt_fingerprint(
3123 &self,
3124 workspace: &str,
3125 start_ms: u64,
3126 end_ms: u64,
3127 ) -> Result<Vec<(String, String)>> {
3128 let mut stmt = self.conn.prepare(
3129 "SELECT id, prompt_fingerprint FROM sessions
3130 WHERE workspace = ?1
3131 AND started_at_ms >= ?2 AND started_at_ms < ?3
3132 AND prompt_fingerprint IS NOT NULL",
3133 )?;
3134 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
3135 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
3136 })?;
3137 Ok(rows.filter_map(|r| r.ok()).collect())
3138 }
3139}
3140
3141impl Drop for Store {
3142 fn drop(&mut self) {
3143 if let Some(writer) = self.search_writer.get_mut().as_mut() {
3144 let _ = writer.commit();
3145 }
3146 }
3147}
3148
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3155
3156fn old_session_ids(tx: &rusqlite::Transaction<'_>, cutoff_ms: i64) -> Result<Vec<String>> {
3157 let mut stmt = tx.prepare("SELECT id FROM sessions WHERE started_at_ms < ?1")?;
3158 let rows = stmt.query_map(params![cutoff_ms], |r| r.get::<_, String>(0))?;
3159 Ok(rows.filter_map(|r| r.ok()).collect())
3160}
3161
3162fn mmap_size_bytes_from_mb(raw: Option<&str>) -> i64 {
3163 raw.and_then(|s| s.trim().parse::<u64>().ok())
3164 .unwrap_or(DEFAULT_MMAP_MB)
3165 .saturating_mul(1024)
3166 .saturating_mul(1024)
3167 .min(i64::MAX as u64) as i64
3168}
3169
3170fn apply_pragmas(conn: &Connection, mode: StoreOpenMode) -> Result<()> {
3171 let mmap_size = mmap_size_bytes_from_mb(std::env::var("KAIZEN_MMAP_MB").ok().as_deref());
3172 conn.execute_batch(&format!(
3173 "
3174 PRAGMA journal_mode=WAL;
3175 PRAGMA busy_timeout=5000;
3176 PRAGMA synchronous=NORMAL;
3177 PRAGMA cache_size=-65536;
3178 PRAGMA mmap_size={mmap_size};
3179 PRAGMA temp_store=MEMORY;
3180 PRAGMA wal_autocheckpoint=1000;
3181 "
3182 ))?;
3183 if mode == StoreOpenMode::ReadOnlyQuery {
3184 conn.execute_batch("PRAGMA query_only=ON;")?;
3185 }
3186 Ok(())
3187}
3188
3189fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
3190 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
3191}
3192
3193fn session_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
3194 let status_str: String = row.get(6)?;
3195 Ok(SessionRecord {
3196 id: row.get(0)?,
3197 agent: row.get(1)?,
3198 model: row.get(2)?,
3199 workspace: row.get(3)?,
3200 started_at_ms: row.get::<_, i64>(4)? as u64,
3201 ended_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3202 status: status_from_str(&status_str),
3203 trace_path: row.get(7)?,
3204 start_commit: row.get(8)?,
3205 end_commit: row.get(9)?,
3206 branch: row.get(10)?,
3207 dirty_start: row.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3208 dirty_end: row.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3209 repo_binding_source: empty_to_none(row.get::<_, String>(13)?),
3210 prompt_fingerprint: row.get(14)?,
3211 parent_session_id: row.get(15)?,
3212 agent_version: row.get(16)?,
3213 os: row.get(17)?,
3214 arch: row.get(18)?,
3215 repo_file_count: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3216 repo_total_loc: row.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3217 })
3218}
3219
3220fn ranked_file_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedFile> {
3221 Ok(RankedFile {
3222 path: row.get(0)?,
3223 value: row.get::<_, i64>(1)? as u64,
3224 complexity_total: row.get::<_, i64>(2)? as u32,
3225 churn_30d: row.get::<_, i64>(3)? as u32,
3226 })
3227}
3228
3229fn ranked_tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<RankedTool> {
3230 Ok(RankedTool {
3231 tool: row.get(0)?,
3232 calls: row.get::<_, i64>(1)? as u64,
3233 p50_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
3234 p95_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
3235 total_tokens: row.get::<_, i64>(4)? as u64,
3236 total_reasoning_tokens: row.get::<_, i64>(5)? as u64,
3237 })
3238}
3239
3240fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
3241 let payload_str: String = row.get(12)?;
3242 Ok(Event {
3243 session_id: row.get(0)?,
3244 seq: row.get::<_, i64>(1)? as u64,
3245 ts_ms: row.get::<_, i64>(2)? as u64,
3246 ts_exact: row.get::<_, i64>(3)? != 0,
3247 kind: kind_from_str(&row.get::<_, String>(4)?),
3248 source: source_from_str(&row.get::<_, String>(5)?),
3249 tool: row.get(6)?,
3250 tool_call_id: row.get(7)?,
3251 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
3252 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3253 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
3254 cost_usd_e6: row.get(11)?,
3255 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
3256 stop_reason: row.get(13)?,
3257 latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3258 ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
3259 retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
3260 context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3261 context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
3262 cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3263 cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
3264 system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
3265 })
3266}
3267
3268fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> {
3269 Ok((row.get(22)?, event_row(row)?))
3270}
3271
3272fn session_filter_sql(workspace: &str, filter: &SessionFilter) -> (String, Vec<Value>) {
3273 let mut clauses = vec!["workspace = ?".to_string()];
3274 let mut args = vec![Value::Text(workspace.to_string())];
3275 if let Some(prefix) = filter.agent_prefix.as_deref().filter(|s| !s.is_empty()) {
3276 clauses.push("lower(agent) LIKE ? ESCAPE '\\'".to_string());
3277 args.push(Value::Text(format!("{}%", escape_like(prefix))));
3278 }
3279 if let Some(status) = &filter.status {
3280 clauses.push("status = ?".to_string());
3281 args.push(Value::Text(format!("{status:?}")));
3282 }
3283 if let Some(since_ms) = filter.since_ms {
3284 clauses.push("started_at_ms >= ?".to_string());
3285 args.push(Value::Integer(since_ms as i64));
3286 }
3287 (format!("WHERE {}", clauses.join(" AND ")), args)
3288}
3289
/// Lower-cases `raw` and backslash-escapes the characters that are special
/// in a `LIKE … ESCAPE '\'` pattern: `\`, `%` and `_`.
fn escape_like(raw: &str) -> String {
    let lowered = raw.to_lowercase();
    let mut escaped = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
3296
3297fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
3298 let cost: i64 = conn.query_row(
3299 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
3300 params![workspace], |r| r.get(0),
3301 )?;
3302 let with_cost: i64 = conn.query_row(
3303 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
3304 params![workspace], |r| r.get(0),
3305 )?;
3306 Ok((cost, with_cost as u64))
3307}
3308
3309fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
3310 let build_raw: Option<i64> = r.get(4)?;
3311 let ci_raw: Option<i64> = r.get(8)?;
3312 Ok(SessionOutcomeRow {
3313 session_id: r.get(0)?,
3314 test_passed: r.get(1)?,
3315 test_failed: r.get(2)?,
3316 test_skipped: r.get(3)?,
3317 build_ok: build_raw.map(|v| v != 0),
3318 lint_errors: r.get(5)?,
3319 revert_lines_14d: r.get(6)?,
3320 pr_open: r.get(7)?,
3321 ci_ok: ci_raw.map(|v| v != 0),
3322 measured_at_ms: r.get::<_, i64>(9)? as u64,
3323 measure_error: r.get(10)?,
3324 })
3325}
3326
3327fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
3328 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
3329 let score = r
3330 .get::<_, Option<i64>>(2)?
3331 .and_then(|v| FeedbackScore::new(v as u8));
3332 let label = r
3333 .get::<_, Option<String>>(3)?
3334 .and_then(|s| FeedbackLabel::from_str_opt(&s));
3335 Ok(FeedbackRecord {
3336 id: r.get(0)?,
3337 session_id: r.get(1)?,
3338 score,
3339 label,
3340 note: r.get(4)?,
3341 created_at_ms: r.get::<_, i64>(5)? as u64,
3342 })
3343}
3344
/// Three-letter weekday name for a day index counted from the Unix epoch.
///
/// Day 0 (1970-01-01) maps to `"Thu"`, hence the offset of 4 into a table
/// that starts on Sunday. The index is reduced modulo 7 *before* the offset
/// is added, so even `u64::MAX` cannot overflow (the previous
/// `day_idx + 4` panicked in overflow-checked builds).
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
3348
3349fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
3350 let week_ago = now.saturating_sub(7 * 86_400_000);
3351 let mut stmt = conn
3352 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
3353 let days: Vec<u64> = stmt
3354 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
3355 .filter_map(|r| r.ok())
3356 .map(|v| v as u64 / 86_400_000)
3357 .collect();
3358 let today = now / 86_400_000;
3359 Ok((0u64..7)
3360 .map(|i| {
3361 let d = today.saturating_sub(6 - i);
3362 (
3363 day_label(d).to_string(),
3364 days.iter().filter(|&&x| x == d).count() as u64,
3365 )
3366 })
3367 .collect())
3368}
3369
3370fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
3371 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
3372 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
3373 s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
3374 s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
3375 COUNT(e.id) FROM sessions s \
3376 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
3377 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
3378 let mut stmt = conn.prepare(sql)?;
3379 let out: Vec<(SessionRecord, u64)> = stmt
3380 .query_map(params![workspace], |r| {
3381 let st: String = r.get(6)?;
3382 Ok((
3383 SessionRecord {
3384 id: r.get(0)?,
3385 agent: r.get(1)?,
3386 model: r.get(2)?,
3387 workspace: r.get(3)?,
3388 started_at_ms: r.get::<_, i64>(4)? as u64,
3389 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
3390 status: status_from_str(&st),
3391 trace_path: r.get(7)?,
3392 start_commit: r.get(8)?,
3393 end_commit: r.get(9)?,
3394 branch: r.get(10)?,
3395 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
3396 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
3397 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
3398 prompt_fingerprint: r.get(14)?,
3399 parent_session_id: r.get(15)?,
3400 agent_version: r.get(16)?,
3401 os: r.get(17)?,
3402 arch: r.get(18)?,
3403 repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
3404 repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
3405 },
3406 r.get::<_, i64>(21)? as u64,
3407 ))
3408 })?
3409 .filter_map(|r| r.ok())
3410 .collect();
3411 Ok(out)
3412}
3413
3414fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
3415 let mut stmt = conn.prepare(
3416 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
3417 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
3418 )?;
3419 let out: Vec<(String, u64)> = stmt
3420 .query_map(params![workspace], |r| {
3421 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
3422 })?
3423 .filter_map(|r| r.ok())
3424 .collect();
3425 Ok(out)
3426}
3427
3428fn trace_span_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TraceSpanRecord> {
3429 let kind: String = row.get(4)?;
3430 let payload: String = row.get(18)?;
3431 Ok(TraceSpanRecord {
3432 span_id: row.get(0)?,
3433 trace_id: row.get(1)?,
3434 parent_span_id: row.get(2)?,
3435 session_id: row.get(3)?,
3436 kind: TraceSpanKind::parse(&kind),
3437 name: row.get(5)?,
3438 status: row.get(6)?,
3439 started_at_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
3440 ended_at_ms: row.get::<_, Option<i64>>(8)?.map(|v| v as u64),
3441 duration_ms: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
3442 model: row.get(10)?,
3443 tool: row.get(11)?,
3444 tokens_in: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
3445 tokens_out: row.get::<_, Option<i64>>(13)?.map(|v| v as u32),
3446 reasoning_tokens: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
3447 cost_usd_e6: row.get(15)?,
3448 context_used_tokens: row.get::<_, Option<i64>>(16)?.map(|v| v as u32),
3449 context_max_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
3450 payload: serde_json::from_str(&payload).unwrap_or_default(),
3451 })
3452}
3453
3454fn status_from_str(s: &str) -> SessionStatus {
3455 match s {
3456 "Running" => SessionStatus::Running,
3457 "Waiting" => SessionStatus::Waiting,
3458 "Idle" => SessionStatus::Idle,
3459 _ => SessionStatus::Done,
3460 }
3461}
3462
/// Reports whether the `KAIZEN_PROJECTOR` environment variable is set to
/// exactly `legacy`. Unset, non-Unicode or any other value yields `false`.
fn projector_legacy_mode() -> bool {
    matches!(std::env::var("KAIZEN_PROJECTOR").as_deref(), Ok("legacy"))
}
3466
3467fn is_stop_event(e: &Event) -> bool {
3468 if !matches!(e.kind, EventKind::Hook) {
3469 return false;
3470 }
3471 e.payload
3472 .get("event")
3473 .and_then(|v| v.as_str())
3474 .or_else(|| e.payload.get("hook_event_name").and_then(|v| v.as_str()))
3475 == Some("Stop")
3476}
3477
3478fn kind_from_str(s: &str) -> EventKind {
3479 match s {
3480 "ToolCall" => EventKind::ToolCall,
3481 "ToolResult" => EventKind::ToolResult,
3482 "Message" => EventKind::Message,
3483 "Error" => EventKind::Error,
3484 "Cost" => EventKind::Cost,
3485 "Hook" => EventKind::Hook,
3486 "Lifecycle" => EventKind::Lifecycle,
3487 _ => EventKind::Hook,
3488 }
3489}
3490
3491fn source_from_str(s: &str) -> EventSource {
3492 match s {
3493 "Tail" => EventSource::Tail,
3494 "Hook" => EventSource::Hook,
3495 _ => EventSource::Proxy,
3496 }
3497}
3498
3499fn ensure_schema_columns(conn: &Connection) -> Result<()> {
3500 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
3501 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
3502 ensure_column(conn, "sessions", "branch", "TEXT")?;
3503 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
3504 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
3505 ensure_column(
3506 conn,
3507 "sessions",
3508 "repo_binding_source",
3509 "TEXT NOT NULL DEFAULT ''",
3510 )?;
3511 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
3512 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
3513 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
3514 ensure_column(conn, "events", "stop_reason", "TEXT")?;
3515 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
3516 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
3517 ensure_column(conn, "events", "retry_count", "INTEGER")?;
3518 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
3519 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
3520 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
3521 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
3522 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
3523 ensure_column(
3524 conn,
3525 "sync_outbox",
3526 "kind",
3527 "TEXT NOT NULL DEFAULT 'events'",
3528 )?;
3529 ensure_column(
3530 conn,
3531 "experiments",
3532 "state",
3533 "TEXT NOT NULL DEFAULT 'Draft'",
3534 )?;
3535 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
3536 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
3537 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
3538 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
3539 ensure_column(conn, "sessions", "os", "TEXT")?;
3540 ensure_column(conn, "sessions", "arch", "TEXT")?;
3541 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
3542 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
3543 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
3544 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
3545 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
3546 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
3547 conn.execute_batch(
3548 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
3549 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
3550 )?;
3551 Ok(())
3552}
3553
3554fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
3555 if has_column(conn, table, column)? {
3556 return Ok(());
3557 }
3558 let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}");
3559 match conn.execute(&sql, []) {
3560 Ok(_) => Ok(()),
3561 Err(err) if column_was_added_by_race(conn, table, column, &err)? => Ok(()),
3562 Err(err) => Err(err.into()),
3563 }
3564}
3565
3566fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
3567 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
3568 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
3569 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
3570}
3571
3572fn column_was_added_by_race(
3573 conn: &Connection,
3574 table: &str,
3575 column: &str,
3576 err: &rusqlite::Error,
3577) -> Result<bool> {
3578 if !is_duplicate_column_error(err) {
3579 return Ok(false);
3580 }
3581 has_column(conn, table, column)
3582}
3583
3584fn is_duplicate_column_error(err: &rusqlite::Error) -> bool {
3585 matches!(
3586 err,
3587 rusqlite::Error::SqliteFailure(_, Some(message))
3588 if message.contains("duplicate column name")
3589 )
3590}
3591
/// Encodes a boolean as the integer SQLite stores: `true` → 1, `false` → 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
3595
/// Decodes a stored integer flag: zero is `false`, anything else is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
3599
/// Turns an empty string into `None`; any other string is returned unchanged
/// inside `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
3603
3604#[cfg(test)]
3605mod tests {
3606 use super::*;
3607 use serde_json::json;
3608 use std::collections::HashSet;
3609 use tempfile::TempDir;
3610
3611 fn make_session(id: &str) -> SessionRecord {
3612 SessionRecord {
3613 id: id.to_string(),
3614 agent: "cursor".to_string(),
3615 model: None,
3616 workspace: "/ws".to_string(),
3617 started_at_ms: 1000,
3618 ended_at_ms: None,
3619 status: SessionStatus::Done,
3620 trace_path: "/trace".to_string(),
3621 start_commit: None,
3622 end_commit: None,
3623 branch: None,
3624 dirty_start: None,
3625 dirty_end: None,
3626 repo_binding_source: None,
3627 prompt_fingerprint: None,
3628 parent_session_id: None,
3629 agent_version: None,
3630 os: None,
3631 arch: None,
3632 repo_file_count: None,
3633 repo_total_loc: None,
3634 }
3635 }
3636
3637 fn make_event(session_id: &str, seq: u64) -> Event {
3638 Event {
3639 session_id: session_id.to_string(),
3640 seq,
3641 ts_ms: 1000 + seq * 100,
3642 ts_exact: false,
3643 kind: EventKind::ToolCall,
3644 source: EventSource::Tail,
3645 tool: Some("read_file".to_string()),
3646 tool_call_id: Some(format!("call_{seq}")),
3647 tokens_in: None,
3648 tokens_out: None,
3649 reasoning_tokens: None,
3650 cost_usd_e6: None,
3651 stop_reason: None,
3652 latency_ms: None,
3653 ttft_ms: None,
3654 retry_count: None,
3655 context_used_tokens: None,
3656 context_max_tokens: None,
3657 cache_creation_tokens: None,
3658 cache_read_tokens: None,
3659 system_prompt_tokens: None,
3660 payload: json!({}),
3661 }
3662 }
3663
#[test]
fn open_and_wal_mode() {
    // A freshly opened store must report WAL as its journal mode.
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("kaizen.db");
    let store = Store::open(&db_path).unwrap();

    let journal_mode = store
        .conn
        .query_row("PRAGMA journal_mode", [], |row| row.get::<_, String>(0))
        .unwrap();

    assert_eq!(journal_mode, "wal");
}
3674
#[test]
fn open_applies_phase0_pragmas() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    // Reads back a single integer-valued PRAGMA.
    let pragma = |name: &str| -> i64 {
        store
            .conn
            .query_row(&format!("PRAGMA {name}"), [], |row| row.get(0))
            .unwrap()
    };

    let synchronous = pragma("synchronous");
    let cache_size = pragma("cache_size");
    let temp_store = pragma("temp_store");
    let wal_autocheckpoint = pragma("wal_autocheckpoint");

    assert_eq!(synchronous, 1);
    assert_eq!(cache_size, -65_536);
    assert_eq!(temp_store, 2);
    assert_eq!(wal_autocheckpoint, 1_000);
    // 64 MB expressed in bytes.
    assert_eq!(mmap_size_bytes_from_mb(Some("64")), 67_108_864);
}
3701
#[test]
fn ensure_column_tolerates_duplicate_from_race() {
    // Simulate losing the race: the column already exists when ALTER runs.
    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch("CREATE TABLE sessions (id TEXT, start_commit TEXT)")
        .unwrap();

    let duplicate_err = conn
        .execute("ALTER TABLE sessions ADD COLUMN start_commit TEXT", [])
        .unwrap_err();

    let tolerated =
        column_was_added_by_race(&conn, "sessions", "start_commit", &duplicate_err).unwrap();
    assert!(tolerated);
}
3712
#[test]
fn read_only_open_sets_query_only() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("kaizen.db");
    // Open the database once normally before the read-only open.
    Store::open(&db_path).unwrap();

    let read_only = Store::open_read_only(&db_path).unwrap();
    let query_only = read_only
        .conn
        .query_row("PRAGMA query_only", [], |row| row.get::<_, i64>(0))
        .unwrap();

    assert_eq!(query_only, 1);
}
3725
#[test]
fn phase0_indexes_exist() {
    const EXPECTED_INDEXES: [&str; 5] = [
        "tool_spans_session_idx",
        "tool_spans_started_idx",
        "session_samples_ts_idx",
        "events_ts_idx",
        "feedback_session_idx",
    ];

    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();

    // Each index must appear exactly once in the schema catalogue.
    for index_name in EXPECTED_INDEXES.iter().copied() {
        let matches: i64 = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name=?1",
                params![index_name],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(matches, 1, "{index_name}");
    }
}
3748
#[test]
fn upsert_and_get_session() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    let session = make_session("s1");
    store.upsert_session(&session).unwrap();

    // The stored session must come back with the same id and status.
    let fetched = store
        .get_session("s1")
        .unwrap()
        .unwrap();
    assert_eq!(fetched.id, "s1");
    assert_eq!(fetched.status, SessionStatus::Done);
}
3760
#[test]
fn append_and_list_events_round_trip() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    store.upsert_session(&make_session("s2")).unwrap();
    for seq in 0..2 {
        store.append_event(&make_event("s2", seq)).unwrap();
    }

    // Appending events must not duplicate or hide the session.
    let listed = store.list_sessions("/ws").unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].id, "s2");
}
3774
#[test]
fn list_sessions_page_orders_and_counts() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();
    // Inserted in the reverse of the expected listing order; "a" and "b"
    // share a start time.
    for (id, started_at_ms) in [("c", 1_000), ("b", 2_000), ("a", 2_000)] {
        let mut session = make_session(id);
        session.started_at_ms = started_at_ms;
        store.upsert_session(&session).unwrap();
    }

    let first_page = store
        .list_sessions_page("/ws", 0, 2, SessionFilter::default())
        .unwrap();
    assert_eq!(first_page.total, 3);
    assert_eq!(first_page.next_offset, Some(2));
    let first_page_ids: Vec<&str> = first_page.rows.iter().map(|s| s.id.as_str()).collect();
    assert_eq!(first_page_ids, vec!["a", "b"]);

    let everything = store.list_sessions("/ws").unwrap();
    let all_ids: Vec<&str> = everything.iter().map(|s| s.id.as_str()).collect();
    assert_eq!(all_ids, vec!["a", "b", "c"]);
}
3805
#[test]
fn list_sessions_page_filters_in_sql_shape() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();

    // Only this session satisfies all three filters below; note the
    // capitalised agent name against the lower-case prefix.
    let mut running_cursor = make_session("cursor");
    running_cursor.agent = "Cursor".into();
    running_cursor.started_at_ms = 2_000;
    running_cursor.status = SessionStatus::Running;

    // Later start, but wrong agent and (fixture default) Done status.
    let mut done_claude = make_session("claude");
    done_claude.agent = "claude".into();
    done_claude.started_at_ms = 3_000;

    store.upsert_session(&running_cursor).unwrap();
    store.upsert_session(&done_claude).unwrap();

    let filter = SessionFilter {
        agent_prefix: Some("cur".into()),
        status: Some(SessionStatus::Running),
        since_ms: Some(1_500),
    };
    let page = store.list_sessions_page("/ws", 0, 10, filter).unwrap();

    assert_eq!(page.total, 1);
    assert_eq!(page.rows[0].id, "cursor");
}
3835
#[test]
fn incremental_session_helpers_find_new_rows_and_statuses() {
    let tmp = TempDir::new().unwrap();
    let store = Store::open(&tmp.path().join("kaizen.db")).unwrap();

    let mut older = make_session("old");
    older.started_at_ms = 1_000;
    let mut newer = make_session("new");
    newer.started_at_ms = 2_000;
    newer.status = SessionStatus::Running;
    store.upsert_session(&older).unwrap();
    store.upsert_session(&newer).unwrap();

    // Only the session that started after the cursor is returned.
    let started_after = store.list_sessions_started_after("/ws", 1_500).unwrap();
    assert_eq!(started_after.len(), 1);
    assert_eq!(started_after[0].id, "new");

    // A status update must be visible through the status lookup.
    store
        .update_session_status("new", SessionStatus::Done)
        .unwrap();
    let wanted = ["new".to_string()];
    let statuses = store.session_statuses(&wanted).unwrap();
    assert_eq!(statuses.len(), 1);
    assert_eq!(statuses[0].status, SessionStatus::Done);
}
3859
3860 #[test]
3861 fn summary_stats_empty() {
3862 let dir = TempDir::new().unwrap();
3863 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3864 let stats = store.summary_stats("/ws").unwrap();
3865 assert_eq!(stats.session_count, 0);
3866 assert_eq!(stats.total_cost_usd_e6, 0);
3867 }
3868
3869 #[test]
3870 fn summary_stats_counts_sessions() {
3871 let dir = TempDir::new().unwrap();
3872 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3873 store.upsert_session(&make_session("a")).unwrap();
3874 store.upsert_session(&make_session("b")).unwrap();
3875 let stats = store.summary_stats("/ws").unwrap();
3876 assert_eq!(stats.session_count, 2);
3877 assert_eq!(stats.by_agent.len(), 1);
3878 assert_eq!(stats.by_agent[0].0, "cursor");
3879 assert_eq!(stats.by_agent[0].1, 2);
3880 }
3881
3882 #[test]
3883 fn list_events_for_session_round_trip() {
3884 let dir = TempDir::new().unwrap();
3885 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3886 store.upsert_session(&make_session("s4")).unwrap();
3887 store.append_event(&make_event("s4", 0)).unwrap();
3888 store.append_event(&make_event("s4", 1)).unwrap();
3889 let events = store.list_events_for_session("s4").unwrap();
3890 assert_eq!(events.len(), 2);
3891 assert_eq!(events[0].seq, 0);
3892 assert_eq!(events[1].seq, 1);
3893 }
3894
3895 #[test]
3896 fn list_events_page_uses_inclusive_seq_cursor() {
3897 let dir = TempDir::new().unwrap();
3898 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3899 store.upsert_session(&make_session("paged")).unwrap();
3900 for seq in 0..5 {
3901 store.append_event(&make_event("paged", seq)).unwrap();
3902 }
3903 let first = store.list_events_page("paged", 0, 2).unwrap();
3904 assert_eq!(first.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![0, 1]);
3905 let second = store
3906 .list_events_page("paged", first[1].seq + 1, 2)
3907 .unwrap();
3908 assert_eq!(second.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![2, 3]);
3909 }
3910
3911 #[test]
3912 fn append_event_dedup() {
3913 let dir = TempDir::new().unwrap();
3914 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3915 store.upsert_session(&make_session("s5")).unwrap();
3916 store.append_event(&make_event("s5", 0)).unwrap();
3917 store.append_event(&make_event("s5", 0)).unwrap();
3919 let events = store.list_events_for_session("s5").unwrap();
3920 assert_eq!(events.len(), 1);
3921 }
3922
3923 #[test]
3924 fn span_tree_cache_hits_empty_and_invalidates_on_append() {
3925 let dir = TempDir::new().unwrap();
3926 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3927 assert!(store.session_span_tree("missing").unwrap().is_empty());
3928 assert!(store.span_tree_cache.borrow().is_some());
3929
3930 store.upsert_session(&make_session("tree")).unwrap();
3931 let call = make_event("tree", 0);
3932 store.append_event(&call).unwrap();
3933 assert!(store.span_tree_cache.borrow().is_none());
3934 assert!(store.session_span_tree("tree").unwrap().is_empty());
3935 assert!(store.span_tree_cache.borrow().is_some());
3936 let mut result = make_event("tree", 1);
3937 result.kind = EventKind::ToolResult;
3938 result.tool_call_id = call.tool_call_id.clone();
3939 store.append_event(&result).unwrap();
3940 assert!(store.span_tree_cache.borrow().is_none());
3941 let first = store.session_span_tree("tree").unwrap();
3942 assert_eq!(first.len(), 1);
3943 assert!(store.span_tree_cache.borrow().is_some());
3944 store.append_event(&make_event("tree", 2)).unwrap();
3945 assert!(store.span_tree_cache.borrow().is_none());
3946 }
3947
3948 #[test]
3949 fn tool_spans_in_window_uses_started_then_ended_fallback() {
3950 let dir = TempDir::new().unwrap();
3951 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3952 store.upsert_session(&make_session("spans")).unwrap();
3953 for (id, started, ended) in [
3954 ("started", Some(200_i64), None),
3955 ("fallback", None, Some(250_i64)),
3956 ("outside", Some(400_i64), None),
3957 ("too_old", None, Some(50_i64)),
3958 ("started_wins", Some(500_i64), Some(200_i64)),
3959 ] {
3960 store
3961 .conn
3962 .execute(
3963 "INSERT INTO tool_spans
3964 (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3965 VALUES (?1, 'spans', 'read', 'done', ?2, ?3, '[]')",
3966 params![id, started, ended],
3967 )
3968 .unwrap();
3969 }
3970 let rows = store.tool_spans_in_window("/ws", 100, 300).unwrap();
3971 let ids = rows.into_iter().map(|r| r.span_id).collect::<Vec<_>>();
3972 assert_eq!(ids, vec!["fallback".to_string(), "started".to_string()]);
3973 }
3974
3975 #[test]
3976 fn tool_spans_sync_rows_in_window_returns_session_id_with_filtering() {
3977 let dir = TempDir::new().unwrap();
3978 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
3979 store.upsert_session(&make_session("s1")).unwrap();
3980 for (id, started, ended) in [
3981 ("inside_started", Some(150_i64), None),
3982 ("inside_ended_only", None, Some(220_i64)),
3983 ("after_window", Some(400_i64), None),
3984 ("before_window", None, Some(50_i64)),
3985 ] {
3986 store
3987 .conn
3988 .execute(
3989 "INSERT INTO tool_spans
3990 (span_id, session_id, tool, status, started_at_ms, ended_at_ms, paths_json)
3991 VALUES (?1, 's1', 'read', 'done', ?2, ?3, '[]')",
3992 params![id, started, ended],
3993 )
3994 .unwrap();
3995 }
3996 let rows = store
3997 .tool_spans_sync_rows_in_window("/ws", 100, 300)
3998 .unwrap();
3999 let ids: Vec<_> = rows.iter().map(|r| r.span_id.as_str()).collect();
4000 assert_eq!(ids, vec!["inside_started", "inside_ended_only"]);
4001 assert!(rows.iter().all(|r| r.session_id == "s1"));
4002 }
4003
4004 #[test]
4005 fn upsert_idempotent() {
4006 let dir = TempDir::new().unwrap();
4007 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4008 let mut s = make_session("s3");
4009 store.upsert_session(&s).unwrap();
4010 s.status = SessionStatus::Running;
4011 store.upsert_session(&s).unwrap();
4012
4013 let got = store.get_session("s3").unwrap().unwrap();
4014 assert_eq!(got.status, SessionStatus::Running);
4015 }
4016
4017 #[test]
4018 fn append_event_indexes_path_from_payload() {
4019 let dir = TempDir::new().unwrap();
4020 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4021 store.upsert_session(&make_session("sx")).unwrap();
4022 let mut ev = make_event("sx", 0);
4023 ev.payload = json!({"input": {"path": "src/lib.rs"}});
4024 store.append_event(&ev).unwrap();
4025 let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
4026 assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
4027 }
4028
4029 #[test]
4030 fn update_session_status_changes_status() {
4031 let dir = TempDir::new().unwrap();
4032 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4033 store.upsert_session(&make_session("s6")).unwrap();
4034 store
4035 .update_session_status("s6", SessionStatus::Running)
4036 .unwrap();
4037 let got = store.get_session("s6").unwrap().unwrap();
4038 assert_eq!(got.status, SessionStatus::Running);
4039 }
4040
4041 #[test]
4042 fn prune_sessions_removes_old_rows_and_keeps_recent() {
4043 let dir = TempDir::new().unwrap();
4044 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4045 let mut old = make_session("old");
4046 old.started_at_ms = 1_000;
4047 let mut new = make_session("new");
4048 new.started_at_ms = 9_000_000_000_000;
4049 store.upsert_session(&old).unwrap();
4050 store.upsert_session(&new).unwrap();
4051 store.append_event(&make_event("old", 0)).unwrap();
4052
4053 let stats = store.prune_sessions_started_before(5_000).unwrap();
4054 assert_eq!(
4055 stats,
4056 PruneStats {
4057 sessions_removed: 1,
4058 events_removed: 1,
4059 }
4060 );
4061 assert!(store.get_session("old").unwrap().is_none());
4062 assert!(store.get_session("new").unwrap().is_some());
4063 let sessions = store.list_sessions("/ws").unwrap();
4064 assert_eq!(sessions.len(), 1);
4065 assert_eq!(sessions[0].id, "new");
4066 }
4067
4068 #[test]
4069 fn append_event_indexes_rules_from_payload() {
4070 let dir = TempDir::new().unwrap();
4071 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4072 store.upsert_session(&make_session("sr")).unwrap();
4073 let mut ev = make_event("sr", 0);
4074 ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
4075 store.append_event(&ev).unwrap();
4076 let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
4077 assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
4078 }
4079
4080 #[test]
4081 fn guidance_report_counts_skill_and_rule_sessions() {
4082 let dir = TempDir::new().unwrap();
4083 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4084 store.upsert_session(&make_session("sx")).unwrap();
4085 let mut ev = make_event("sx", 0);
4086 ev.payload =
4087 json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
4088 ev.cost_usd_e6 = Some(500_000);
4089 store.append_event(&ev).unwrap();
4090
4091 let mut skill_slugs = HashSet::new();
4092 skill_slugs.insert("tdd".into());
4093 let mut rule_slugs = HashSet::new();
4094 rule_slugs.insert("style".into());
4095
4096 let rep = store
4097 .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
4098 .unwrap();
4099 assert_eq!(rep.sessions_in_window, 1);
4100 let tdd = rep
4101 .rows
4102 .iter()
4103 .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
4104 .unwrap();
4105 assert_eq!(tdd.sessions, 1);
4106 assert!(tdd.on_disk);
4107 let style = rep
4108 .rows
4109 .iter()
4110 .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
4111 .unwrap();
4112 assert_eq!(style.sessions, 1);
4113 assert!(style.on_disk);
4114 }
4115
4116 #[test]
4117 fn prune_sessions_removes_rules_used_rows() {
4118 let dir = TempDir::new().unwrap();
4119 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
4120 let mut old = make_session("old_r");
4121 old.started_at_ms = 1_000;
4122 store.upsert_session(&old).unwrap();
4123 let mut ev = make_event("old_r", 0);
4124 ev.payload = json!({"path": ".cursor/rules/x.mdc"});
4125 store.append_event(&ev).unwrap();
4126
4127 store.prune_sessions_started_before(5_000).unwrap();
4128 let n: i64 = store
4129 .conn
4130 .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
4131 .unwrap();
4132 assert_eq!(n, 0);
4133 }
4134}