1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Threshold (milliseconds) bound into the window queries in this file: an
/// event whose `ts_ms` is below it is matched by its session's
/// `started_at_ms` instead of by its own timestamp.
/// NOTE(review): the name suggests such timestamps are synthetic rather than
/// wall-clock values — confirm against the ingest code.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
21
/// Schema statements run in order by `Store::open` (via `execute_batch`) every
/// time a database is opened. Every statement is `CREATE ... IF NOT EXISTS` or
/// `INSERT OR IGNORE`, so re-running them against an existing database is a
/// no-op.
///
/// Queries elsewhere in this file reference columns that are not declared here
/// (for example `sessions.start_commit`, `events.ts_exact`, `sync_outbox.kind`).
/// NOTE(review): those are presumably added by `ensure_schema_columns`, which
/// `Store::open` calls right after these statements — confirm.
const MIGRATIONS: &[&str] = &[
    // Core session / event log.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    // Per-session derived facts.
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Queue of serialized rows waiting to be synced (`sent = 0` means pending).
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Uniqueness of (session_id, seq) is what the event upsert conflicts on.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Key/value store for sync bookkeeping.
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // Git binding of a session; written alongside `sessions` by `upsert_session`.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repository analysis snapshots and their per-file facts / edges.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    // Indexes backing the workspace-filtered queries in this file.
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Single-row table: the CHECK pins `id` to 1 and the INSERT OR IGNORE below seeds it.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // Remote mirror tables: JSON blobs keyed by team, workspace hash and a hashed id.
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Judge scores per session; `score` is constrained to [0.0, 1.0].
    // This entry is a multi-statement batch (table plus two indexes).
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
];
216
/// Aggregate figures for one workspace, as assembled by `Store::insights`.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of `sessions` rows for the workspace.
    pub total_sessions: u64,
    /// Number of the workspace's sessions whose stored status is `'Running'`.
    pub running_sessions: u64,
    /// Number of events belonging to the workspace's sessions.
    pub total_events: u64,
    /// `(label, count)` pairs returned by the `sessions_by_day_7` helper
    /// (presumably one entry per day over the last seven days — confirm).
    pub sessions_by_day: Vec<(String, u64)>,
    /// Sessions returned by `recent_sessions_3`, each paired with a count
    /// (presumably that session's event count — confirm).
    pub recent: Vec<(SessionRecord, u64)>,
    /// `(tool, count)` pairs returned by `top_tools_5`.
    pub top_tools: Vec<(String, u64)>,
    /// First value returned by `cost_stats`
    /// (presumably summed cost in millionths of a USD — confirm).
    pub total_cost_usd_e6: i64,
    /// Second value returned by `cost_stats`
    /// (presumably how many sessions have any recorded cost — confirm).
    pub sessions_with_cost: u64,
}
232
/// Point-in-time view of the sync bookkeeping, as read by `Store::sync_status`.
pub struct SyncStatusSnapshot {
    /// Number of `sync_outbox` rows with `sent = 0`.
    pub pending_outbox: u64,
    /// Parsed `last_success_ms` entry of `sync_state`; `None` when the entry is
    /// missing or does not parse as an integer.
    pub last_success_ms: Option<u64>,
    /// `last_error` entry of `sync_state`, if present.
    pub last_error: Option<String>,
    /// Parsed `consecutive_failures` entry of `sync_state`; 0 when missing or
    /// unparsable.
    pub consecutive_failures: u32,
}
240
/// Per-workspace summary produced by `Store::summary_stats`.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions recorded for the workspace.
    pub session_count: u64,
    /// Sum of `events.cost_usd_e6` over the workspace's sessions (0 when none).
    pub total_cost_usd_e6: i64,
    /// `(agent, session count)` pairs, most frequent first.
    pub by_agent: Vec<(String, u64)>,
    /// `(model, session count)` pairs, most frequent first; a NULL model is
    /// reported as `"unknown"`.
    pub by_model: Vec<(String, u64)>,
    /// Up to ten `(tool, event count)` pairs, most frequent first.
    pub top_tools: Vec<(String, u64)>,
}
250
/// Discriminates the two kinds of guidance row; serialized in lowercase
/// (`"skill"` / `"rule"`) because of the `rename_all` attribute.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill (presumably backed by the `skills_used` table — confirm).
    Skill,
    /// A rule (presumably backed by the `rules_used` table — confirm).
    Rule,
}
258
/// One row of a [`GuidanceReport`].
///
/// NOTE(review): the code that fills this struct is not visible in this part
/// of the file; the field notes below are inferred from names and types only
/// and should be confirmed against the producer.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether the row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    /// Presumably the number of sessions that used this item.
    pub sessions: u64,
    /// Presumably `sessions` as a share of the sessions in the window — confirm
    /// whether this is a 0–1 fraction or a 0–100 percentage.
    pub sessions_pct: f64,
    /// Presumably total cost, in millionths of a USD, of the sessions counted.
    pub total_cost_usd_e6: i64,
    /// Presumably average cost per session in USD; `None` when unavailable.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Presumably a comparison against the workspace-wide average cost per
    /// session — confirm whether it is a difference or a ratio.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Presumably whether the item still exists on disk — confirm.
    pub on_disk: bool,
}
271
/// Report of guidance (skill / rule) rows for one workspace and time window.
///
/// NOTE(review): the producer is not visible in this part of the file; field
/// notes are inferred from names and types — confirm.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Start of the window (presumably epoch milliseconds).
    pub window_start_ms: u64,
    /// End of the window (presumably epoch milliseconds).
    pub window_end_ms: u64,
    /// Presumably the number of sessions falling inside the window.
    pub sessions_in_window: u64,
    /// Presumably the workspace-wide average cost per session in USD.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
