1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Cutoff (in the same unit as `events.ts_ms`) bound into the time-window queries.
///
/// Events whose `ts_ms` is below this value are matched against the window via
/// their session's `started_at_ms` rather than their own timestamp.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
21
/// Schema statements executed, in order, every time a [`Store`] is opened.
///
/// Every entry is written to be re-runnable (`IF NOT EXISTS` / `INSERT OR IGNORE`),
/// and entries are passed to `execute_batch`, so one entry may hold several statements.
const MIGRATIONS: &[&str] = &[
    // --- core session / event tables ---
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    // --- experiments ---
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- tool spans ---
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // --- repository binding / snapshots ---
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    // --- rules ---
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // --- remote pull state and mirrored remote rows ---
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- session evaluations (one entry, three statements) ---
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    // --- prompt snapshots ---
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
];
222
223#[derive(Clone)]
225pub struct InsightsStats {
226 pub total_sessions: u64,
227 pub running_sessions: u64,
228 pub total_events: u64,
229 pub sessions_by_day: Vec<(String, u64)>,
231 pub recent: Vec<(SessionRecord, u64)>,
233 pub top_tools: Vec<(String, u64)>,
235 pub total_cost_usd_e6: i64,
236 pub sessions_with_cost: u64,
237}
238
/// Point-in-time view of the sync bookkeeping, as returned by `Store::sync_status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Number of `sync_outbox` rows with `sent = 0`.
    pub pending_outbox: u64,
    /// Parsed `last_success_ms` entry of `sync_state`, if present and numeric.
    pub last_success_ms: Option<u64>,
    /// `last_error` entry of `sync_state`, if present.
    pub last_error: Option<String>,
    /// Parsed `consecutive_failures` entry of `sync_state`; `0` when absent or unparsable.
    pub consecutive_failures: u32,
}
246
247#[derive(serde::Serialize)]
249pub struct SummaryStats {
250 pub session_count: u64,
251 pub total_cost_usd_e6: i64,
252 pub by_agent: Vec<(String, u64)>,
253 pub by_model: Vec<(String, u64)>,
254 pub top_tools: Vec<(String, u64)>,
255}
256
257#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
259#[serde(rename_all = "lowercase")]
260pub enum GuidanceKind {
261 Skill,
262 Rule,
263}
264
265#[derive(Clone, Debug, serde::Serialize)]
267pub struct GuidancePerfRow {
268 pub kind: GuidanceKind,
269 pub id: String,
270 pub sessions: u64,
271 pub sessions_pct: f64,
272 pub total_cost_usd_e6: i64,
273 pub avg_cost_per_session_usd: Option<f64>,
274 pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
275 pub on_disk: bool,
276}
277
278#[derive(Clone, Debug, serde::Serialize)]
280pub struct GuidanceReport {
281 pub workspace: String,
282 pub window_start_ms: u64,
283 pub window_end_ms: u64,
284 pub sessions_in_window: u64,
285 pub workspace_avg_cost_per_session_usd: Option<f64>,
286 pub rows: Vec<GuidancePerfRow>,
287}
288
/// Row counts reported by `Store::prune_sessions_started_before`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of `sessions` rows older than the cutoff.
    pub sessions_removed: u64,
    /// Number of `events` rows that belonged to those sessions.
    pub events_removed: u64,
}
295
/// Key name `last_agent_scan_ms` (presumably used with `sync_state_get_u64` /
/// `sync_state_set_u64`; confirm at the call sites).
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key name `last_auto_prune_ms` (presumably used with `sync_state_get_u64` /
/// `sync_state_set_u64`; confirm at the call sites).
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
299
/// Plain-data form of a tool span; the fields mirror the columns of the
/// `tool_spans` table, with `paths` in place of `paths_json`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// Span identifier.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Tool call identifier, when known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Start time in milliseconds, when known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, when known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, when known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, when known.
    pub tokens_in: Option<u32>,
    /// Output token count, when known.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, when known.
    pub reasoning_tokens: Option<u32>,
    /// Cost value, when known (the `_usd_e6` suffix suggests micro-dollars; confirm).
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
315
316pub struct Store {
317 conn: Connection,
318}
319
320impl Store {
321 pub(crate) fn conn(&self) -> &Connection {
322 &self.conn
323 }
324
325 pub fn open(path: &Path) -> Result<Self> {
326 if let Some(parent) = path.parent() {
327 std::fs::create_dir_all(parent)?;
328 }
329 let conn =
330 Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
331 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
332 for sql in MIGRATIONS {
333 conn.execute_batch(sql)?;
334 }
335 ensure_schema_columns(&conn)?;
336 Ok(Self { conn })
337 }
338
339 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
340 self.conn.execute(
341 "INSERT INTO sessions (
342 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
343 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
344 prompt_fingerprint
345 )
346 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
347 ON CONFLICT(id) DO UPDATE SET
348 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
349 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
350 status=excluded.status, trace_path=excluded.trace_path,
351 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
352 branch=excluded.branch, dirty_start=excluded.dirty_start,
353 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
354 prompt_fingerprint=excluded.prompt_fingerprint",
355 params![
356 s.id,
357 s.agent,
358 s.model,
359 s.workspace,
360 s.started_at_ms as i64,
361 s.ended_at_ms.map(|v| v as i64),
362 format!("{:?}", s.status),
363 s.trace_path,
364 s.start_commit,
365 s.end_commit,
366 s.branch,
367 s.dirty_start.map(bool_to_i64),
368 s.dirty_end.map(bool_to_i64),
369 s.repo_binding_source.clone().unwrap_or_default(),
370 s.prompt_fingerprint.as_deref(),
371 ],
372 )?;
373 self.conn.execute(
374 "INSERT INTO session_repo_binding (
375 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
376 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
377 ON CONFLICT(session_id) DO UPDATE SET
378 start_commit=excluded.start_commit,
379 end_commit=excluded.end_commit,
380 branch=excluded.branch,
381 dirty_start=excluded.dirty_start,
382 dirty_end=excluded.dirty_end,
383 repo_binding_source=excluded.repo_binding_source",
384 params![
385 s.id,
386 s.start_commit,
387 s.end_commit,
388 s.branch,
389 s.dirty_start.map(bool_to_i64),
390 s.dirty_end.map(bool_to_i64),
391 s.repo_binding_source.clone().unwrap_or_default(),
392 ],
393 )?;
394 Ok(())
395 }
396
397 pub fn ensure_session_stub(
400 &self,
401 id: &str,
402 agent: &str,
403 workspace: &str,
404 started_at_ms: u64,
405 ) -> Result<()> {
406 self.conn.execute(
407 "INSERT OR IGNORE INTO sessions (
408 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
409 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
410 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
411 params![id, agent, workspace, started_at_ms as i64],
412 )?;
413 Ok(())
414 }
415
416 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
418 let n: i64 = self.conn.query_row(
419 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
420 [session_id],
421 |r| r.get(0),
422 )?;
423 Ok(n as u64)
424 }
425
426 pub fn append_event(&self, e: &Event) -> Result<()> {
427 self.append_event_with_sync(e, None)
428 }
429
430 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
432 let payload = serde_json::to_string(&e.payload)?;
433 self.conn.execute(
434 "INSERT INTO events (
435 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
436 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
437 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
438 ON CONFLICT(session_id, seq) DO UPDATE SET
439 ts_ms = excluded.ts_ms,
440 ts_exact = excluded.ts_exact,
441 kind = excluded.kind,
442 source = excluded.source,
443 tool = excluded.tool,
444 tool_call_id = excluded.tool_call_id,
445 tokens_in = excluded.tokens_in,
446 tokens_out = excluded.tokens_out,
447 reasoning_tokens = excluded.reasoning_tokens,
448 cost_usd_e6 = excluded.cost_usd_e6,
449 payload = excluded.payload",
450 params![
451 e.session_id,
452 e.seq as i64,
453 e.ts_ms as i64,
454 bool_to_i64(e.ts_exact),
455 format!("{:?}", e.kind),
456 format!("{:?}", e.source),
457 e.tool,
458 e.tool_call_id,
459 e.tokens_in.map(|v| v as i64),
460 e.tokens_out.map(|v| v as i64),
461 e.reasoning_tokens.map(|v| v as i64),
462 e.cost_usd_e6,
463 payload,
464 ],
465 )?;
466 if self.conn.changes() == 0 {
467 return Ok(());
468 }
469 index_event_derived(&self.conn, e)?;
470 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
471 let Some(ctx) = ctx else {
472 return Ok(());
473 };
474 let sync = &ctx.sync;
475 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
476 return Ok(());
477 }
478 let Some(salt) = try_team_salt(sync) else {
479 tracing::warn!(
480 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
481 );
482 return Ok(());
483 };
484 if sync.sample_rate < 1.0 {
485 let u: f64 = rand::random();
486 if u > sync.sample_rate {
487 return Ok(());
488 }
489 }
490 let Some(session) = self.get_session(&e.session_id)? else {
491 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
492 return Ok(());
493 };
494 let mut outbound = outbound_event_from_row(e, &session, &salt);
495 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
496 let row = serde_json::to_string(&outbound)?;
497 self.conn.execute(
498 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
499 params![e.session_id, row],
500 )?;
501 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
502 Ok(())
503 }
504
505 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
506 let mut stmt = self.conn.prepare(
507 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
508 )?;
509 let rows = stmt.query_map(params![limit as i64], |row| {
510 Ok((
511 row.get::<_, i64>(0)?,
512 row.get::<_, String>(1)?,
513 row.get::<_, String>(2)?,
514 ))
515 })?;
516 let mut out = Vec::new();
517 for r in rows {
518 out.push(r?);
519 }
520 Ok(out)
521 }
522
523 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
524 for id in ids {
525 self.conn
526 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
527 }
528 Ok(())
529 }
530
531 pub fn replace_outbox_rows(
532 &self,
533 owner_id: &str,
534 kind: &str,
535 payloads: &[String],
536 ) -> Result<()> {
537 self.conn.execute(
538 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
539 params![owner_id, kind],
540 )?;
541 for payload in payloads {
542 self.conn.execute(
543 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
544 params![owner_id, kind, payload],
545 )?;
546 }
547 Ok(())
548 }
549
550 pub fn outbox_pending_count(&self) -> Result<u64> {
551 let c: i64 =
552 self.conn
553 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
554 r.get(0)
555 })?;
556 Ok(c as u64)
557 }
558
559 pub fn set_sync_state_ok(&self) -> Result<()> {
560 let now = now_ms().to_string();
561 self.conn.execute(
562 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
563 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
564 params![now],
565 )?;
566 self.conn.execute(
567 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
568 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
569 [],
570 )?;
571 self.conn
572 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
573 Ok(())
574 }
575
576 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
577 let prev: i64 = self
578 .conn
579 .query_row(
580 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
581 [],
582 |r| {
583 let s: String = r.get(0)?;
584 Ok(s.parse::<i64>().unwrap_or(0))
585 },
586 )
587 .optional()?
588 .unwrap_or(0);
589 let next = prev.saturating_add(1);
590 self.conn.execute(
591 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
592 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
593 params![msg],
594 )?;
595 self.conn.execute(
596 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
597 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
598 params![next.to_string()],
599 )?;
600 Ok(())
601 }
602
603 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
604 let pending_outbox = self.outbox_pending_count()?;
605 let last_success_ms = self
606 .conn
607 .query_row(
608 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
609 [],
610 |r| r.get::<_, String>(0),
611 )
612 .optional()?
613 .and_then(|s| s.parse().ok());
614 let last_error = self
615 .conn
616 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
617 r.get::<_, String>(0)
618 })
619 .optional()?;
620 let consecutive_failures = self
621 .conn
622 .query_row(
623 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
624 [],
625 |r| r.get::<_, String>(0),
626 )
627 .optional()?
628 .and_then(|s| s.parse().ok())
629 .unwrap_or(0);
630 Ok(SyncStatusSnapshot {
631 pending_outbox,
632 last_success_ms,
633 last_error,
634 consecutive_failures,
635 })
636 }
637
638 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
639 let row: Option<String> = self
640 .conn
641 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
642 r.get::<_, String>(0)
643 })
644 .optional()?;
645 Ok(row.and_then(|s| s.parse().ok()))
646 }
647
648 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
649 self.conn.execute(
650 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
651 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
652 params![key, v.to_string()],
653 )?;
654 Ok(())
655 }
656
657 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
659 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
660 let sessions_to_remove: i64 = tx.query_row(
661 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
662 params![cutoff_ms],
663 |r| r.get(0),
664 )?;
665 let events_to_remove: i64 = tx.query_row(
666 "SELECT COUNT(*) FROM events WHERE session_id IN \
667 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
668 params![cutoff_ms],
669 |r| r.get(0),
670 )?;
671
672 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
673 tx.execute(
674 &format!(
675 "DELETE FROM tool_span_paths WHERE span_id IN \
676 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
677 ),
678 params![cutoff_ms],
679 )?;
680 tx.execute(
681 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
682 params![cutoff_ms],
683 )?;
684 tx.execute(
685 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
686 params![cutoff_ms],
687 )?;
688 tx.execute(
689 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
690 params![cutoff_ms],
691 )?;
692 tx.execute(
693 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
694 params![cutoff_ms],
695 )?;
696 tx.execute(
697 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
698 params![cutoff_ms],
699 )?;
700 tx.execute(
701 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
702 params![cutoff_ms],
703 )?;
704 tx.execute(
705 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
706 params![cutoff_ms],
707 )?;
708 tx.execute(
709 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
710 params![cutoff_ms],
711 )?;
712 tx.execute(
713 "DELETE FROM sessions WHERE started_at_ms < ?1",
714 params![cutoff_ms],
715 )?;
716 tx.commit()?;
717 Ok(PruneStats {
718 sessions_removed: sessions_to_remove as u64,
719 events_removed: events_to_remove as u64,
720 })
721 }
722
723 pub fn vacuum(&self) -> Result<()> {
725 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
726 Ok(())
727 }
728
729 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
730 let mut stmt = self.conn.prepare(
731 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
732 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
733 prompt_fingerprint
734 FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
735 )?;
736 let rows = stmt.query_map(params![workspace], |row| {
737 Ok((
738 row.get::<_, String>(0)?,
739 row.get::<_, String>(1)?,
740 row.get::<_, Option<String>>(2)?,
741 row.get::<_, String>(3)?,
742 row.get::<_, i64>(4)?,
743 row.get::<_, Option<i64>>(5)?,
744 row.get::<_, String>(6)?,
745 row.get::<_, String>(7)?,
746 row.get::<_, Option<String>>(8)?,
747 row.get::<_, Option<String>>(9)?,
748 row.get::<_, Option<String>>(10)?,
749 row.get::<_, Option<i64>>(11)?,
750 row.get::<_, Option<i64>>(12)?,
751 row.get::<_, String>(13)?,
752 row.get::<_, Option<String>>(14)?,
753 ))
754 })?;
755
756 let mut out = Vec::new();
757 for row in rows {
758 let (
759 id,
760 agent,
761 model,
762 workspace,
763 started,
764 ended,
765 status_str,
766 trace,
767 start_commit,
768 end_commit,
769 branch,
770 dirty_start,
771 dirty_end,
772 source,
773 prompt_fingerprint,
774 ) = row?;
775 out.push(SessionRecord {
776 id,
777 agent,
778 model,
779 workspace,
780 started_at_ms: started as u64,
781 ended_at_ms: ended.map(|v| v as u64),
782 status: status_from_str(&status_str),
783 trace_path: trace,
784 start_commit,
785 end_commit,
786 branch,
787 dirty_start: dirty_start.map(i64_to_bool),
788 dirty_end: dirty_end.map(i64_to_bool),
789 repo_binding_source: empty_to_none(source),
790 prompt_fingerprint,
791 });
792 }
793 Ok(out)
794 }
795
796 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
797 let session_count: i64 = self.conn.query_row(
798 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
799 params![workspace],
800 |r| r.get(0),
801 )?;
802
803 let total_cost: i64 = self.conn.query_row(
804 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
805 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
806 params![workspace],
807 |r| r.get(0),
808 )?;
809
810 let mut stmt = self.conn.prepare(
811 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
812 )?;
813 let by_agent: Vec<(String, u64)> = stmt
814 .query_map(params![workspace], |r| {
815 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
816 })?
817 .filter_map(|r| r.ok())
818 .collect();
819
820 let mut stmt = self.conn.prepare(
821 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
822 )?;
823 let by_model: Vec<(String, u64)> = stmt
824 .query_map(params![workspace], |r| {
825 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
826 })?
827 .filter_map(|r| r.ok())
828 .collect();
829
830 let mut stmt = self.conn.prepare(
831 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
832 WHERE s.workspace = ?1 AND tool IS NOT NULL
833 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
834 )?;
835 let top_tools: Vec<(String, u64)> = stmt
836 .query_map(params![workspace], |r| {
837 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
838 })?
839 .filter_map(|r| r.ok())
840 .collect();
841
842 Ok(SummaryStats {
843 session_count: session_count as u64,
844 total_cost_usd_e6: total_cost,
845 by_agent,
846 by_model,
847 top_tools,
848 })
849 }
850
851 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
852 let mut stmt = self.conn.prepare(
853 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
854 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
855 FROM events WHERE session_id = ?1 ORDER BY seq ASC",
856 )?;
857 let rows = stmt.query_map(params![session_id], |row| {
858 Ok((
859 row.get::<_, String>(0)?,
860 row.get::<_, i64>(1)?,
861 row.get::<_, i64>(2)?,
862 row.get::<_, i64>(3)?,
863 row.get::<_, String>(4)?,
864 row.get::<_, String>(5)?,
865 row.get::<_, Option<String>>(6)?,
866 row.get::<_, Option<String>>(7)?,
867 row.get::<_, Option<i64>>(8)?,
868 row.get::<_, Option<i64>>(9)?,
869 row.get::<_, Option<i64>>(10)?,
870 row.get::<_, Option<i64>>(11)?,
871 row.get::<_, String>(12)?,
872 ))
873 })?;
874
875 let mut events = Vec::new();
876 for row in rows {
877 let (
878 sid,
879 seq,
880 ts_ms,
881 ts_exact,
882 kind_str,
883 source_str,
884 tool,
885 tool_call_id,
886 tokens_in,
887 tokens_out,
888 reasoning_tokens,
889 cost_usd_e6,
890 payload_str,
891 ) = row?;
892 events.push(Event {
893 session_id: sid,
894 seq: seq as u64,
895 ts_ms: ts_ms as u64,
896 ts_exact: ts_exact != 0,
897 kind: kind_from_str(&kind_str),
898 source: source_from_str(&source_str),
899 tool,
900 tool_call_id,
901 tokens_in: tokens_in.map(|v| v as u32),
902 tokens_out: tokens_out.map(|v| v as u32),
903 reasoning_tokens: reasoning_tokens.map(|v| v as u32),
904 cost_usd_e6,
905 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
906 });
907 }
908 Ok(events)
909 }
910
911 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
913 self.conn.execute(
914 "UPDATE sessions SET status = ?1 WHERE id = ?2",
915 params![format!("{:?}", status), id],
916 )?;
917 Ok(())
918 }
919
920 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
922 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
923 Ok(InsightsStats {
924 total_sessions: count_q(
925 &self.conn,
926 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
927 workspace,
928 )?,
929 running_sessions: count_q(
930 &self.conn,
931 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
932 workspace,
933 )?,
934 total_events: count_q(
935 &self.conn,
936 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
937 workspace,
938 )?,
939 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
940 recent: recent_sessions_3(&self.conn, workspace)?,
941 top_tools: top_tools_5(&self.conn, workspace)?,
942 total_cost_usd_e6,
943 sessions_with_cost,
944 })
945 }
946
947 pub fn retro_events_in_window(
949 &self,
950 workspace: &str,
951 start_ms: u64,
952 end_ms: u64,
953 ) -> Result<Vec<(SessionRecord, Event)>> {
954 let mut stmt = self.conn.prepare(
955 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
956 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
957 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
958 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
959 FROM events e
960 JOIN sessions s ON s.id = e.session_id
961 WHERE s.workspace = ?1
962 AND (
963 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
964 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
965 )
966 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
967 )?;
968 let rows = stmt.query_map(
969 params![
970 workspace,
971 start_ms as i64,
972 end_ms as i64,
973 SYNTHETIC_TS_CEILING_MS,
974 ],
975 |row| {
976 let payload_str: String = row.get(12)?;
977 let status_str: String = row.get(19)?;
978 Ok((
979 SessionRecord {
980 id: row.get(13)?,
981 agent: row.get(14)?,
982 model: row.get(15)?,
983 workspace: row.get(16)?,
984 started_at_ms: row.get::<_, i64>(17)? as u64,
985 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
986 status: status_from_str(&status_str),
987 trace_path: row.get(20)?,
988 start_commit: row.get(21)?,
989 end_commit: row.get(22)?,
990 branch: row.get(23)?,
991 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
992 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
993 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
994 prompt_fingerprint: None,
995 },
996 Event {
997 session_id: row.get(0)?,
998 seq: row.get::<_, i64>(1)? as u64,
999 ts_ms: row.get::<_, i64>(2)? as u64,
1000 ts_exact: row.get::<_, i64>(3)? != 0,
1001 kind: kind_from_str(&row.get::<_, String>(4)?),
1002 source: source_from_str(&row.get::<_, String>(5)?),
1003 tool: row.get(6)?,
1004 tool_call_id: row.get(7)?,
1005 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1006 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1007 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1008 cost_usd_e6: row.get(11)?,
1009 payload: serde_json::from_str(&payload_str)
1010 .unwrap_or(serde_json::Value::Null),
1011 },
1012 ))
1013 },
1014 )?;
1015
1016 let mut out = Vec::new();
1017 for r in rows {
1018 out.push(r?);
1019 }
1020 Ok(out)
1021 }
1022
1023 pub fn files_touched_in_window(
1025 &self,
1026 workspace: &str,
1027 start_ms: u64,
1028 end_ms: u64,
1029 ) -> Result<Vec<(String, String)>> {
1030 let mut stmt = self.conn.prepare(
1031 "SELECT DISTINCT ft.session_id, ft.path
1032 FROM files_touched ft
1033 JOIN sessions s ON s.id = ft.session_id
1034 WHERE s.workspace = ?1
1035 AND EXISTS (
1036 SELECT 1 FROM events e
1037 JOIN sessions ss ON ss.id = e.session_id
1038 WHERE e.session_id = ft.session_id
1039 AND (
1040 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1041 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1042 )
1043 )
1044 ORDER BY ft.session_id, ft.path",
1045 )?;
1046 let out: Vec<(String, String)> = stmt
1047 .query_map(
1048 params![
1049 workspace,
1050 start_ms as i64,
1051 end_ms as i64,
1052 SYNTHETIC_TS_CEILING_MS,
1053 ],
1054 |r| Ok((r.get(0)?, r.get(1)?)),
1055 )?
1056 .filter_map(|r| r.ok())
1057 .collect();
1058 Ok(out)
1059 }
1060
1061 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1064 let mut stmt = self.conn.prepare(
1065 "SELECT DISTINCT su.skill
1066 FROM skills_used su
1067 JOIN sessions s ON s.id = su.session_id
1068 WHERE s.workspace = ?1
1069 AND EXISTS (
1070 SELECT 1 FROM events e
1071 JOIN sessions ss ON ss.id = e.session_id
1072 WHERE e.session_id = su.session_id
1073 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1074 )
1075 ORDER BY su.skill",
1076 )?;
1077 let out: Vec<String> = stmt
1078 .query_map(
1079 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1080 |r| r.get::<_, String>(0),
1081 )?
1082 .filter_map(|r| r.ok())
1083 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1084 .collect();
1085 Ok(out)
1086 }
1087
1088 pub fn skills_used_in_window(
1090 &self,
1091 workspace: &str,
1092 start_ms: u64,
1093 end_ms: u64,
1094 ) -> Result<Vec<(String, String)>> {
1095 let mut stmt = self.conn.prepare(
1096 "SELECT DISTINCT su.session_id, su.skill
1097 FROM skills_used su
1098 JOIN sessions s ON s.id = su.session_id
1099 WHERE s.workspace = ?1
1100 AND EXISTS (
1101 SELECT 1 FROM events e
1102 JOIN sessions ss ON ss.id = e.session_id
1103 WHERE e.session_id = su.session_id
1104 AND (
1105 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1106 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1107 )
1108 )
1109 ORDER BY su.session_id, su.skill",
1110 )?;
1111 let out: Vec<(String, String)> = stmt
1112 .query_map(
1113 params![
1114 workspace,
1115 start_ms as i64,
1116 end_ms as i64,
1117 SYNTHETIC_TS_CEILING_MS,
1118 ],
1119 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1120 )?
1121 .filter_map(|r| r.ok())
1122 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1123 .collect();
1124 Ok(out)
1125 }
1126
1127 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1129 let mut stmt = self.conn.prepare(
1130 "SELECT DISTINCT ru.rule
1131 FROM rules_used ru
1132 JOIN sessions s ON s.id = ru.session_id
1133 WHERE s.workspace = ?1
1134 AND EXISTS (
1135 SELECT 1 FROM events e
1136 JOIN sessions ss ON ss.id = e.session_id
1137 WHERE e.session_id = ru.session_id
1138 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1139 )
1140 ORDER BY ru.rule",
1141 )?;
1142 let out: Vec<String> = stmt
1143 .query_map(
1144 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1145 |r| r.get::<_, String>(0),
1146 )?
1147 .filter_map(|r| r.ok())
1148 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1149 .collect();
1150 Ok(out)
1151 }
1152
1153 pub fn rules_used_in_window(
1155 &self,
1156 workspace: &str,
1157 start_ms: u64,
1158 end_ms: u64,
1159 ) -> Result<Vec<(String, String)>> {
1160 let mut stmt = self.conn.prepare(
1161 "SELECT DISTINCT ru.session_id, ru.rule
1162 FROM rules_used ru
1163 JOIN sessions s ON s.id = ru.session_id
1164 WHERE s.workspace = ?1
1165 AND EXISTS (
1166 SELECT 1 FROM events e
1167 JOIN sessions ss ON ss.id = e.session_id
1168 WHERE e.session_id = ru.session_id
1169 AND (
1170 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1171 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1172 )
1173 )
1174 ORDER BY ru.session_id, ru.rule",
1175 )?;
1176 let out: Vec<(String, String)> = stmt
1177 .query_map(
1178 params![
1179 workspace,
1180 start_ms as i64,
1181 end_ms as i64,
1182 SYNTHETIC_TS_CEILING_MS,
1183 ],
1184 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1185 )?
1186 .filter_map(|r| r.ok())
1187 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1188 .collect();
1189 Ok(out)
1190 }
1191
1192 pub fn sessions_active_in_window(
1194 &self,
1195 workspace: &str,
1196 start_ms: u64,
1197 end_ms: u64,
1198 ) -> Result<HashSet<String>> {
1199 let mut stmt = self.conn.prepare(
1200 "SELECT DISTINCT s.id
1201 FROM sessions s
1202 WHERE s.workspace = ?1
1203 AND EXISTS (
1204 SELECT 1 FROM events e
1205 WHERE e.session_id = s.id
1206 AND (
1207 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1208 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1209 )
1210 )",
1211 )?;
1212 let out: HashSet<String> = stmt
1213 .query_map(
1214 params![
1215 workspace,
1216 start_ms as i64,
1217 end_ms as i64,
1218 SYNTHETIC_TS_CEILING_MS,
1219 ],
1220 |r| r.get(0),
1221 )?
1222 .filter_map(|r| r.ok())
1223 .collect();
1224 Ok(out)
1225 }
1226
1227 pub fn session_costs_usd_e6_in_window(
1229 &self,
1230 workspace: &str,
1231 start_ms: u64,
1232 end_ms: u64,
1233 ) -> Result<HashMap<String, i64>> {
1234 let mut stmt = self.conn.prepare(
1235 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1236 FROM events e
1237 JOIN sessions s ON s.id = e.session_id
1238 WHERE s.workspace = ?1
1239 AND (
1240 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1241 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1242 )
1243 GROUP BY e.session_id",
1244 )?;
1245 let rows: Vec<(String, i64)> = stmt
1246 .query_map(
1247 params![
1248 workspace,
1249 start_ms as i64,
1250 end_ms as i64,
1251 SYNTHETIC_TS_CEILING_MS,
1252 ],
1253 |r| Ok((r.get(0)?, r.get(1)?)),
1254 )?
1255 .filter_map(|r| r.ok())
1256 .collect();
1257 Ok(rows.into_iter().collect())
1258 }
1259
1260 pub fn guidance_report(
1262 &self,
1263 workspace: &str,
1264 window_start_ms: u64,
1265 window_end_ms: u64,
1266 skill_slugs_on_disk: &HashSet<String>,
1267 rule_slugs_on_disk: &HashSet<String>,
1268 ) -> Result<GuidanceReport> {
1269 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1270 let denom = active.len() as u64;
1271 let costs =
1272 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1273
1274 let workspace_avg_cost_per_session_usd = if denom > 0 {
1275 let total_e6: i64 = active
1276 .iter()
1277 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1278 .sum();
1279 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1280 } else {
1281 None
1282 };
1283
1284 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1285 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1286 skill_sessions.entry(skill).or_default().insert(sid);
1287 }
1288 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1289 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1290 rule_sessions.entry(rule).or_default().insert(sid);
1291 }
1292
1293 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1294
1295 let mut push_row =
1296 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1297 let sessions = sids.len() as u64;
1298 let sessions_pct = if denom > 0 {
1299 sessions as f64 * 100.0 / denom as f64
1300 } else {
1301 0.0
1302 };
1303 let total_cost_usd_e6: i64 = sids
1304 .iter()
1305 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1306 .sum();
1307 let avg_cost_per_session_usd = if sessions > 0 {
1308 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1309 } else {
1310 None
1311 };
1312 let vs_workspace_avg_cost_per_session_usd =
1313 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1314 (Some(avg), Some(w)) => Some(avg - w),
1315 _ => None,
1316 };
1317 rows.push(GuidancePerfRow {
1318 kind,
1319 id,
1320 sessions,
1321 sessions_pct,
1322 total_cost_usd_e6,
1323 avg_cost_per_session_usd,
1324 vs_workspace_avg_cost_per_session_usd,
1325 on_disk,
1326 });
1327 };
1328
1329 let mut seen_skills: HashSet<String> = HashSet::new();
1330 for (id, sids) in &skill_sessions {
1331 seen_skills.insert(id.clone());
1332 push_row(
1333 GuidanceKind::Skill,
1334 id.clone(),
1335 sids,
1336 skill_slugs_on_disk.contains(id),
1337 );
1338 }
1339 for slug in skill_slugs_on_disk {
1340 if seen_skills.contains(slug) {
1341 continue;
1342 }
1343 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1344 }
1345
1346 let mut seen_rules: HashSet<String> = HashSet::new();
1347 for (id, sids) in &rule_sessions {
1348 seen_rules.insert(id.clone());
1349 push_row(
1350 GuidanceKind::Rule,
1351 id.clone(),
1352 sids,
1353 rule_slugs_on_disk.contains(id),
1354 );
1355 }
1356 for slug in rule_slugs_on_disk {
1357 if seen_rules.contains(slug) {
1358 continue;
1359 }
1360 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1361 }
1362
1363 rows.sort_by(|a, b| {
1364 b.sessions
1365 .cmp(&a.sessions)
1366 .then_with(|| a.kind.cmp(&b.kind))
1367 .then_with(|| a.id.cmp(&b.id))
1368 });
1369
1370 Ok(GuidanceReport {
1371 workspace: workspace.to_string(),
1372 window_start_ms,
1373 window_end_ms,
1374 sessions_in_window: denom,
1375 workspace_avg_cost_per_session_usd,
1376 rows,
1377 })
1378 }
1379
1380 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1381 let mut stmt = self.conn.prepare(
1382 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1383 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1384 prompt_fingerprint
1385 FROM sessions WHERE id = ?1",
1386 )?;
1387 let mut rows = stmt.query_map(params![id], |row| {
1388 Ok((
1389 row.get::<_, String>(0)?,
1390 row.get::<_, String>(1)?,
1391 row.get::<_, Option<String>>(2)?,
1392 row.get::<_, String>(3)?,
1393 row.get::<_, i64>(4)?,
1394 row.get::<_, Option<i64>>(5)?,
1395 row.get::<_, String>(6)?,
1396 row.get::<_, String>(7)?,
1397 row.get::<_, Option<String>>(8)?,
1398 row.get::<_, Option<String>>(9)?,
1399 row.get::<_, Option<String>>(10)?,
1400 row.get::<_, Option<i64>>(11)?,
1401 row.get::<_, Option<i64>>(12)?,
1402 row.get::<_, String>(13)?,
1403 row.get::<_, Option<String>>(14)?,
1404 ))
1405 })?;
1406
1407 if let Some(row) = rows.next() {
1408 let (
1409 id,
1410 agent,
1411 model,
1412 workspace,
1413 started,
1414 ended,
1415 status_str,
1416 trace,
1417 start_commit,
1418 end_commit,
1419 branch,
1420 dirty_start,
1421 dirty_end,
1422 source,
1423 prompt_fingerprint,
1424 ) = row?;
1425 Ok(Some(SessionRecord {
1426 id,
1427 agent,
1428 model,
1429 workspace,
1430 started_at_ms: started as u64,
1431 ended_at_ms: ended.map(|v| v as u64),
1432 status: status_from_str(&status_str),
1433 trace_path: trace,
1434 start_commit,
1435 end_commit,
1436 branch,
1437 dirty_start: dirty_start.map(i64_to_bool),
1438 dirty_end: dirty_end.map(i64_to_bool),
1439 repo_binding_source: empty_to_none(source),
1440 prompt_fingerprint,
1441 }))
1442 } else {
1443 Ok(None)
1444 }
1445 }
1446
1447 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1448 let mut stmt = self.conn.prepare(
1449 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1450 indexed_at_ms, dirty, graph_path
1451 FROM repo_snapshots WHERE workspace = ?1
1452 ORDER BY indexed_at_ms DESC LIMIT 1",
1453 )?;
1454 let mut rows = stmt.query_map(params![workspace], |row| {
1455 Ok(RepoSnapshotRecord {
1456 id: row.get(0)?,
1457 workspace: row.get(1)?,
1458 head_commit: row.get(2)?,
1459 dirty_fingerprint: row.get(3)?,
1460 analyzer_version: row.get(4)?,
1461 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1462 dirty: row.get::<_, i64>(6)? != 0,
1463 graph_path: row.get(7)?,
1464 })
1465 })?;
1466 Ok(rows.next().transpose()?)
1467 }
1468
1469 pub fn save_repo_snapshot(
1470 &self,
1471 snapshot: &RepoSnapshotRecord,
1472 facts: &[FileFact],
1473 edges: &[RepoEdge],
1474 ) -> Result<()> {
1475 self.conn.execute(
1476 "INSERT INTO repo_snapshots (
1477 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1478 indexed_at_ms, dirty, graph_path
1479 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1480 ON CONFLICT(id) DO UPDATE SET
1481 workspace=excluded.workspace,
1482 head_commit=excluded.head_commit,
1483 dirty_fingerprint=excluded.dirty_fingerprint,
1484 analyzer_version=excluded.analyzer_version,
1485 indexed_at_ms=excluded.indexed_at_ms,
1486 dirty=excluded.dirty,
1487 graph_path=excluded.graph_path",
1488 params![
1489 snapshot.id,
1490 snapshot.workspace,
1491 snapshot.head_commit,
1492 snapshot.dirty_fingerprint,
1493 snapshot.analyzer_version,
1494 snapshot.indexed_at_ms as i64,
1495 bool_to_i64(snapshot.dirty),
1496 snapshot.graph_path,
1497 ],
1498 )?;
1499 self.conn.execute(
1500 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1501 params![snapshot.id],
1502 )?;
1503 self.conn.execute(
1504 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1505 params![snapshot.id],
1506 )?;
1507 for fact in facts {
1508 self.conn.execute(
1509 "INSERT INTO file_facts (
1510 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1511 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1512 churn_30d, churn_90d, authors_90d, last_changed_ms
1513 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1514 params![
1515 fact.snapshot_id,
1516 fact.path,
1517 fact.language,
1518 fact.bytes as i64,
1519 fact.loc as i64,
1520 fact.sloc as i64,
1521 fact.complexity_total as i64,
1522 fact.max_fn_complexity as i64,
1523 fact.symbol_count as i64,
1524 fact.import_count as i64,
1525 fact.fan_in as i64,
1526 fact.fan_out as i64,
1527 fact.churn_30d as i64,
1528 fact.churn_90d as i64,
1529 fact.authors_90d as i64,
1530 fact.last_changed_ms.map(|v| v as i64),
1531 ],
1532 )?;
1533 }
1534 for edge in edges {
1535 self.conn.execute(
1536 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1537 VALUES (?1, ?2, ?3, ?4, ?5)
1538 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1539 DO UPDATE SET weight = weight + excluded.weight",
1540 params![
1541 snapshot.id,
1542 edge.from_path,
1543 edge.to_path,
1544 edge.kind,
1545 edge.weight as i64,
1546 ],
1547 )?;
1548 }
1549 Ok(())
1550 }
1551
1552 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1553 let mut stmt = self.conn.prepare(
1554 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1555 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1556 churn_30d, churn_90d, authors_90d, last_changed_ms
1557 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1558 )?;
1559 let rows = stmt.query_map(params![snapshot_id], |row| {
1560 Ok(FileFact {
1561 snapshot_id: row.get(0)?,
1562 path: row.get(1)?,
1563 language: row.get(2)?,
1564 bytes: row.get::<_, i64>(3)? as u64,
1565 loc: row.get::<_, i64>(4)? as u32,
1566 sloc: row.get::<_, i64>(5)? as u32,
1567 complexity_total: row.get::<_, i64>(6)? as u32,
1568 max_fn_complexity: row.get::<_, i64>(7)? as u32,
1569 symbol_count: row.get::<_, i64>(8)? as u32,
1570 import_count: row.get::<_, i64>(9)? as u32,
1571 fan_in: row.get::<_, i64>(10)? as u32,
1572 fan_out: row.get::<_, i64>(11)? as u32,
1573 churn_30d: row.get::<_, i64>(12)? as u32,
1574 churn_90d: row.get::<_, i64>(13)? as u32,
1575 authors_90d: row.get::<_, i64>(14)? as u32,
1576 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1577 })
1578 })?;
1579 Ok(rows.filter_map(|row| row.ok()).collect())
1580 }
1581
1582 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1583 let mut stmt = self.conn.prepare(
1584 "SELECT from_id, to_id, kind, weight
1585 FROM repo_edges WHERE snapshot_id = ?1
1586 ORDER BY kind, from_id, to_id",
1587 )?;
1588 let rows = stmt.query_map(params![snapshot_id], |row| {
1589 Ok(RepoEdge {
1590 from_path: row.get(0)?,
1591 to_path: row.get(1)?,
1592 kind: row.get(2)?,
1593 weight: row.get::<_, i64>(3)? as u32,
1594 })
1595 })?;
1596 Ok(rows.filter_map(|row| row.ok()).collect())
1597 }
1598
1599 pub fn tool_spans_in_window(
1600 &self,
1601 workspace: &str,
1602 start_ms: u64,
1603 end_ms: u64,
1604 ) -> Result<Vec<ToolSpanView>> {
1605 let mut stmt = self.conn.prepare(
1606 "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1607 ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1608 FROM tool_spans ts
1609 JOIN sessions s ON s.id = ts.session_id
1610 WHERE s.workspace = ?1
1611 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1612 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1613 ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1614 )?;
1615 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1616 let paths_json: String = row.get(7)?;
1617 Ok(ToolSpanView {
1618 tool: row
1619 .get::<_, Option<String>>(0)?
1620 .unwrap_or_else(|| "unknown".into()),
1621 status: row.get(1)?,
1622 lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1623 tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1624 tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1625 reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1626 cost_usd_e6: row.get(6)?,
1627 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1628 })
1629 })?;
1630 Ok(rows.filter_map(|row| row.ok()).collect())
1631 }
1632
1633 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1634 let mut stmt = self.conn.prepare(
1635 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1636 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1637 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1638 )?;
1639 let rows = stmt.query_map(params![session_id], |row| {
1640 let paths_json: String = row.get(12)?;
1641 Ok(ToolSpanSyncRow {
1642 span_id: row.get(0)?,
1643 session_id: row.get(1)?,
1644 tool: row.get(2)?,
1645 tool_call_id: row.get(3)?,
1646 status: row.get(4)?,
1647 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1648 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1649 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1650 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1651 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1652 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1653 cost_usd_e6: row.get(11)?,
1654 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1655 })
1656 })?;
1657 Ok(rows.filter_map(|row| row.ok()).collect())
1658 }
1659
1660 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1661 self.conn.execute(
1662 "INSERT OR REPLACE INTO session_evals
1663 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1664 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1665 rusqlite::params![
1666 eval.id,
1667 eval.session_id,
1668 eval.judge_model,
1669 eval.rubric_id,
1670 eval.score,
1671 eval.rationale,
1672 eval.flagged as i64,
1673 eval.created_at_ms as i64,
1674 ],
1675 )?;
1676 Ok(())
1677 }
1678
1679 pub fn list_evals_in_window(
1680 &self,
1681 start_ms: u64,
1682 end_ms: u64,
1683 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1684 let mut stmt = self.conn.prepare(
1685 "SELECT id, session_id, judge_model, rubric_id, score,
1686 rationale, flagged, created_at_ms
1687 FROM session_evals
1688 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1689 ORDER BY created_at_ms ASC",
1690 )?;
1691 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1692 Ok(crate::eval::types::EvalRow {
1693 id: r.get(0)?,
1694 session_id: r.get(1)?,
1695 judge_model: r.get(2)?,
1696 rubric_id: r.get(3)?,
1697 score: r.get(4)?,
1698 rationale: r.get(5)?,
1699 flagged: r.get::<_, i64>(6)? != 0,
1700 created_at_ms: r.get::<_, i64>(7)? as u64,
1701 })
1702 })?;
1703 rows.collect()
1704 }
1705
1706 pub fn list_evals_for_session(
1707 &self,
1708 session_id: &str,
1709 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1710 let mut stmt = self.conn.prepare(
1711 "SELECT id, session_id, judge_model, rubric_id, score,
1712 rationale, flagged, created_at_ms
1713 FROM session_evals
1714 WHERE session_id = ?1
1715 ORDER BY created_at_ms DESC",
1716 )?;
1717 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1718 Ok(crate::eval::types::EvalRow {
1719 id: r.get(0)?,
1720 session_id: r.get(1)?,
1721 judge_model: r.get(2)?,
1722 rubric_id: r.get(3)?,
1723 score: r.get(4)?,
1724 rationale: r.get(5)?,
1725 flagged: r.get::<_, i64>(6)? != 0,
1726 created_at_ms: r.get::<_, i64>(7)? as u64,
1727 })
1728 })?;
1729 rows.collect()
1730 }
1731
1732 pub fn list_sessions_for_eval(
1733 &self,
1734 since_ms: u64,
1735 min_cost_usd: f64,
1736 ) -> Result<Vec<crate::core::event::SessionRecord>> {
1737 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
1738 let mut stmt = self.conn.prepare(
1739 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1740 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
1741 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint
1742 FROM sessions s
1743 WHERE s.started_at_ms >= ?1
1744 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
1745 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
1746 ORDER BY s.started_at_ms DESC",
1747 )?;
1748 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
1749 Ok((
1750 r.get::<_, String>(0)?,
1751 r.get::<_, String>(1)?,
1752 r.get::<_, Option<String>>(2)?,
1753 r.get::<_, String>(3)?,
1754 r.get::<_, i64>(4)?,
1755 r.get::<_, Option<i64>>(5)?,
1756 r.get::<_, String>(6)?,
1757 r.get::<_, String>(7)?,
1758 r.get::<_, Option<String>>(8)?,
1759 r.get::<_, Option<String>>(9)?,
1760 r.get::<_, Option<String>>(10)?,
1761 r.get::<_, Option<i64>>(11)?,
1762 r.get::<_, Option<i64>>(12)?,
1763 r.get::<_, Option<String>>(13)?,
1764 r.get::<_, Option<String>>(14)?,
1765 ))
1766 })?;
1767 let mut out = Vec::new();
1768 for row in rows {
1769 let (
1770 id,
1771 agent,
1772 model,
1773 workspace,
1774 started,
1775 ended,
1776 status_str,
1777 trace,
1778 start_commit,
1779 end_commit,
1780 branch,
1781 dirty_start,
1782 dirty_end,
1783 source,
1784 prompt_fingerprint,
1785 ) = row?;
1786 out.push(crate::core::event::SessionRecord {
1787 id,
1788 agent,
1789 model,
1790 workspace,
1791 started_at_ms: started as u64,
1792 ended_at_ms: ended.map(|v| v as u64),
1793 status: status_from_str(&status_str),
1794 trace_path: trace,
1795 start_commit,
1796 end_commit,
1797 branch,
1798 dirty_start: dirty_start.map(i64_to_bool),
1799 dirty_end: dirty_end.map(i64_to_bool),
1800 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
1801 prompt_fingerprint,
1802 });
1803 }
1804 Ok(out)
1805 }
1806
1807 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
1808 self.conn.execute(
1809 "INSERT OR IGNORE INTO prompt_snapshots
1810 (fingerprint, captured_at_ms, files_json, total_bytes)
1811 VALUES (?1, ?2, ?3, ?4)",
1812 params![
1813 snap.fingerprint,
1814 snap.captured_at_ms as i64,
1815 snap.files_json,
1816 snap.total_bytes as i64
1817 ],
1818 )?;
1819 Ok(())
1820 }
1821
1822 pub fn get_prompt_snapshot(
1823 &self,
1824 fingerprint: &str,
1825 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
1826 self.conn
1827 .query_row(
1828 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1829 FROM prompt_snapshots WHERE fingerprint = ?1",
1830 params![fingerprint],
1831 |r| {
1832 Ok(crate::prompt::PromptSnapshot {
1833 fingerprint: r.get(0)?,
1834 captured_at_ms: r.get::<_, i64>(1)? as u64,
1835 files_json: r.get(2)?,
1836 total_bytes: r.get::<_, i64>(3)? as u64,
1837 })
1838 },
1839 )
1840 .optional()
1841 .map_err(Into::into)
1842 }
1843
1844 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
1845 let mut stmt = self.conn.prepare(
1846 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1847 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
1848 )?;
1849 let rows = stmt.query_map([], |r| {
1850 Ok(crate::prompt::PromptSnapshot {
1851 fingerprint: r.get(0)?,
1852 captured_at_ms: r.get::<_, i64>(1)? as u64,
1853 files_json: r.get(2)?,
1854 total_bytes: r.get::<_, i64>(3)? as u64,
1855 })
1856 })?;
1857 Ok(rows.filter_map(|r| r.ok()).collect())
1858 }
1859
1860 pub fn sessions_with_prompt_fingerprint(
1862 &self,
1863 workspace: &str,
1864 start_ms: u64,
1865 end_ms: u64,
1866 ) -> Result<Vec<(String, String)>> {
1867 let mut stmt = self.conn.prepare(
1868 "SELECT id, prompt_fingerprint FROM sessions
1869 WHERE workspace = ?1
1870 AND started_at_ms >= ?2 AND started_at_ms < ?3
1871 AND prompt_fingerprint IS NOT NULL",
1872 )?;
1873 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
1874 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1875 })?;
1876 Ok(rows.filter_map(|r| r.ok()).collect())
1877 }
1878}
1879
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1886
1887fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1888 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1889}
1890
1891fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1892 let cost: i64 = conn.query_row(
1893 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1894 params![workspace], |r| r.get(0),
1895 )?;
1896 let with_cost: i64 = conn.query_row(
1897 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1898 params![workspace], |r| r.get(0),
1899 )?;
1900 Ok((cost, with_cost as u64))
1901}
1902
/// Maps a count of whole days since the Unix epoch to its three-letter weekday name.
///
/// Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into the Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const NAMES: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce modulo 7 before adding the offset so that `day_idx` values near `u64::MAX`
    // cannot overflow (a panic in debug builds, a wrong weekday in release builds).
    NAMES[((day_idx % 7 + 4) % 7) as usize]
}
1906
1907fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
1908 let week_ago = now.saturating_sub(7 * 86_400_000);
1909 let mut stmt = conn
1910 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
1911 let days: Vec<u64> = stmt
1912 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
1913 .filter_map(|r| r.ok())
1914 .map(|v| v as u64 / 86_400_000)
1915 .collect();
1916 let today = now / 86_400_000;
1917 Ok((0u64..7)
1918 .map(|i| {
1919 let d = today.saturating_sub(6 - i);
1920 (
1921 day_label(d).to_string(),
1922 days.iter().filter(|&&x| x == d).count() as u64,
1923 )
1924 })
1925 .collect())
1926}
1927
1928fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
1929 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
1930 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
1931 s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
1932 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
1933 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
1934 let mut stmt = conn.prepare(sql)?;
1935 let out: Vec<(SessionRecord, u64)> = stmt
1936 .query_map(params![workspace], |r| {
1937 let st: String = r.get(6)?;
1938 Ok((
1939 SessionRecord {
1940 id: r.get(0)?,
1941 agent: r.get(1)?,
1942 model: r.get(2)?,
1943 workspace: r.get(3)?,
1944 started_at_ms: r.get::<_, i64>(4)? as u64,
1945 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1946 status: status_from_str(&st),
1947 trace_path: r.get(7)?,
1948 start_commit: r.get(8)?,
1949 end_commit: r.get(9)?,
1950 branch: r.get(10)?,
1951 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
1952 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
1953 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
1954 prompt_fingerprint: None,
1955 },
1956 r.get::<_, i64>(14)? as u64,
1957 ))
1958 })?
1959 .filter_map(|r| r.ok())
1960 .collect();
1961 Ok(out)
1962}
1963
1964fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
1965 let mut stmt = conn.prepare(
1966 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
1967 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
1968 )?;
1969 let out: Vec<(String, u64)> = stmt
1970 .query_map(params![workspace], |r| {
1971 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1972 })?
1973 .filter_map(|r| r.ok())
1974 .collect();
1975 Ok(out)
1976}
1977
1978fn status_from_str(s: &str) -> SessionStatus {
1979 match s {
1980 "Running" => SessionStatus::Running,
1981 "Waiting" => SessionStatus::Waiting,
1982 "Idle" => SessionStatus::Idle,
1983 _ => SessionStatus::Done,
1984 }
1985}
1986
1987fn kind_from_str(s: &str) -> EventKind {
1988 match s {
1989 "ToolCall" => EventKind::ToolCall,
1990 "ToolResult" => EventKind::ToolResult,
1991 "Message" => EventKind::Message,
1992 "Error" => EventKind::Error,
1993 "Cost" => EventKind::Cost,
1994 _ => EventKind::Hook,
1995 }
1996}
1997
1998fn source_from_str(s: &str) -> EventSource {
1999 match s {
2000 "Tail" => EventSource::Tail,
2001 "Hook" => EventSource::Hook,
2002 _ => EventSource::Proxy,
2003 }
2004}
2005
2006fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2007 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2008 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2009 ensure_column(conn, "sessions", "branch", "TEXT")?;
2010 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2011 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2012 ensure_column(
2013 conn,
2014 "sessions",
2015 "repo_binding_source",
2016 "TEXT NOT NULL DEFAULT ''",
2017 )?;
2018 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2019 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2020 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2021 ensure_column(
2022 conn,
2023 "sync_outbox",
2024 "kind",
2025 "TEXT NOT NULL DEFAULT 'events'",
2026 )?;
2027 ensure_column(
2028 conn,
2029 "experiments",
2030 "state",
2031 "TEXT NOT NULL DEFAULT 'Draft'",
2032 )?;
2033 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2034 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2035 Ok(())
2036}
2037
2038fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2039 if has_column(conn, table, column)? {
2040 return Ok(());
2041 }
2042 conn.execute(
2043 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2044 [],
2045 )?;
2046 Ok(())
2047}
2048
2049fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2050 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2051 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2052 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2053}
2054
/// Encodes a boolean as the integer SQLite stores: `true` becomes 1, `false` becomes 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2058
/// Decodes a stored integer flag: 0 is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
2062
/// Turns an empty string into `None`; any non-empty string is returned unchanged inside
/// `Some`. Whitespace is not trimmed.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2066
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Opens a store at `kaizen.db` inside a newly created temporary directory.
    ///
    /// The `TempDir` is returned with the store so the caller can keep it bound
    /// (e.g. as `_dir`) for as long as the store is in use.
    fn open_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        (dir, store)
    }

    /// Builds a finished `cursor` session in workspace `/ws` with the given id.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
        }
    }

    /// Builds a `read_file` tool-call event for `session_id`.
    ///
    /// The timestamp and tool-call id are derived from `seq`, so events with
    /// different sequence numbers differ in both.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: json!({}),
        }
    }

    /// A freshly opened store reports the `wal` journal mode.
    #[test]
    fn open_and_wal_mode() {
        let (_dir, store) = open_store();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    /// An upserted session can be fetched back by id with its status intact.
    #[test]
    fn upsert_and_get_session() {
        let (_dir, store) = open_store();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    /// Appended events are listed back in order, and the session stays listed.
    #[test]
    fn append_and_list_events_round_trip() {
        let (_dir, store) = open_store();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");

        // The test name promises an event round trip, so read the events back too.
        let events = store.list_events_for_session("s2").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// Summary stats for a workspace with no sessions are all zero.
    #[test]
    fn summary_stats_empty() {
        let (_dir, store) = open_store();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    /// Summary stats count sessions and group them by agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    /// Events of one session come back in sequence order.
    #[test]
    fn list_events_for_session_round_trip() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    /// Appending the same event twice stores it only once.
    #[test]
    fn append_event_dedup() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        // Identical session id and seq: the second append must not add a row.
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    /// Upserting an existing session id overwrites its status.
    #[test]
    fn upsert_idempotent() {
        let (_dir, store) = open_store();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// A path in the event payload's `input.path` is indexed as a touched file.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` changes the stored status of a session.
    #[test]
    fn update_session_status_changes_status() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    /// Pruning removes sessions started before the cutoff, with their events,
    /// and leaves later sessions in place.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let (_dir, store) = open_store();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    /// A `.cursor/rules/<slug>.mdc` payload path is indexed as a used rule.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// The guidance report counts one session each for a skill and a rule
    /// mentioned in event text, and marks both as present on disk.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    /// Pruning a session also removes its `rules_used` rows.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let (_dir, store) = open_store();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}