282
/// Counts reported by `Store::prune_sessions_started_before`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of sessions that matched the cutoff and were deleted.
    pub sessions_removed: u64,
    /// Number of events that belonged to those sessions.
    pub events_removed: u64,
}
289
/// Key string for a `u64` value; presumably meant for
/// `Store::sync_state_get_u64` / `Store::sync_state_set_u64` given the
/// `SYNC_STATE_` prefix, holding the time of the last agent scan — confirm
/// with callers.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key string for a `u64` value; presumably the `sync_state` key holding the
/// time of the last automatic prune — confirm with callers.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
293
/// Tool-span record in the shape used for sync.
///
/// The fields line up with the columns of the `tool_spans` table, with `paths`
/// in place of `paths_json`. NOTE(review): the code that builds this struct is
/// not visible in this part of the file — confirm the mapping.
pub struct ToolSpanSyncRow {
    /// Span identifier.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, if any.
    pub tool: Option<String>,
    /// Tool-call identifier, if any.
    pub tool_call_id: Option<String>,
    /// Span status string.
    pub status: String,
    /// Start time in milliseconds, if known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, if known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, if known.
    pub tokens_in: Option<u32>,
    /// Output token count, if known.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, if known.
    pub reasoning_tokens: Option<u32>,
    /// Cost in millionths of a USD, if known.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
309
/// Handle over a single SQLite connection; every persistence method in this
/// file goes through it.
pub struct Store {
    /// The underlying rusqlite connection, opened and migrated by `Store::open`.
    conn: Connection,
}
313
314impl Store {
    /// Borrows the underlying SQLite connection for use by other modules of
    /// this crate.
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
318
319 pub fn open(path: &Path) -> Result<Self> {
320 if let Some(parent) = path.parent() {
321 std::fs::create_dir_all(parent)?;
322 }
323 let conn =
324 Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
325 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
326 for sql in MIGRATIONS {
327 conn.execute_batch(sql)?;
328 }
329 ensure_schema_columns(&conn)?;
330 Ok(Self { conn })
331 }
332
333 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
334 self.conn.execute(
335 "INSERT INTO sessions (
336 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
337 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
338 )
339 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
340 ON CONFLICT(id) DO UPDATE SET
341 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
342 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
343 status=excluded.status, trace_path=excluded.trace_path,
344 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
345 branch=excluded.branch, dirty_start=excluded.dirty_start,
346 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source",
347 params![
348 s.id,
349 s.agent,
350 s.model,
351 s.workspace,
352 s.started_at_ms as i64,
353 s.ended_at_ms.map(|v| v as i64),
354 format!("{:?}", s.status),
355 s.trace_path,
356 s.start_commit,
357 s.end_commit,
358 s.branch,
359 s.dirty_start.map(bool_to_i64),
360 s.dirty_end.map(bool_to_i64),
361 s.repo_binding_source.clone().unwrap_or_default(),
362 ],
363 )?;
364 self.conn.execute(
365 "INSERT INTO session_repo_binding (
366 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
367 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
368 ON CONFLICT(session_id) DO UPDATE SET
369 start_commit=excluded.start_commit,
370 end_commit=excluded.end_commit,
371 branch=excluded.branch,
372 dirty_start=excluded.dirty_start,
373 dirty_end=excluded.dirty_end,
374 repo_binding_source=excluded.repo_binding_source",
375 params![
376 s.id,
377 s.start_commit,
378 s.end_commit,
379 s.branch,
380 s.dirty_start.map(bool_to_i64),
381 s.dirty_end.map(bool_to_i64),
382 s.repo_binding_source.clone().unwrap_or_default(),
383 ],
384 )?;
385 Ok(())
386 }
387
388 pub fn ensure_session_stub(
391 &self,
392 id: &str,
393 agent: &str,
394 workspace: &str,
395 started_at_ms: u64,
396 ) -> Result<()> {
397 self.conn.execute(
398 "INSERT OR IGNORE INTO sessions (
399 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
400 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
401 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
402 params![id, agent, workspace, started_at_ms as i64],
403 )?;
404 Ok(())
405 }
406
407 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
409 let n: i64 = self.conn.query_row(
410 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
411 [session_id],
412 |r| r.get(0),
413 )?;
414 Ok(n as u64)
415 }
416
417 pub fn append_event(&self, e: &Event) -> Result<()> {
418 self.append_event_with_sync(e, None)
419 }
420
421 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
423 let payload = serde_json::to_string(&e.payload)?;
424 self.conn.execute(
425 "INSERT INTO events (
426 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
427 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
428 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
429 ON CONFLICT(session_id, seq) DO UPDATE SET
430 ts_ms = excluded.ts_ms,
431 ts_exact = excluded.ts_exact,
432 kind = excluded.kind,
433 source = excluded.source,
434 tool = excluded.tool,
435 tool_call_id = excluded.tool_call_id,
436 tokens_in = excluded.tokens_in,
437 tokens_out = excluded.tokens_out,
438 reasoning_tokens = excluded.reasoning_tokens,
439 cost_usd_e6 = excluded.cost_usd_e6,
440 payload = excluded.payload",
441 params![
442 e.session_id,
443 e.seq as i64,
444 e.ts_ms as i64,
445 bool_to_i64(e.ts_exact),
446 format!("{:?}", e.kind),
447 format!("{:?}", e.source),
448 e.tool,
449 e.tool_call_id,
450 e.tokens_in.map(|v| v as i64),
451 e.tokens_out.map(|v| v as i64),
452 e.reasoning_tokens.map(|v| v as i64),
453 e.cost_usd_e6,
454 payload,
455 ],
456 )?;
457 if self.conn.changes() == 0 {
458 return Ok(());
459 }
460 index_event_derived(&self.conn, e)?;
461 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
462 let Some(ctx) = ctx else {
463 return Ok(());
464 };
465 let sync = &ctx.sync;
466 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
467 return Ok(());
468 }
469 let Some(salt) = try_team_salt(sync) else {
470 tracing::warn!(
471 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
472 );
473 return Ok(());
474 };
475 if sync.sample_rate < 1.0 {
476 let u: f64 = rand::random();
477 if u > sync.sample_rate {
478 return Ok(());
479 }
480 }
481 let Some(session) = self.get_session(&e.session_id)? else {
482 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
483 return Ok(());
484 };
485 let mut outbound = outbound_event_from_row(e, &session, &salt);
486 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
487 let row = serde_json::to_string(&outbound)?;
488 self.conn.execute(
489 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
490 params![e.session_id, row],
491 )?;
492 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
493 Ok(())
494 }
495
496 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
497 let mut stmt = self.conn.prepare(
498 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
499 )?;
500 let rows = stmt.query_map(params![limit as i64], |row| {
501 Ok((
502 row.get::<_, i64>(0)?,
503 row.get::<_, String>(1)?,
504 row.get::<_, String>(2)?,
505 ))
506 })?;
507 let mut out = Vec::new();
508 for r in rows {
509 out.push(r?);
510 }
511 Ok(out)
512 }
513
514 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
515 for id in ids {
516 self.conn
517 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
518 }
519 Ok(())
520 }
521
522 pub fn replace_outbox_rows(
523 &self,
524 owner_id: &str,
525 kind: &str,
526 payloads: &[String],
527 ) -> Result<()> {
528 self.conn.execute(
529 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
530 params![owner_id, kind],
531 )?;
532 for payload in payloads {
533 self.conn.execute(
534 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
535 params![owner_id, kind, payload],
536 )?;
537 }
538 Ok(())
539 }
540
541 pub fn outbox_pending_count(&self) -> Result<u64> {
542 let c: i64 =
543 self.conn
544 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
545 r.get(0)
546 })?;
547 Ok(c as u64)
548 }
549
550 pub fn set_sync_state_ok(&self) -> Result<()> {
551 let now = now_ms().to_string();
552 self.conn.execute(
553 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
554 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
555 params![now],
556 )?;
557 self.conn.execute(
558 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
559 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
560 [],
561 )?;
562 self.conn
563 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
564 Ok(())
565 }
566
567 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
568 let prev: i64 = self
569 .conn
570 .query_row(
571 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
572 [],
573 |r| {
574 let s: String = r.get(0)?;
575 Ok(s.parse::<i64>().unwrap_or(0))
576 },
577 )
578 .optional()?
579 .unwrap_or(0);
580 let next = prev.saturating_add(1);
581 self.conn.execute(
582 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
583 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
584 params![msg],
585 )?;
586 self.conn.execute(
587 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
588 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
589 params![next.to_string()],
590 )?;
591 Ok(())
592 }
593
594 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
595 let pending_outbox = self.outbox_pending_count()?;
596 let last_success_ms = self
597 .conn
598 .query_row(
599 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
600 [],
601 |r| r.get::<_, String>(0),
602 )
603 .optional()?
604 .and_then(|s| s.parse().ok());
605 let last_error = self
606 .conn
607 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
608 r.get::<_, String>(0)
609 })
610 .optional()?;
611 let consecutive_failures = self
612 .conn
613 .query_row(
614 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
615 [],
616 |r| r.get::<_, String>(0),
617 )
618 .optional()?
619 .and_then(|s| s.parse().ok())
620 .unwrap_or(0);
621 Ok(SyncStatusSnapshot {
622 pending_outbox,
623 last_success_ms,
624 last_error,
625 consecutive_failures,
626 })
627 }
628
629 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
630 let row: Option<String> = self
631 .conn
632 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
633 r.get::<_, String>(0)
634 })
635 .optional()?;
636 Ok(row.and_then(|s| s.parse().ok()))
637 }
638
639 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
640 self.conn.execute(
641 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
642 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
643 params![key, v.to_string()],
644 )?;
645 Ok(())
646 }
647
648 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
650 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
651 let sessions_to_remove: i64 = tx.query_row(
652 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
653 params![cutoff_ms],
654 |r| r.get(0),
655 )?;
656 let events_to_remove: i64 = tx.query_row(
657 "SELECT COUNT(*) FROM events WHERE session_id IN \
658 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
659 params![cutoff_ms],
660 |r| r.get(0),
661 )?;
662
663 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
664 tx.execute(
665 &format!(
666 "DELETE FROM tool_span_paths WHERE span_id IN \
667 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
668 ),
669 params![cutoff_ms],
670 )?;
671 tx.execute(
672 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
673 params![cutoff_ms],
674 )?;
675 tx.execute(
676 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
677 params![cutoff_ms],
678 )?;
679 tx.execute(
680 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
681 params![cutoff_ms],
682 )?;
683 tx.execute(
684 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
685 params![cutoff_ms],
686 )?;
687 tx.execute(
688 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
689 params![cutoff_ms],
690 )?;
691 tx.execute(
692 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
693 params![cutoff_ms],
694 )?;
695 tx.execute(
696 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
697 params![cutoff_ms],
698 )?;
699 tx.execute(
700 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
701 params![cutoff_ms],
702 )?;
703 tx.execute(
704 "DELETE FROM sessions WHERE started_at_ms < ?1",
705 params![cutoff_ms],
706 )?;
707 tx.commit()?;
708 Ok(PruneStats {
709 sessions_removed: sessions_to_remove as u64,
710 events_removed: events_to_remove as u64,
711 })
712 }
713
714 pub fn vacuum(&self) -> Result<()> {
716 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
717 Ok(())
718 }
719
720 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
721 let mut stmt = self.conn.prepare(
722 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
723 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
724 FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
725 )?;
726 let rows = stmt.query_map(params![workspace], |row| {
727 Ok((
728 row.get::<_, String>(0)?,
729 row.get::<_, String>(1)?,
730 row.get::<_, Option<String>>(2)?,
731 row.get::<_, String>(3)?,
732 row.get::<_, i64>(4)?,
733 row.get::<_, Option<i64>>(5)?,
734 row.get::<_, String>(6)?,
735 row.get::<_, String>(7)?,
736 row.get::<_, Option<String>>(8)?,
737 row.get::<_, Option<String>>(9)?,
738 row.get::<_, Option<String>>(10)?,
739 row.get::<_, Option<i64>>(11)?,
740 row.get::<_, Option<i64>>(12)?,
741 row.get::<_, String>(13)?,
742 ))
743 })?;
744
745 let mut out = Vec::new();
746 for row in rows {
747 let (
748 id,
749 agent,
750 model,
751 workspace,
752 started,
753 ended,
754 status_str,
755 trace,
756 start_commit,
757 end_commit,
758 branch,
759 dirty_start,
760 dirty_end,
761 source,
762 ) = row?;
763 out.push(SessionRecord {
764 id,
765 agent,
766 model,
767 workspace,
768 started_at_ms: started as u64,
769 ended_at_ms: ended.map(|v| v as u64),
770 status: status_from_str(&status_str),
771 trace_path: trace,
772 start_commit,
773 end_commit,
774 branch,
775 dirty_start: dirty_start.map(i64_to_bool),
776 dirty_end: dirty_end.map(i64_to_bool),
777 repo_binding_source: empty_to_none(source),
778 });
779 }
780 Ok(out)
781 }
782
783 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
784 let session_count: i64 = self.conn.query_row(
785 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
786 params![workspace],
787 |r| r.get(0),
788 )?;
789
790 let total_cost: i64 = self.conn.query_row(
791 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
792 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
793 params![workspace],
794 |r| r.get(0),
795 )?;
796
797 let mut stmt = self.conn.prepare(
798 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
799 )?;
800 let by_agent: Vec<(String, u64)> = stmt
801 .query_map(params![workspace], |r| {
802 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
803 })?
804 .filter_map(|r| r.ok())
805 .collect();
806
807 let mut stmt = self.conn.prepare(
808 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
809 )?;
810 let by_model: Vec<(String, u64)> = stmt
811 .query_map(params![workspace], |r| {
812 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
813 })?
814 .filter_map(|r| r.ok())
815 .collect();
816
817 let mut stmt = self.conn.prepare(
818 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
819 WHERE s.workspace = ?1 AND tool IS NOT NULL
820 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
821 )?;
822 let top_tools: Vec<(String, u64)> = stmt
823 .query_map(params![workspace], |r| {
824 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
825 })?
826 .filter_map(|r| r.ok())
827 .collect();
828
829 Ok(SummaryStats {
830 session_count: session_count as u64,
831 total_cost_usd_e6: total_cost,
832 by_agent,
833 by_model,
834 top_tools,
835 })
836 }
837
838 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
839 let mut stmt = self.conn.prepare(
840 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
841 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
842 FROM events WHERE session_id = ?1 ORDER BY seq ASC",
843 )?;
844 let rows = stmt.query_map(params![session_id], |row| {
845 Ok((
846 row.get::<_, String>(0)?,
847 row.get::<_, i64>(1)?,
848 row.get::<_, i64>(2)?,
849 row.get::<_, i64>(3)?,
850 row.get::<_, String>(4)?,
851 row.get::<_, String>(5)?,
852 row.get::<_, Option<String>>(6)?,
853 row.get::<_, Option<String>>(7)?,
854 row.get::<_, Option<i64>>(8)?,
855 row.get::<_, Option<i64>>(9)?,
856 row.get::<_, Option<i64>>(10)?,
857 row.get::<_, Option<i64>>(11)?,
858 row.get::<_, String>(12)?,
859 ))
860 })?;
861
862 let mut events = Vec::new();
863 for row in rows {
864 let (
865 sid,
866 seq,
867 ts_ms,
868 ts_exact,
869 kind_str,
870 source_str,
871 tool,
872 tool_call_id,
873 tokens_in,
874 tokens_out,
875 reasoning_tokens,
876 cost_usd_e6,
877 payload_str,
878 ) = row?;
879 events.push(Event {
880 session_id: sid,
881 seq: seq as u64,
882 ts_ms: ts_ms as u64,
883 ts_exact: ts_exact != 0,
884 kind: kind_from_str(&kind_str),
885 source: source_from_str(&source_str),
886 tool,
887 tool_call_id,
888 tokens_in: tokens_in.map(|v| v as u32),
889 tokens_out: tokens_out.map(|v| v as u32),
890 reasoning_tokens: reasoning_tokens.map(|v| v as u32),
891 cost_usd_e6,
892 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
893 });
894 }
895 Ok(events)
896 }
897
898 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
900 self.conn.execute(
901 "UPDATE sessions SET status = ?1 WHERE id = ?2",
902 params![format!("{:?}", status), id],
903 )?;
904 Ok(())
905 }
906
907 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
909 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
910 Ok(InsightsStats {
911 total_sessions: count_q(
912 &self.conn,
913 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
914 workspace,
915 )?,
916 running_sessions: count_q(
917 &self.conn,
918 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
919 workspace,
920 )?,
921 total_events: count_q(
922 &self.conn,
923 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
924 workspace,
925 )?,
926 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
927 recent: recent_sessions_3(&self.conn, workspace)?,
928 top_tools: top_tools_5(&self.conn, workspace)?,
929 total_cost_usd_e6,
930 sessions_with_cost,
931 })
932 }
933
934 pub fn retro_events_in_window(
936 &self,
937 workspace: &str,
938 start_ms: u64,
939 end_ms: u64,
940 ) -> Result<Vec<(SessionRecord, Event)>> {
941 let mut stmt = self.conn.prepare(
942 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
943 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
944 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
945 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
946 FROM events e
947 JOIN sessions s ON s.id = e.session_id
948 WHERE s.workspace = ?1
949 AND (
950 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
951 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
952 )
953 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
954 )?;
955 let rows = stmt.query_map(
956 params![
957 workspace,
958 start_ms as i64,
959 end_ms as i64,
960 SYNTHETIC_TS_CEILING_MS,
961 ],
962 |row| {
963 let payload_str: String = row.get(12)?;
964 let status_str: String = row.get(19)?;
965 Ok((
966 SessionRecord {
967 id: row.get(13)?,
968 agent: row.get(14)?,
969 model: row.get(15)?,
970 workspace: row.get(16)?,
971 started_at_ms: row.get::<_, i64>(17)? as u64,
972 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
973 status: status_from_str(&status_str),
974 trace_path: row.get(20)?,
975 start_commit: row.get(21)?,
976 end_commit: row.get(22)?,
977 branch: row.get(23)?,
978 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
979 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
980 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
981 },
982 Event {
983 session_id: row.get(0)?,
984 seq: row.get::<_, i64>(1)? as u64,
985 ts_ms: row.get::<_, i64>(2)? as u64,
986 ts_exact: row.get::<_, i64>(3)? != 0,
987 kind: kind_from_str(&row.get::<_, String>(4)?),
988 source: source_from_str(&row.get::<_, String>(5)?),
989 tool: row.get(6)?,
990 tool_call_id: row.get(7)?,
991 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
992 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
993 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
994 cost_usd_e6: row.get(11)?,
995 payload: serde_json::from_str(&payload_str)
996 .unwrap_or(serde_json::Value::Null),
997 },
998 ))
999 },
1000 )?;
1001
1002 let mut out = Vec::new();
1003 for r in rows {
1004 out.push(r?);
1005 }
1006 Ok(out)
1007 }
1008
1009 pub fn files_touched_in_window(
1011 &self,
1012 workspace: &str,
1013 start_ms: u64,
1014 end_ms: u64,
1015 ) -> Result<Vec<(String, String)>> {
1016 let mut stmt = self.conn.prepare(
1017 "SELECT DISTINCT ft.session_id, ft.path
1018 FROM files_touched ft
1019 JOIN sessions s ON s.id = ft.session_id
1020 WHERE s.workspace = ?1
1021 AND EXISTS (
1022 SELECT 1 FROM events e
1023 JOIN sessions ss ON ss.id = e.session_id
1024 WHERE e.session_id = ft.session_id
1025 AND (
1026 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1027 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1028 )
1029 )
1030 ORDER BY ft.session_id, ft.path",
1031 )?;
1032 let out: Vec<(String, String)> = stmt
1033 .query_map(
1034 params![
1035 workspace,
1036 start_ms as i64,
1037 end_ms as i64,
1038 SYNTHETIC_TS_CEILING_MS,
1039 ],
1040 |r| Ok((r.get(0)?, r.get(1)?)),
1041 )?
1042 .filter_map(|r| r.ok())
1043 .collect();
1044 Ok(out)
1045 }
1046
1047 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1050 let mut stmt = self.conn.prepare(
1051 "SELECT DISTINCT su.skill
1052 FROM skills_used su
1053 JOIN sessions s ON s.id = su.session_id
1054 WHERE s.workspace = ?1
1055 AND EXISTS (
1056 SELECT 1 FROM events e
1057 JOIN sessions ss ON ss.id = e.session_id
1058 WHERE e.session_id = su.session_id
1059 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1060 )
1061 ORDER BY su.skill",
1062 )?;
1063 let out: Vec<String> = stmt
1064 .query_map(
1065 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1066 |r| r.get::<_, String>(0),
1067 )?
1068 .filter_map(|r| r.ok())
1069 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1070 .collect();
1071 Ok(out)
1072 }
1073
1074 pub fn skills_used_in_window(
1076 &self,
1077 workspace: &str,
1078 start_ms: u64,
1079 end_ms: u64,
1080 ) -> Result<Vec<(String, String)>> {
1081 let mut stmt = self.conn.prepare(
1082 "SELECT DISTINCT su.session_id, su.skill
1083 FROM skills_used su
1084 JOIN sessions s ON s.id = su.session_id
1085 WHERE s.workspace = ?1
1086 AND EXISTS (
1087 SELECT 1 FROM events e
1088 JOIN sessions ss ON ss.id = e.session_id
1089 WHERE e.session_id = su.session_id
1090 AND (
1091 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1092 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1093 )
1094 )
1095 ORDER BY su.session_id, su.skill",
1096 )?;
1097 let out: Vec<(String, String)> = stmt
1098 .query_map(
1099 params![
1100 workspace,
1101 start_ms as i64,
1102 end_ms as i64,
1103 SYNTHETIC_TS_CEILING_MS,
1104 ],
1105 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1106 )?
1107 .filter_map(|r| r.ok())
1108 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1109 .collect();
1110 Ok(out)
1111 }
1112
1113 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1115 let mut stmt = self.conn.prepare(
1116 "SELECT DISTINCT ru.rule
1117 FROM rules_used ru
1118 JOIN sessions s ON s.id = ru.session_id
1119 WHERE s.workspace = ?1
1120 AND EXISTS (
1121 SELECT 1 FROM events e
1122 JOIN sessions ss ON ss.id = e.session_id
1123 WHERE e.session_id = ru.session_id
1124 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1125 )
1126 ORDER BY ru.rule",
1127 )?;
1128 let out: Vec<String> = stmt
1129 .query_map(
1130 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1131 |r| r.get::<_, String>(0),
1132 )?
1133 .filter_map(|r| r.ok())
1134 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1135 .collect();
1136 Ok(out)
1137 }
1138
1139 pub fn rules_used_in_window(
1141 &self,
1142 workspace: &str,
1143 start_ms: u64,
1144 end_ms: u64,
1145 ) -> Result<Vec<(String, String)>> {
1146 let mut stmt = self.conn.prepare(
1147 "SELECT DISTINCT ru.session_id, ru.rule
1148 FROM rules_used ru
1149 JOIN sessions s ON s.id = ru.session_id
1150 WHERE s.workspace = ?1
1151 AND EXISTS (
1152 SELECT 1 FROM events e
1153 JOIN sessions ss ON ss.id = e.session_id
1154 WHERE e.session_id = ru.session_id
1155 AND (
1156 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1157 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1158 )
1159 )
1160 ORDER BY ru.session_id, ru.rule",
1161 )?;
1162 let out: Vec<(String, String)> = stmt
1163 .query_map(
1164 params![
1165 workspace,
1166 start_ms as i64,
1167 end_ms as i64,
1168 SYNTHETIC_TS_CEILING_MS,
1169 ],
1170 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1171 )?
1172 .filter_map(|r| r.ok())
1173 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1174 .collect();
1175 Ok(out)
1176 }
1177
1178 pub fn sessions_active_in_window(
1180 &self,
1181 workspace: &str,
1182 start_ms: u64,
1183 end_ms: u64,
1184 ) -> Result<HashSet<String>> {
1185 let mut stmt = self.conn.prepare(
1186 "SELECT DISTINCT s.id
1187 FROM sessions s
1188 WHERE s.workspace = ?1
1189 AND EXISTS (
1190 SELECT 1 FROM events e
1191 WHERE e.session_id = s.id
1192 AND (
1193 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1194 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1195 )
1196 )",
1197 )?;
1198 let out: HashSet<String> = stmt
1199 .query_map(
1200 params![
1201 workspace,
1202 start_ms as i64,
1203 end_ms as i64,
1204 SYNTHETIC_TS_CEILING_MS,
1205 ],
1206 |r| r.get(0),
1207 )?
1208 .filter_map(|r| r.ok())
1209 .collect();
1210 Ok(out)
1211 }
1212
1213 pub fn session_costs_usd_e6_in_window(
1215 &self,
1216 workspace: &str,
1217 start_ms: u64,
1218 end_ms: u64,
1219 ) -> Result<HashMap<String, i64>> {
1220 let mut stmt = self.conn.prepare(
1221 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1222 FROM events e
1223 JOIN sessions s ON s.id = e.session_id
1224 WHERE s.workspace = ?1
1225 AND (
1226 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1227 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1228 )
1229 GROUP BY e.session_id",
1230 )?;
1231 let rows: Vec<(String, i64)> = stmt
1232 .query_map(
1233 params![
1234 workspace,
1235 start_ms as i64,
1236 end_ms as i64,
1237 SYNTHETIC_TS_CEILING_MS,
1238 ],
1239 |r| Ok((r.get(0)?, r.get(1)?)),
1240 )?
1241 .filter_map(|r| r.ok())
1242 .collect();
1243 Ok(rows.into_iter().collect())
1244 }
1245
1246 pub fn guidance_report(
1248 &self,
1249 workspace: &str,
1250 window_start_ms: u64,
1251 window_end_ms: u64,
1252 skill_slugs_on_disk: &HashSet<String>,
1253 rule_slugs_on_disk: &HashSet<String>,
1254 ) -> Result<GuidanceReport> {
1255 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1256 let denom = active.len() as u64;
1257 let costs =
1258 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1259
1260 let workspace_avg_cost_per_session_usd = if denom > 0 {
1261 let total_e6: i64 = active
1262 .iter()
1263 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1264 .sum();
1265 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1266 } else {
1267 None
1268 };
1269
1270 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1271 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1272 skill_sessions.entry(skill).or_default().insert(sid);
1273 }
1274 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1275 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1276 rule_sessions.entry(rule).or_default().insert(sid);
1277 }
1278
1279 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1280
1281 let mut push_row =
1282 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1283 let sessions = sids.len() as u64;
1284 let sessions_pct = if denom > 0 {
1285 sessions as f64 * 100.0 / denom as f64
1286 } else {
1287 0.0
1288 };
1289 let total_cost_usd_e6: i64 = sids
1290 .iter()
1291 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1292 .sum();
1293 let avg_cost_per_session_usd = if sessions > 0 {
1294 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1295 } else {
1296 None
1297 };
1298 let vs_workspace_avg_cost_per_session_usd =
1299 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1300 (Some(avg), Some(w)) => Some(avg - w),
1301 _ => None,
1302 };
1303 rows.push(GuidancePerfRow {
1304 kind,
1305 id,
1306 sessions,
1307 sessions_pct,
1308 total_cost_usd_e6,
1309 avg_cost_per_session_usd,
1310 vs_workspace_avg_cost_per_session_usd,
1311 on_disk,
1312 });
1313 };
1314
1315 let mut seen_skills: HashSet<String> = HashSet::new();
1316 for (id, sids) in &skill_sessions {
1317 seen_skills.insert(id.clone());
1318 push_row(
1319 GuidanceKind::Skill,
1320 id.clone(),
1321 sids,
1322 skill_slugs_on_disk.contains(id),
1323 );
1324 }
1325 for slug in skill_slugs_on_disk {
1326 if seen_skills.contains(slug) {
1327 continue;
1328 }
1329 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1330 }
1331
1332 let mut seen_rules: HashSet<String> = HashSet::new();
1333 for (id, sids) in &rule_sessions {
1334 seen_rules.insert(id.clone());
1335 push_row(
1336 GuidanceKind::Rule,
1337 id.clone(),
1338 sids,
1339 rule_slugs_on_disk.contains(id),
1340 );
1341 }
1342 for slug in rule_slugs_on_disk {
1343 if seen_rules.contains(slug) {
1344 continue;
1345 }
1346 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1347 }
1348
1349 rows.sort_by(|a, b| {
1350 b.sessions
1351 .cmp(&a.sessions)
1352 .then_with(|| a.kind.cmp(&b.kind))
1353 .then_with(|| a.id.cmp(&b.id))
1354 });
1355
1356 Ok(GuidanceReport {
1357 workspace: workspace.to_string(),
1358 window_start_ms,
1359 window_end_ms,
1360 sessions_in_window: denom,
1361 workspace_avg_cost_per_session_usd,
1362 rows,
1363 })
1364 }
1365
1366 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1367 let mut stmt = self.conn.prepare(
1368 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1369 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
1370 FROM sessions WHERE id = ?1",
1371 )?;
1372 let mut rows = stmt.query_map(params![id], |row| {
1373 Ok((
1374 row.get::<_, String>(0)?,
1375 row.get::<_, String>(1)?,
1376 row.get::<_, Option<String>>(2)?,
1377 row.get::<_, String>(3)?,
1378 row.get::<_, i64>(4)?,
1379 row.get::<_, Option<i64>>(5)?,
1380 row.get::<_, String>(6)?,
1381 row.get::<_, String>(7)?,
1382 row.get::<_, Option<String>>(8)?,
1383 row.get::<_, Option<String>>(9)?,
1384 row.get::<_, Option<String>>(10)?,
1385 row.get::<_, Option<i64>>(11)?,
1386 row.get::<_, Option<i64>>(12)?,
1387 row.get::<_, String>(13)?,
1388 ))
1389 })?;
1390
1391 if let Some(row) = rows.next() {
1392 let (
1393 id,
1394 agent,
1395 model,
1396 workspace,
1397 started,
1398 ended,
1399 status_str,
1400 trace,
1401 start_commit,
1402 end_commit,
1403 branch,
1404 dirty_start,
1405 dirty_end,
1406 source,
1407 ) = row?;
1408 Ok(Some(SessionRecord {
1409 id,
1410 agent,
1411 model,
1412 workspace,
1413 started_at_ms: started as u64,
1414 ended_at_ms: ended.map(|v| v as u64),
1415 status: status_from_str(&status_str),
1416 trace_path: trace,
1417 start_commit,
1418 end_commit,
1419 branch,
1420 dirty_start: dirty_start.map(i64_to_bool),
1421 dirty_end: dirty_end.map(i64_to_bool),
1422 repo_binding_source: empty_to_none(source),
1423 }))
1424 } else {
1425 Ok(None)
1426 }
1427 }
1428
1429 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1430 let mut stmt = self.conn.prepare(
1431 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1432 indexed_at_ms, dirty, graph_path
1433 FROM repo_snapshots WHERE workspace = ?1
1434 ORDER BY indexed_at_ms DESC LIMIT 1",
1435 )?;
1436 let mut rows = stmt.query_map(params![workspace], |row| {
1437 Ok(RepoSnapshotRecord {
1438 id: row.get(0)?,
1439 workspace: row.get(1)?,
1440 head_commit: row.get(2)?,
1441 dirty_fingerprint: row.get(3)?,
1442 analyzer_version: row.get(4)?,
1443 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1444 dirty: row.get::<_, i64>(6)? != 0,
1445 graph_path: row.get(7)?,
1446 })
1447 })?;
1448 Ok(rows.next().transpose()?)
1449 }
1450
1451 pub fn save_repo_snapshot(
1452 &self,
1453 snapshot: &RepoSnapshotRecord,
1454 facts: &[FileFact],
1455 edges: &[RepoEdge],
1456 ) -> Result<()> {
1457 self.conn.execute(
1458 "INSERT INTO repo_snapshots (
1459 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1460 indexed_at_ms, dirty, graph_path
1461 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1462 ON CONFLICT(id) DO UPDATE SET
1463 workspace=excluded.workspace,
1464 head_commit=excluded.head_commit,
1465 dirty_fingerprint=excluded.dirty_fingerprint,
1466 analyzer_version=excluded.analyzer_version,
1467 indexed_at_ms=excluded.indexed_at_ms,
1468 dirty=excluded.dirty,
1469 graph_path=excluded.graph_path",
1470 params![
1471 snapshot.id,
1472 snapshot.workspace,
1473 snapshot.head_commit,
1474 snapshot.dirty_fingerprint,
1475 snapshot.analyzer_version,
1476 snapshot.indexed_at_ms as i64,
1477 bool_to_i64(snapshot.dirty),
1478 snapshot.graph_path,
1479 ],
1480 )?;
1481 self.conn.execute(
1482 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1483 params![snapshot.id],
1484 )?;
1485 self.conn.execute(
1486 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1487 params![snapshot.id],
1488 )?;
1489 for fact in facts {
1490 self.conn.execute(
1491 "INSERT INTO file_facts (
1492 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1493 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1494 churn_30d, churn_90d, authors_90d, last_changed_ms
1495 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1496 params![
1497 fact.snapshot_id,
1498 fact.path,
1499 fact.language,
1500 fact.bytes as i64,
1501 fact.loc as i64,
1502 fact.sloc as i64,
1503 fact.complexity_total as i64,
1504 fact.max_fn_complexity as i64,
1505 fact.symbol_count as i64,
1506 fact.import_count as i64,
1507 fact.fan_in as i64,
1508 fact.fan_out as i64,
1509 fact.churn_30d as i64,
1510 fact.churn_90d as i64,
1511 fact.authors_90d as i64,
1512 fact.last_changed_ms.map(|v| v as i64),
1513 ],
1514 )?;
1515 }
1516 for edge in edges {
1517 self.conn.execute(
1518 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1519 VALUES (?1, ?2, ?3, ?4, ?5)
1520 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1521 DO UPDATE SET weight = weight + excluded.weight",
1522 params![
1523 snapshot.id,
1524 edge.from_path,
1525 edge.to_path,
1526 edge.kind,
1527 edge.weight as i64,
1528 ],
1529 )?;
1530 }
1531 Ok(())
1532 }
1533
1534 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1535 let mut stmt = self.conn.prepare(
1536 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1537 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1538 churn_30d, churn_90d, authors_90d, last_changed_ms
1539 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1540 )?;
1541 let rows = stmt.query_map(params![snapshot_id], |row| {
1542 Ok(FileFact {
1543 snapshot_id: row.get(0)?,
1544 path: row.get(1)?,
1545 language: row.get(2)?,
1546 bytes: row.get::<_, i64>(3)? as u64,
1547 loc: row.get::<_, i64>(4)? as u32,
1548 sloc: row.get::<_, i64>(5)? as u32,
1549 complexity_total: row.get::<_, i64>(6)? as u32,
1550 max_fn_complexity: row.get::<_, i64>(7)? as u32,
1551 symbol_count: row.get::<_, i64>(8)? as u32,
1552 import_count: row.get::<_, i64>(9)? as u32,
1553 fan_in: row.get::<_, i64>(10)? as u32,
1554 fan_out: row.get::<_, i64>(11)? as u32,
1555 churn_30d: row.get::<_, i64>(12)? as u32,
1556 churn_90d: row.get::<_, i64>(13)? as u32,
1557 authors_90d: row.get::<_, i64>(14)? as u32,
1558 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1559 })
1560 })?;
1561 Ok(rows.filter_map(|row| row.ok()).collect())
1562 }
1563
1564 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1565 let mut stmt = self.conn.prepare(
1566 "SELECT from_id, to_id, kind, weight
1567 FROM repo_edges WHERE snapshot_id = ?1
1568 ORDER BY kind, from_id, to_id",
1569 )?;
1570 let rows = stmt.query_map(params![snapshot_id], |row| {
1571 Ok(RepoEdge {
1572 from_path: row.get(0)?,
1573 to_path: row.get(1)?,
1574 kind: row.get(2)?,
1575 weight: row.get::<_, i64>(3)? as u32,
1576 })
1577 })?;
1578 Ok(rows.filter_map(|row| row.ok()).collect())
1579 }
1580
1581 pub fn tool_spans_in_window(
1582 &self,
1583 workspace: &str,
1584 start_ms: u64,
1585 end_ms: u64,
1586 ) -> Result<Vec<ToolSpanView>> {
1587 let mut stmt = self.conn.prepare(
1588 "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1589 ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1590 FROM tool_spans ts
1591 JOIN sessions s ON s.id = ts.session_id
1592 WHERE s.workspace = ?1
1593 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1594 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1595 ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1596 )?;
1597 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1598 let paths_json: String = row.get(7)?;
1599 Ok(ToolSpanView {
1600 tool: row
1601 .get::<_, Option<String>>(0)?
1602 .unwrap_or_else(|| "unknown".into()),
1603 status: row.get(1)?,
1604 lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1605 tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1606 tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1607 reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1608 cost_usd_e6: row.get(6)?,
1609 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1610 })
1611 })?;
1612 Ok(rows.filter_map(|row| row.ok()).collect())
1613 }
1614
1615 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1616 let mut stmt = self.conn.prepare(
1617 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1618 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1619 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1620 )?;
1621 let rows = stmt.query_map(params![session_id], |row| {
1622 let paths_json: String = row.get(12)?;
1623 Ok(ToolSpanSyncRow {
1624 span_id: row.get(0)?,
1625 session_id: row.get(1)?,
1626 tool: row.get(2)?,
1627 tool_call_id: row.get(3)?,
1628 status: row.get(4)?,
1629 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1630 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1631 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1632 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1633 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1634 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1635 cost_usd_e6: row.get(11)?,
1636 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1637 })
1638 })?;
1639 Ok(rows.filter_map(|row| row.ok()).collect())
1640 }
1641
1642 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1643 self.conn.execute(
1644 "INSERT OR REPLACE INTO session_evals
1645 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1646 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1647 rusqlite::params![
1648 eval.id,
1649 eval.session_id,
1650 eval.judge_model,
1651 eval.rubric_id,
1652 eval.score,
1653 eval.rationale,
1654 eval.flagged as i64,
1655 eval.created_at_ms as i64,
1656 ],
1657 )?;
1658 Ok(())
1659 }
1660
1661 pub fn list_evals_in_window(
1662 &self,
1663 start_ms: u64,
1664 end_ms: u64,
1665 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1666 let mut stmt = self.conn.prepare(
1667 "SELECT id, session_id, judge_model, rubric_id, score,
1668 rationale, flagged, created_at_ms
1669 FROM session_evals
1670 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1671 ORDER BY created_at_ms ASC",
1672 )?;
1673 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1674 Ok(crate::eval::types::EvalRow {
1675 id: r.get(0)?,
1676 session_id: r.get(1)?,
1677 judge_model: r.get(2)?,
1678 rubric_id: r.get(3)?,
1679 score: r.get(4)?,
1680 rationale: r.get(5)?,
1681 flagged: r.get::<_, i64>(6)? != 0,
1682 created_at_ms: r.get::<_, i64>(7)? as u64,
1683 })
1684 })?;
1685 rows.collect()
1686 }
1687
1688 pub fn list_evals_for_session(
1689 &self,
1690 session_id: &str,
1691 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1692 let mut stmt = self.conn.prepare(
1693 "SELECT id, session_id, judge_model, rubric_id, score,
1694 rationale, flagged, created_at_ms
1695 FROM session_evals
1696 WHERE session_id = ?1
1697 ORDER BY created_at_ms DESC",
1698 )?;
1699 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1700 Ok(crate::eval::types::EvalRow {
1701 id: r.get(0)?,
1702 session_id: r.get(1)?,
1703 judge_model: r.get(2)?,
1704 rubric_id: r.get(3)?,
1705 score: r.get(4)?,
1706 rationale: r.get(5)?,
1707 flagged: r.get::<_, i64>(6)? != 0,
1708 created_at_ms: r.get::<_, i64>(7)? as u64,
1709 })
1710 })?;
1711 rows.collect()
1712 }
1713
1714 pub fn list_sessions_for_eval(
1715 &self,
1716 since_ms: u64,
1717 min_cost_usd: f64,
1718 ) -> Result<Vec<crate::core::event::SessionRecord>> {
1719 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
1720 let mut stmt = self.conn.prepare(
1721 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1722 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
1723 s.dirty_start, s.dirty_end, s.repo_binding_source
1724 FROM sessions s
1725 WHERE s.started_at_ms >= ?1
1726 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
1727 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
1728 ORDER BY s.started_at_ms DESC",
1729 )?;
1730 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
1731 Ok((
1732 r.get::<_, String>(0)?,
1733 r.get::<_, String>(1)?,
1734 r.get::<_, Option<String>>(2)?,
1735 r.get::<_, String>(3)?,
1736 r.get::<_, i64>(4)?,
1737 r.get::<_, Option<i64>>(5)?,
1738 r.get::<_, String>(6)?,
1739 r.get::<_, String>(7)?,
1740 r.get::<_, Option<String>>(8)?,
1741 r.get::<_, Option<String>>(9)?,
1742 r.get::<_, Option<String>>(10)?,
1743 r.get::<_, Option<i64>>(11)?,
1744 r.get::<_, Option<i64>>(12)?,
1745 r.get::<_, Option<String>>(13)?,
1746 ))
1747 })?;
1748 let mut out = Vec::new();
1749 for row in rows {
1750 let (
1751 id,
1752 agent,
1753 model,
1754 workspace,
1755 started,
1756 ended,
1757 status_str,
1758 trace,
1759 start_commit,
1760 end_commit,
1761 branch,
1762 dirty_start,
1763 dirty_end,
1764 source,
1765 ) = row?;
1766 out.push(crate::core::event::SessionRecord {
1767 id,
1768 agent,
1769 model,
1770 workspace,
1771 started_at_ms: started as u64,
1772 ended_at_ms: ended.map(|v| v as u64),
1773 status: status_from_str(&status_str),
1774 trace_path: trace,
1775 start_commit,
1776 end_commit,
1777 branch,
1778 dirty_start: dirty_start.map(i64_to_bool),
1779 dirty_end: dirty_end.map(i64_to_bool),
1780 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
1781 });
1782 }
1783 Ok(out)
1784 }
1785}
1786
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock set before 1970: fall back to a zero duration.
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
1793
1794fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1795 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1796}
1797
1798fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1799 let cost: i64 = conn.query_row(
1800 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1801 params![workspace], |r| r.get(0),
1802 )?;
1803 let with_cost: i64 = conn.query_row(
1804 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1805 params![workspace], |r| r.get(0),
1806 )?;
1807 Ok((cost, with_cost as u64))
1808}
1809
/// Three-letter weekday name for a day index counted in whole days since the Unix
/// epoch.
///
/// Day `0` (1970-01-01) was a Thursday, hence the offset of 4 into a Sunday-first table.
/// The index is reduced modulo 7 *before* the offset is added, so no input — not even
/// `u64::MAX` — can overflow.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
1813
1814fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
1815 let week_ago = now.saturating_sub(7 * 86_400_000);
1816 let mut stmt = conn
1817 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
1818 let days: Vec<u64> = stmt
1819 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
1820 .filter_map(|r| r.ok())
1821 .map(|v| v as u64 / 86_400_000)
1822 .collect();
1823 let today = now / 86_400_000;
1824 Ok((0u64..7)
1825 .map(|i| {
1826 let d = today.saturating_sub(6 - i);
1827 (
1828 day_label(d).to_string(),
1829 days.iter().filter(|&&x| x == d).count() as u64,
1830 )
1831 })
1832 .collect())
1833}
1834
1835fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
1836 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
1837 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
1838 s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
1839 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
1840 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
1841 let mut stmt = conn.prepare(sql)?;
1842 let out: Vec<(SessionRecord, u64)> = stmt
1843 .query_map(params![workspace], |r| {
1844 let st: String = r.get(6)?;
1845 Ok((
1846 SessionRecord {
1847 id: r.get(0)?,
1848 agent: r.get(1)?,
1849 model: r.get(2)?,
1850 workspace: r.get(3)?,
1851 started_at_ms: r.get::<_, i64>(4)? as u64,
1852 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1853 status: status_from_str(&st),
1854 trace_path: r.get(7)?,
1855 start_commit: r.get(8)?,
1856 end_commit: r.get(9)?,
1857 branch: r.get(10)?,
1858 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
1859 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
1860 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
1861 },
1862 r.get::<_, i64>(14)? as u64,
1863 ))
1864 })?
1865 .filter_map(|r| r.ok())
1866 .collect();
1867 Ok(out)
1868}
1869
1870fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
1871 let mut stmt = conn.prepare(
1872 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
1873 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
1874 )?;
1875 let out: Vec<(String, u64)> = stmt
1876 .query_map(params![workspace], |r| {
1877 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1878 })?
1879 .filter_map(|r| r.ok())
1880 .collect();
1881 Ok(out)
1882}
1883
1884fn status_from_str(s: &str) -> SessionStatus {
1885 match s {
1886 "Running" => SessionStatus::Running,
1887 "Waiting" => SessionStatus::Waiting,
1888 "Idle" => SessionStatus::Idle,
1889 _ => SessionStatus::Done,
1890 }
1891}
1892
1893fn kind_from_str(s: &str) -> EventKind {
1894 match s {
1895 "ToolCall" => EventKind::ToolCall,
1896 "ToolResult" => EventKind::ToolResult,
1897 "Message" => EventKind::Message,
1898 "Error" => EventKind::Error,
1899 "Cost" => EventKind::Cost,
1900 _ => EventKind::Hook,
1901 }
1902}
1903
1904fn source_from_str(s: &str) -> EventSource {
1905 match s {
1906 "Tail" => EventSource::Tail,
1907 "Hook" => EventSource::Hook,
1908 _ => EventSource::Proxy,
1909 }
1910}
1911
1912fn ensure_schema_columns(conn: &Connection) -> Result<()> {
1913 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
1914 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
1915 ensure_column(conn, "sessions", "branch", "TEXT")?;
1916 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
1917 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
1918 ensure_column(
1919 conn,
1920 "sessions",
1921 "repo_binding_source",
1922 "TEXT NOT NULL DEFAULT ''",
1923 )?;
1924 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
1925 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
1926 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
1927 ensure_column(
1928 conn,
1929 "sync_outbox",
1930 "kind",
1931 "TEXT NOT NULL DEFAULT 'events'",
1932 )?;
1933 ensure_column(
1934 conn,
1935 "experiments",
1936 "state",
1937 "TEXT NOT NULL DEFAULT 'Draft'",
1938 )?;
1939 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
1940 Ok(())
1941}
1942
1943fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
1944 if has_column(conn, table, column)? {
1945 return Ok(());
1946 }
1947 conn.execute(
1948 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
1949 [],
1950 )?;
1951 Ok(())
1952}
1953
1954fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
1955 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1956 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1957 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
1958}
1959
/// Encodes a flag as the integer stored in SQLite: `true` → 1, `false` → 0.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
1963
/// Decodes an integer flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    match v {
        0 => false,
        _ => true,
    }
}
1967
/// Maps the empty string to `None` and hands any other string back unchanged in `Some`.
/// Whitespace-only strings are not considered empty.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
1971
1972#[cfg(test)]
1973mod tests {
1974 use super::*;
1975 use serde_json::json;
1976 use std::collections::HashSet;
1977 use tempfile::TempDir;
1978
1979 fn make_session(id: &str) -> SessionRecord {
1980 SessionRecord {
1981 id: id.to_string(),
1982 agent: "cursor".to_string(),
1983 model: None,
1984 workspace: "/ws".to_string(),
1985 started_at_ms: 1000,
1986 ended_at_ms: None,
1987 status: SessionStatus::Done,
1988 trace_path: "/trace".to_string(),
1989 start_commit: None,
1990 end_commit: None,
1991 branch: None,
1992 dirty_start: None,
1993 dirty_end: None,
1994 repo_binding_source: None,
1995 }
1996 }
1997
1998 fn make_event(session_id: &str, seq: u64) -> Event {
1999 Event {
2000 session_id: session_id.to_string(),
2001 seq,
2002 ts_ms: 1000 + seq * 100,
2003 ts_exact: false,
2004 kind: EventKind::ToolCall,
2005 source: EventSource::Tail,
2006 tool: Some("read_file".to_string()),
2007 tool_call_id: Some(format!("call_{seq}")),
2008 tokens_in: None,
2009 tokens_out: None,
2010 reasoning_tokens: None,
2011 cost_usd_e6: None,
2012 payload: json!({}),
2013 }
2014 }
2015
2016 #[test]
2017 fn open_and_wal_mode() {
2018 let dir = TempDir::new().unwrap();
2019 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2020 let mode: String = store
2021 .conn
2022 .query_row("PRAGMA journal_mode", [], |r| r.get(0))
2023 .unwrap();
2024 assert_eq!(mode, "wal");
2025 }
2026
2027 #[test]
2028 fn upsert_and_get_session() {
2029 let dir = TempDir::new().unwrap();
2030 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2031 let s = make_session("s1");
2032 store.upsert_session(&s).unwrap();
2033
2034 let got = store.get_session("s1").unwrap().unwrap();
2035 assert_eq!(got.id, "s1");
2036 assert_eq!(got.status, SessionStatus::Done);
2037 }
2038
2039 #[test]
2040 fn append_and_list_events_round_trip() {
2041 let dir = TempDir::new().unwrap();
2042 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2043 let s = make_session("s2");
2044 store.upsert_session(&s).unwrap();
2045 store.append_event(&make_event("s2", 0)).unwrap();
2046 store.append_event(&make_event("s2", 1)).unwrap();
2047
2048 let sessions = store.list_sessions("/ws").unwrap();
2049 assert_eq!(sessions.len(), 1);
2050 assert_eq!(sessions[0].id, "s2");
2051 }
2052
2053 #[test]
2054 fn summary_stats_empty() {
2055 let dir = TempDir::new().unwrap();
2056 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2057 let stats = store.summary_stats("/ws").unwrap();
2058 assert_eq!(stats.session_count, 0);
2059 assert_eq!(stats.total_cost_usd_e6, 0);
2060 }
2061
2062 #[test]
2063 fn summary_stats_counts_sessions() {
2064 let dir = TempDir::new().unwrap();
2065 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2066 store.upsert_session(&make_session("a")).unwrap();
2067 store.upsert_session(&make_session("b")).unwrap();
2068 let stats = store.summary_stats("/ws").unwrap();
2069 assert_eq!(stats.session_count, 2);
2070 assert_eq!(stats.by_agent.len(), 1);
2071 assert_eq!(stats.by_agent[0].0, "cursor");
2072 assert_eq!(stats.by_agent[0].1, 2);
2073 }
2074
2075 #[test]
2076 fn list_events_for_session_round_trip() {
2077 let dir = TempDir::new().unwrap();
2078 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2079 store.upsert_session(&make_session("s4")).unwrap();
2080 store.append_event(&make_event("s4", 0)).unwrap();
2081 store.append_event(&make_event("s4", 1)).unwrap();
2082 let events = store.list_events_for_session("s4").unwrap();
2083 assert_eq!(events.len(), 2);
2084 assert_eq!(events[0].seq, 0);
2085 assert_eq!(events[1].seq, 1);
2086 }
2087
2088 #[test]
2089 fn append_event_dedup() {
2090 let dir = TempDir::new().unwrap();
2091 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2092 store.upsert_session(&make_session("s5")).unwrap();
2093 store.append_event(&make_event("s5", 0)).unwrap();
2094 store.append_event(&make_event("s5", 0)).unwrap();
2096 let events = store.list_events_for_session("s5").unwrap();
2097 assert_eq!(events.len(), 1);
2098 }
2099
2100 #[test]
2101 fn upsert_idempotent() {
2102 let dir = TempDir::new().unwrap();
2103 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2104 let mut s = make_session("s3");
2105 store.upsert_session(&s).unwrap();
2106 s.status = SessionStatus::Running;
2107 store.upsert_session(&s).unwrap();
2108
2109 let got = store.get_session("s3").unwrap().unwrap();
2110 assert_eq!(got.status, SessionStatus::Running);
2111 }
2112
2113 #[test]
2114 fn append_event_indexes_path_from_payload() {
2115 let dir = TempDir::new().unwrap();
2116 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2117 store.upsert_session(&make_session("sx")).unwrap();
2118 let mut ev = make_event("sx", 0);
2119 ev.payload = json!({"input": {"path": "src/lib.rs"}});
2120 store.append_event(&ev).unwrap();
2121 let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
2122 assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
2123 }
2124
2125 #[test]
2126 fn update_session_status_changes_status() {
2127 let dir = TempDir::new().unwrap();
2128 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2129 store.upsert_session(&make_session("s6")).unwrap();
2130 store
2131 .update_session_status("s6", SessionStatus::Running)
2132 .unwrap();
2133 let got = store.get_session("s6").unwrap().unwrap();
2134 assert_eq!(got.status, SessionStatus::Running);
2135 }
2136
2137 #[test]
2138 fn prune_sessions_removes_old_rows_and_keeps_recent() {
2139 let dir = TempDir::new().unwrap();
2140 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2141 let mut old = make_session("old");
2142 old.started_at_ms = 1_000;
2143 let mut new = make_session("new");
2144 new.started_at_ms = 9_000_000_000_000;
2145 store.upsert_session(&old).unwrap();
2146 store.upsert_session(&new).unwrap();
2147 store.append_event(&make_event("old", 0)).unwrap();
2148
2149 let stats = store.prune_sessions_started_before(5_000).unwrap();
2150 assert_eq!(
2151 stats,
2152 PruneStats {
2153 sessions_removed: 1,
2154 events_removed: 1,
2155 }
2156 );
2157 assert!(store.get_session("old").unwrap().is_none());
2158 assert!(store.get_session("new").unwrap().is_some());
2159 let sessions = store.list_sessions("/ws").unwrap();
2160 assert_eq!(sessions.len(), 1);
2161 assert_eq!(sessions[0].id, "new");
2162 }
2163
2164 #[test]
2165 fn append_event_indexes_rules_from_payload() {
2166 let dir = TempDir::new().unwrap();
2167 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2168 store.upsert_session(&make_session("sr")).unwrap();
2169 let mut ev = make_event("sr", 0);
2170 ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
2171 store.append_event(&ev).unwrap();
2172 let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
2173 assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
2174 }
2175
2176 #[test]
2177 fn guidance_report_counts_skill_and_rule_sessions() {
2178 let dir = TempDir::new().unwrap();
2179 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2180 store.upsert_session(&make_session("sx")).unwrap();
2181 let mut ev = make_event("sx", 0);
2182 ev.payload =
2183 json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
2184 ev.cost_usd_e6 = Some(500_000);
2185 store.append_event(&ev).unwrap();
2186
2187 let mut skill_slugs = HashSet::new();
2188 skill_slugs.insert("tdd".into());
2189 let mut rule_slugs = HashSet::new();
2190 rule_slugs.insert("style".into());
2191
2192 let rep = store
2193 .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
2194 .unwrap();
2195 assert_eq!(rep.sessions_in_window, 1);
2196 let tdd = rep
2197 .rows
2198 .iter()
2199 .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
2200 .unwrap();
2201 assert_eq!(tdd.sessions, 1);
2202 assert!(tdd.on_disk);
2203 let style = rep
2204 .rows
2205 .iter()
2206 .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
2207 .unwrap();
2208 assert_eq!(style.sessions, 1);
2209 assert!(style.on_disk);
2210 }
2211
2212 #[test]
2213 fn prune_sessions_removes_rules_used_rows() {
2214 let dir = TempDir::new().unwrap();
2215 let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
2216 let mut old = make_session("old_r");
2217 old.started_at_ms = 1_000;
2218 store.upsert_session(&old).unwrap();
2219 let mut ev = make_event("old_r", 0);
2220 ev.payload = json!({"path": ".cursor/rules/x.mdc"});
2221 store.append_event(&ev).unwrap();
2222
2223 store.prune_sessions_started_before(5_000).unwrap();
2224 let n: i64 = store
2225 .conn
2226 .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
2227 .unwrap();
2228 assert_eq!(n, 0);
2229 }
2230}