1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Exclusive upper bound, in milliseconds, used by the time-window queries in
/// this file: an event whose `ts_ms` is below this value is matched against
/// its session's `started_at_ms` instead of its own timestamp.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
21
/// Schema statements executed in order by `Store::open` every time a database
/// is opened. Every statement is `CREATE ... IF NOT EXISTS` or
/// `INSERT OR IGNORE`, so re-running the whole list is harmless.
///
/// NOTE(review): several columns used by the queries in this file (for example
/// `events.ts_exact`, `sync_outbox.kind`, `sessions.start_commit`) are not
/// declared here; presumably `ensure_schema_columns` adds them — confirm.
const MIGRATIONS: &[&str] = &[
    // Core session / event log.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    // Per-session derived facts.
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Rows waiting to be sent; `sent = 0` means pending.
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Backs the `ON CONFLICT(session_id, seq)` upsert in `append_event_with_sync`.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Key/value store read and written by the `sync_state_*` helpers.
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // Written alongside `sessions` by `upsert_session`.
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repository analysis snapshots and their per-file facts / edges.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Single-row table: the CHECK constraint pins `id` to 1 and the INSERT
    // below seeds that row.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // `remote_*` tables: JSON blobs keyed by team, workspace hash and an item hash/key.
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
];
204
/// Per-workspace overview numbers, assembled by `Store::insights`.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions stored for the workspace.
    pub total_sessions: u64,
    /// Number of those sessions whose status column is `'Running'`.
    pub running_sessions: u64,
    /// Number of events belonging to the workspace's sessions.
    pub total_events: u64,
    /// Filled from `sessions_by_day_7`; presumably `(day label, session count)`
    /// for the last seven days — helper not shown here, confirm.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Filled from `recent_sessions_3`; the meaning of the `u64` is defined by
    /// that helper (presumably an event count — confirm).
    pub recent: Vec<(SessionRecord, u64)>,
    /// Filled from `top_tools_5`; presumably `(tool name, use count)` — confirm.
    pub top_tools: Vec<(String, u64)>,
    /// First value returned by `cost_stats`; the `_e6` suffix suggests
    /// millionths of a US dollar — confirm.
    pub total_cost_usd_e6: i64,
    /// Second value returned by `cost_stats`.
    pub sessions_with_cost: u64,
}
220
/// Point-in-time view of the sync bookkeeping, produced by `Store::sync_status`.
pub struct SyncStatusSnapshot {
    /// Number of `sync_outbox` rows with `sent = 0`.
    pub pending_outbox: u64,
    /// Parsed `last_success_ms` entry of `sync_state`; `None` when the entry is
    /// missing or does not parse as a number.
    pub last_success_ms: Option<u64>,
    /// `last_error` entry of `sync_state`, if one is stored.
    pub last_error: Option<String>,
    /// Parsed `consecutive_failures` entry of `sync_state`; `0` when the entry
    /// is missing or does not parse.
    pub consecutive_failures: u32,
}
228
/// Serializable per-workspace totals, produced by `Store::summary_stats`.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions stored for the workspace.
    pub session_count: u64,
    /// Sum of `cost_usd_e6` over all events of the workspace's sessions
    /// (`0` when there are none).
    pub total_cost_usd_e6: i64,
    /// `(agent, session count)` pairs, most frequent first.
    pub by_agent: Vec<(String, u64)>,
    /// `(model, session count)` pairs, most frequent first; sessions without a
    /// model are reported as `"unknown"`.
    pub by_model: Vec<(String, u64)>,
    /// Up to ten `(tool, event count)` pairs, most frequent first.
    pub top_tools: Vec<(String, u64)>,
}
238
/// Which kind of guidance a `GuidancePerfRow` describes. Serialized in
/// lowercase (`"skill"` / `"rule"`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill (see the `skills_used` table).
    Skill,
    /// A rule (see the `rules_used` table).
    Rule,
}
246
/// One row of a `GuidanceReport`.
///
/// NOTE(review): the code that fills this struct is not in this part of the
/// file; the field notes below are read off the names and types only.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row is about a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill or rule.
    pub id: String,
    /// Presumably the number of sessions that used this item — confirm.
    pub sessions: u64,
    /// Presumably `sessions` relative to all sessions in the window; whether
    /// this is 0–1 or 0–100 is not visible here — confirm.
    pub sessions_pct: f64,
    /// Presumably the summed cost of those sessions in millionths of a US
    /// dollar — confirm.
    pub total_cost_usd_e6: i64,
    /// Average cost per session in US dollars, when available.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Presumably the difference from (or ratio to) the workspace-wide
    /// average cost per session — confirm which.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Presumably whether the skill/rule currently exists on disk — confirm.
    pub on_disk: bool,
}
259
/// Serializable report grouping `GuidancePerfRow`s for one workspace and time
/// window.
///
/// NOTE(review): the code that builds this report is not in this part of the
/// file; the field notes below are read off the names and types only.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Start of the reporting window in milliseconds.
    pub window_start_ms: u64,
    /// End of the reporting window in milliseconds.
    pub window_end_ms: u64,
    /// Presumably the number of sessions that fall inside the window — confirm.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in US dollars, when available.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One entry per skill or rule.
    pub rows: Vec<GuidancePerfRow>,
}
270
/// Result of `Store::prune_sessions_started_before`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of rows removed from `sessions`.
    pub sessions_removed: u64,
    /// Number of rows removed from `events`.
    pub events_removed: u64,
}
277
/// Key string `"last_agent_scan_ms"`; presumably used with
/// `sync_state_get_u64` / `sync_state_set_u64` — callers are not shown here.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key string `"last_auto_prune_ms"`; presumably used with
/// `sync_state_get_u64` / `sync_state_set_u64` — callers are not shown here.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
281
/// Typed form of one tool span. The field names follow the columns of the
/// `tool_spans` table, with `paths` in place of the `paths_json` column.
///
/// NOTE(review): the code that builds these rows is not in this part of the
/// file.
pub struct ToolSpanSyncRow {
    /// Span identifier (`tool_spans.span_id`).
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, if recorded.
    pub tool: Option<String>,
    /// Tool call identifier, if recorded.
    pub tool_call_id: Option<String>,
    /// Span status string.
    pub status: String,
    /// Start time in milliseconds, if known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, if known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, if recorded.
    pub tokens_in: Option<u32>,
    /// Output token count, if recorded.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, if recorded.
    pub reasoning_tokens: Option<u32>,
    /// Cost; the `_e6` suffix suggests millionths of a US dollar — confirm.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
297
/// Handle to the SQLite database; owns a single `rusqlite::Connection` that
/// all methods of this type operate on.
pub struct Store {
    /// The underlying connection, opened and migrated by `Store::open`.
    conn: Connection,
}
301
302impl Store {
    /// Borrows the underlying SQLite connection for use elsewhere in the crate.
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
306
307 pub fn open(path: &Path) -> Result<Self> {
308 if let Some(parent) = path.parent() {
309 std::fs::create_dir_all(parent)?;
310 }
311 let conn =
312 Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
313 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
314 for sql in MIGRATIONS {
315 conn.execute_batch(sql)?;
316 }
317 ensure_schema_columns(&conn)?;
318 Ok(Self { conn })
319 }
320
321 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
322 self.conn.execute(
323 "INSERT INTO sessions (
324 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
325 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
326 )
327 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
328 ON CONFLICT(id) DO UPDATE SET
329 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
330 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
331 status=excluded.status, trace_path=excluded.trace_path,
332 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
333 branch=excluded.branch, dirty_start=excluded.dirty_start,
334 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source",
335 params![
336 s.id,
337 s.agent,
338 s.model,
339 s.workspace,
340 s.started_at_ms as i64,
341 s.ended_at_ms.map(|v| v as i64),
342 format!("{:?}", s.status),
343 s.trace_path,
344 s.start_commit,
345 s.end_commit,
346 s.branch,
347 s.dirty_start.map(bool_to_i64),
348 s.dirty_end.map(bool_to_i64),
349 s.repo_binding_source.clone().unwrap_or_default(),
350 ],
351 )?;
352 self.conn.execute(
353 "INSERT INTO session_repo_binding (
354 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
355 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
356 ON CONFLICT(session_id) DO UPDATE SET
357 start_commit=excluded.start_commit,
358 end_commit=excluded.end_commit,
359 branch=excluded.branch,
360 dirty_start=excluded.dirty_start,
361 dirty_end=excluded.dirty_end,
362 repo_binding_source=excluded.repo_binding_source",
363 params![
364 s.id,
365 s.start_commit,
366 s.end_commit,
367 s.branch,
368 s.dirty_start.map(bool_to_i64),
369 s.dirty_end.map(bool_to_i64),
370 s.repo_binding_source.clone().unwrap_or_default(),
371 ],
372 )?;
373 Ok(())
374 }
375
376 pub fn ensure_session_stub(
379 &self,
380 id: &str,
381 agent: &str,
382 workspace: &str,
383 started_at_ms: u64,
384 ) -> Result<()> {
385 self.conn.execute(
386 "INSERT OR IGNORE INTO sessions (
387 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
388 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
389 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
390 params![id, agent, workspace, started_at_ms as i64],
391 )?;
392 Ok(())
393 }
394
395 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
397 let n: i64 = self.conn.query_row(
398 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
399 [session_id],
400 |r| r.get(0),
401 )?;
402 Ok(n as u64)
403 }
404
    /// Stores `e` without a sync context; shorthand for
    /// `append_event_with_sync(e, None)`, so nothing is queued in the outbox.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
408
    /// Stores `e` (insert, or overwrite of the row with the same
    /// `(session_id, seq)`), refreshes the derived indexes and tool spans for
    /// its session and, when `ctx` is given and sync is configured, queues a
    /// redacted copy of the event in `sync_outbox`.
    ///
    /// Queuing is skipped — still returning `Ok(())` — when there is no
    /// context, when the endpoint / team token / team id is empty, when no
    /// team salt is available, when the event is sampled out, or when the
    /// session row cannot be found.
    ///
    /// # Errors
    /// Returns an error if payload serialization, any SQL statement, or one of
    /// the indexing / enqueue helpers fails.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        let payload = serde_json::to_string(&e.payload)?;
        // Upsert keyed on (session_id, seq): a repeated seq replaces every column.
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
            ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                // Enum values are stored as their `Debug` rendering.
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
            ],
        )?;
        // No row was inserted or updated: nothing to index or sync.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        index_event_derived(&self.conn, e)?;
        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
        // Everything below only concerns the sync outbox.
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync counts as unconfigured when any of these settings is empty.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Sampling: with a rate below 1.0, draw a random number and skip the
        // event when the draw exceeds the rate.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        // Build the outbound form and redact its payload before it is persisted.
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
            params![e.session_id, row],
        )?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
483
484 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
485 let mut stmt = self.conn.prepare(
486 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
487 )?;
488 let rows = stmt.query_map(params![limit as i64], |row| {
489 Ok((
490 row.get::<_, i64>(0)?,
491 row.get::<_, String>(1)?,
492 row.get::<_, String>(2)?,
493 ))
494 })?;
495 let mut out = Vec::new();
496 for r in rows {
497 out.push(r?);
498 }
499 Ok(out)
500 }
501
502 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
503 for id in ids {
504 self.conn
505 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
506 }
507 Ok(())
508 }
509
510 pub fn replace_outbox_rows(
511 &self,
512 owner_id: &str,
513 kind: &str,
514 payloads: &[String],
515 ) -> Result<()> {
516 self.conn.execute(
517 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
518 params![owner_id, kind],
519 )?;
520 for payload in payloads {
521 self.conn.execute(
522 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
523 params![owner_id, kind, payload],
524 )?;
525 }
526 Ok(())
527 }
528
529 pub fn outbox_pending_count(&self) -> Result<u64> {
530 let c: i64 =
531 self.conn
532 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
533 r.get(0)
534 })?;
535 Ok(c as u64)
536 }
537
538 pub fn set_sync_state_ok(&self) -> Result<()> {
539 let now = now_ms().to_string();
540 self.conn.execute(
541 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
542 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
543 params![now],
544 )?;
545 self.conn.execute(
546 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
547 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
548 [],
549 )?;
550 self.conn
551 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
552 Ok(())
553 }
554
555 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
556 let prev: i64 = self
557 .conn
558 .query_row(
559 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
560 [],
561 |r| {
562 let s: String = r.get(0)?;
563 Ok(s.parse::<i64>().unwrap_or(0))
564 },
565 )
566 .optional()?
567 .unwrap_or(0);
568 let next = prev.saturating_add(1);
569 self.conn.execute(
570 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
571 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
572 params![msg],
573 )?;
574 self.conn.execute(
575 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
576 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
577 params![next.to_string()],
578 )?;
579 Ok(())
580 }
581
582 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
583 let pending_outbox = self.outbox_pending_count()?;
584 let last_success_ms = self
585 .conn
586 .query_row(
587 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
588 [],
589 |r| r.get::<_, String>(0),
590 )
591 .optional()?
592 .and_then(|s| s.parse().ok());
593 let last_error = self
594 .conn
595 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
596 r.get::<_, String>(0)
597 })
598 .optional()?;
599 let consecutive_failures = self
600 .conn
601 .query_row(
602 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
603 [],
604 |r| r.get::<_, String>(0),
605 )
606 .optional()?
607 .and_then(|s| s.parse().ok())
608 .unwrap_or(0);
609 Ok(SyncStatusSnapshot {
610 pending_outbox,
611 last_success_ms,
612 last_error,
613 consecutive_failures,
614 })
615 }
616
617 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
618 let row: Option<String> = self
619 .conn
620 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
621 r.get::<_, String>(0)
622 })
623 .optional()?;
624 Ok(row.and_then(|s| s.parse().ok()))
625 }
626
627 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
628 self.conn.execute(
629 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
630 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
631 params![key, v.to_string()],
632 )?;
633 Ok(())
634 }
635
636 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
638 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
639 let sessions_to_remove: i64 = tx.query_row(
640 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
641 params![cutoff_ms],
642 |r| r.get(0),
643 )?;
644 let events_to_remove: i64 = tx.query_row(
645 "SELECT COUNT(*) FROM events WHERE session_id IN \
646 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
647 params![cutoff_ms],
648 |r| r.get(0),
649 )?;
650
651 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
652 tx.execute(
653 &format!(
654 "DELETE FROM tool_span_paths WHERE span_id IN \
655 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
656 ),
657 params![cutoff_ms],
658 )?;
659 tx.execute(
660 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
661 params![cutoff_ms],
662 )?;
663 tx.execute(
664 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
665 params![cutoff_ms],
666 )?;
667 tx.execute(
668 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
669 params![cutoff_ms],
670 )?;
671 tx.execute(
672 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
673 params![cutoff_ms],
674 )?;
675 tx.execute(
676 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
677 params![cutoff_ms],
678 )?;
679 tx.execute(
680 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
681 params![cutoff_ms],
682 )?;
683 tx.execute(
684 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
685 params![cutoff_ms],
686 )?;
687 tx.execute(
688 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
689 params![cutoff_ms],
690 )?;
691 tx.execute(
692 "DELETE FROM sessions WHERE started_at_ms < ?1",
693 params![cutoff_ms],
694 )?;
695 tx.commit()?;
696 Ok(PruneStats {
697 sessions_removed: sessions_to_remove as u64,
698 events_removed: events_to_remove as u64,
699 })
700 }
701
702 pub fn vacuum(&self) -> Result<()> {
704 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
705 Ok(())
706 }
707
708 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
709 let mut stmt = self.conn.prepare(
710 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
711 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
712 FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
713 )?;
714 let rows = stmt.query_map(params![workspace], |row| {
715 Ok((
716 row.get::<_, String>(0)?,
717 row.get::<_, String>(1)?,
718 row.get::<_, Option<String>>(2)?,
719 row.get::<_, String>(3)?,
720 row.get::<_, i64>(4)?,
721 row.get::<_, Option<i64>>(5)?,
722 row.get::<_, String>(6)?,
723 row.get::<_, String>(7)?,
724 row.get::<_, Option<String>>(8)?,
725 row.get::<_, Option<String>>(9)?,
726 row.get::<_, Option<String>>(10)?,
727 row.get::<_, Option<i64>>(11)?,
728 row.get::<_, Option<i64>>(12)?,
729 row.get::<_, String>(13)?,
730 ))
731 })?;
732
733 let mut out = Vec::new();
734 for row in rows {
735 let (
736 id,
737 agent,
738 model,
739 workspace,
740 started,
741 ended,
742 status_str,
743 trace,
744 start_commit,
745 end_commit,
746 branch,
747 dirty_start,
748 dirty_end,
749 source,
750 ) = row?;
751 out.push(SessionRecord {
752 id,
753 agent,
754 model,
755 workspace,
756 started_at_ms: started as u64,
757 ended_at_ms: ended.map(|v| v as u64),
758 status: status_from_str(&status_str),
759 trace_path: trace,
760 start_commit,
761 end_commit,
762 branch,
763 dirty_start: dirty_start.map(i64_to_bool),
764 dirty_end: dirty_end.map(i64_to_bool),
765 repo_binding_source: empty_to_none(source),
766 });
767 }
768 Ok(out)
769 }
770
771 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
772 let session_count: i64 = self.conn.query_row(
773 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
774 params![workspace],
775 |r| r.get(0),
776 )?;
777
778 let total_cost: i64 = self.conn.query_row(
779 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
780 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
781 params![workspace],
782 |r| r.get(0),
783 )?;
784
785 let mut stmt = self.conn.prepare(
786 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
787 )?;
788 let by_agent: Vec<(String, u64)> = stmt
789 .query_map(params![workspace], |r| {
790 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
791 })?
792 .filter_map(|r| r.ok())
793 .collect();
794
795 let mut stmt = self.conn.prepare(
796 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
797 )?;
798 let by_model: Vec<(String, u64)> = stmt
799 .query_map(params![workspace], |r| {
800 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
801 })?
802 .filter_map(|r| r.ok())
803 .collect();
804
805 let mut stmt = self.conn.prepare(
806 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
807 WHERE s.workspace = ?1 AND tool IS NOT NULL
808 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
809 )?;
810 let top_tools: Vec<(String, u64)> = stmt
811 .query_map(params![workspace], |r| {
812 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
813 })?
814 .filter_map(|r| r.ok())
815 .collect();
816
817 Ok(SummaryStats {
818 session_count: session_count as u64,
819 total_cost_usd_e6: total_cost,
820 by_agent,
821 by_model,
822 top_tools,
823 })
824 }
825
826 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
827 let mut stmt = self.conn.prepare(
828 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
829 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
830 FROM events WHERE session_id = ?1 ORDER BY seq ASC",
831 )?;
832 let rows = stmt.query_map(params![session_id], |row| {
833 Ok((
834 row.get::<_, String>(0)?,
835 row.get::<_, i64>(1)?,
836 row.get::<_, i64>(2)?,
837 row.get::<_, i64>(3)?,
838 row.get::<_, String>(4)?,
839 row.get::<_, String>(5)?,
840 row.get::<_, Option<String>>(6)?,
841 row.get::<_, Option<String>>(7)?,
842 row.get::<_, Option<i64>>(8)?,
843 row.get::<_, Option<i64>>(9)?,
844 row.get::<_, Option<i64>>(10)?,
845 row.get::<_, Option<i64>>(11)?,
846 row.get::<_, String>(12)?,
847 ))
848 })?;
849
850 let mut events = Vec::new();
851 for row in rows {
852 let (
853 sid,
854 seq,
855 ts_ms,
856 ts_exact,
857 kind_str,
858 source_str,
859 tool,
860 tool_call_id,
861 tokens_in,
862 tokens_out,
863 reasoning_tokens,
864 cost_usd_e6,
865 payload_str,
866 ) = row?;
867 events.push(Event {
868 session_id: sid,
869 seq: seq as u64,
870 ts_ms: ts_ms as u64,
871 ts_exact: ts_exact != 0,
872 kind: kind_from_str(&kind_str),
873 source: source_from_str(&source_str),
874 tool,
875 tool_call_id,
876 tokens_in: tokens_in.map(|v| v as u32),
877 tokens_out: tokens_out.map(|v| v as u32),
878 reasoning_tokens: reasoning_tokens.map(|v| v as u32),
879 cost_usd_e6,
880 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
881 });
882 }
883 Ok(events)
884 }
885
886 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
888 self.conn.execute(
889 "UPDATE sessions SET status = ?1 WHERE id = ?2",
890 params![format!("{:?}", status), id],
891 )?;
892 Ok(())
893 }
894
895 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
897 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
898 Ok(InsightsStats {
899 total_sessions: count_q(
900 &self.conn,
901 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
902 workspace,
903 )?,
904 running_sessions: count_q(
905 &self.conn,
906 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
907 workspace,
908 )?,
909 total_events: count_q(
910 &self.conn,
911 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
912 workspace,
913 )?,
914 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
915 recent: recent_sessions_3(&self.conn, workspace)?,
916 top_tools: top_tools_5(&self.conn, workspace)?,
917 total_cost_usd_e6,
918 sessions_with_cost,
919 })
920 }
921
922 pub fn retro_events_in_window(
924 &self,
925 workspace: &str,
926 start_ms: u64,
927 end_ms: u64,
928 ) -> Result<Vec<(SessionRecord, Event)>> {
929 let mut stmt = self.conn.prepare(
930 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
931 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
932 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
933 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
934 FROM events e
935 JOIN sessions s ON s.id = e.session_id
936 WHERE s.workspace = ?1
937 AND (
938 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
939 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
940 )
941 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
942 )?;
943 let rows = stmt.query_map(
944 params![
945 workspace,
946 start_ms as i64,
947 end_ms as i64,
948 SYNTHETIC_TS_CEILING_MS,
949 ],
950 |row| {
951 let payload_str: String = row.get(12)?;
952 let status_str: String = row.get(19)?;
953 Ok((
954 SessionRecord {
955 id: row.get(13)?,
956 agent: row.get(14)?,
957 model: row.get(15)?,
958 workspace: row.get(16)?,
959 started_at_ms: row.get::<_, i64>(17)? as u64,
960 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
961 status: status_from_str(&status_str),
962 trace_path: row.get(20)?,
963 start_commit: row.get(21)?,
964 end_commit: row.get(22)?,
965 branch: row.get(23)?,
966 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
967 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
968 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
969 },
970 Event {
971 session_id: row.get(0)?,
972 seq: row.get::<_, i64>(1)? as u64,
973 ts_ms: row.get::<_, i64>(2)? as u64,
974 ts_exact: row.get::<_, i64>(3)? != 0,
975 kind: kind_from_str(&row.get::<_, String>(4)?),
976 source: source_from_str(&row.get::<_, String>(5)?),
977 tool: row.get(6)?,
978 tool_call_id: row.get(7)?,
979 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
980 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
981 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
982 cost_usd_e6: row.get(11)?,
983 payload: serde_json::from_str(&payload_str)
984 .unwrap_or(serde_json::Value::Null),
985 },
986 ))
987 },
988 )?;
989
990 let mut out = Vec::new();
991 for r in rows {
992 out.push(r?);
993 }
994 Ok(out)
995 }
996
997 pub fn files_touched_in_window(
999 &self,
1000 workspace: &str,
1001 start_ms: u64,
1002 end_ms: u64,
1003 ) -> Result<Vec<(String, String)>> {
1004 let mut stmt = self.conn.prepare(
1005 "SELECT DISTINCT ft.session_id, ft.path
1006 FROM files_touched ft
1007 JOIN sessions s ON s.id = ft.session_id
1008 WHERE s.workspace = ?1
1009 AND EXISTS (
1010 SELECT 1 FROM events e
1011 JOIN sessions ss ON ss.id = e.session_id
1012 WHERE e.session_id = ft.session_id
1013 AND (
1014 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1015 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1016 )
1017 )
1018 ORDER BY ft.session_id, ft.path",
1019 )?;
1020 let out: Vec<(String, String)> = stmt
1021 .query_map(
1022 params![
1023 workspace,
1024 start_ms as i64,
1025 end_ms as i64,
1026 SYNTHETIC_TS_CEILING_MS,
1027 ],
1028 |r| Ok((r.get(0)?, r.get(1)?)),
1029 )?
1030 .filter_map(|r| r.ok())
1031 .collect();
1032 Ok(out)
1033 }
1034
1035 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1038 let mut stmt = self.conn.prepare(
1039 "SELECT DISTINCT su.skill
1040 FROM skills_used su
1041 JOIN sessions s ON s.id = su.session_id
1042 WHERE s.workspace = ?1
1043 AND EXISTS (
1044 SELECT 1 FROM events e
1045 JOIN sessions ss ON ss.id = e.session_id
1046 WHERE e.session_id = su.session_id
1047 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1048 )
1049 ORDER BY su.skill",
1050 )?;
1051 let out: Vec<String> = stmt
1052 .query_map(
1053 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1054 |r| r.get::<_, String>(0),
1055 )?
1056 .filter_map(|r| r.ok())
1057 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1058 .collect();
1059 Ok(out)
1060 }
1061
1062 pub fn skills_used_in_window(
1064 &self,
1065 workspace: &str,
1066 start_ms: u64,
1067 end_ms: u64,
1068 ) -> Result<Vec<(String, String)>> {
1069 let mut stmt = self.conn.prepare(
1070 "SELECT DISTINCT su.session_id, su.skill
1071 FROM skills_used su
1072 JOIN sessions s ON s.id = su.session_id
1073 WHERE s.workspace = ?1
1074 AND EXISTS (
1075 SELECT 1 FROM events e
1076 JOIN sessions ss ON ss.id = e.session_id
1077 WHERE e.session_id = su.session_id
1078 AND (
1079 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1080 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1081 )
1082 )
1083 ORDER BY su.session_id, su.skill",
1084 )?;
1085 let out: Vec<(String, String)> = stmt
1086 .query_map(
1087 params![
1088 workspace,
1089 start_ms as i64,
1090 end_ms as i64,
1091 SYNTHETIC_TS_CEILING_MS,
1092 ],
1093 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1094 )?
1095 .filter_map(|r| r.ok())
1096 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1097 .collect();
1098 Ok(out)
1099 }
1100
1101 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1103 let mut stmt = self.conn.prepare(
1104 "SELECT DISTINCT ru.rule
1105 FROM rules_used ru
1106 JOIN sessions s ON s.id = ru.session_id
1107 WHERE s.workspace = ?1
1108 AND EXISTS (
1109 SELECT 1 FROM events e
1110 JOIN sessions ss ON ss.id = e.session_id
1111 WHERE e.session_id = ru.session_id
1112 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1113 )
1114 ORDER BY ru.rule",
1115 )?;
1116 let out: Vec<String> = stmt
1117 .query_map(
1118 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1119 |r| r.get::<_, String>(0),
1120 )?
1121 .filter_map(|r| r.ok())
1122 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1123 .collect();
1124 Ok(out)
1125 }
1126
1127 pub fn rules_used_in_window(
1129 &self,
1130 workspace: &str,
1131 start_ms: u64,
1132 end_ms: u64,
1133 ) -> Result<Vec<(String, String)>> {
1134 let mut stmt = self.conn.prepare(
1135 "SELECT DISTINCT ru.session_id, ru.rule
1136 FROM rules_used ru
1137 JOIN sessions s ON s.id = ru.session_id
1138 WHERE s.workspace = ?1
1139 AND EXISTS (
1140 SELECT 1 FROM events e
1141 JOIN sessions ss ON ss.id = e.session_id
1142 WHERE e.session_id = ru.session_id
1143 AND (
1144 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1145 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1146 )
1147 )
1148 ORDER BY ru.session_id, ru.rule",
1149 )?;
1150 let out: Vec<(String, String)> = stmt
1151 .query_map(
1152 params![
1153 workspace,
1154 start_ms as i64,
1155 end_ms as i64,
1156 SYNTHETIC_TS_CEILING_MS,
1157 ],
1158 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1159 )?
1160 .filter_map(|r| r.ok())
1161 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1162 .collect();
1163 Ok(out)
1164 }
1165
1166 pub fn sessions_active_in_window(
1168 &self,
1169 workspace: &str,
1170 start_ms: u64,
1171 end_ms: u64,
1172 ) -> Result<HashSet<String>> {
1173 let mut stmt = self.conn.prepare(
1174 "SELECT DISTINCT s.id
1175 FROM sessions s
1176 WHERE s.workspace = ?1
1177 AND EXISTS (
1178 SELECT 1 FROM events e
1179 WHERE e.session_id = s.id
1180 AND (
1181 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1182 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1183 )
1184 )",
1185 )?;
1186 let out: HashSet<String> = stmt
1187 .query_map(
1188 params![
1189 workspace,
1190 start_ms as i64,
1191 end_ms as i64,
1192 SYNTHETIC_TS_CEILING_MS,
1193 ],
1194 |r| r.get(0),
1195 )?
1196 .filter_map(|r| r.ok())
1197 .collect();
1198 Ok(out)
1199 }
1200
1201 pub fn session_costs_usd_e6_in_window(
1203 &self,
1204 workspace: &str,
1205 start_ms: u64,
1206 end_ms: u64,
1207 ) -> Result<HashMap<String, i64>> {
1208 let mut stmt = self.conn.prepare(
1209 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1210 FROM events e
1211 JOIN sessions s ON s.id = e.session_id
1212 WHERE s.workspace = ?1
1213 AND (
1214 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1215 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1216 )
1217 GROUP BY e.session_id",
1218 )?;
1219 let rows: Vec<(String, i64)> = stmt
1220 .query_map(
1221 params![
1222 workspace,
1223 start_ms as i64,
1224 end_ms as i64,
1225 SYNTHETIC_TS_CEILING_MS,
1226 ],
1227 |r| Ok((r.get(0)?, r.get(1)?)),
1228 )?
1229 .filter_map(|r| r.ok())
1230 .collect();
1231 Ok(rows.into_iter().collect())
1232 }
1233
1234 pub fn guidance_report(
1236 &self,
1237 workspace: &str,
1238 window_start_ms: u64,
1239 window_end_ms: u64,
1240 skill_slugs_on_disk: &HashSet<String>,
1241 rule_slugs_on_disk: &HashSet<String>,
1242 ) -> Result<GuidanceReport> {
1243 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1244 let denom = active.len() as u64;
1245 let costs =
1246 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1247
1248 let workspace_avg_cost_per_session_usd = if denom > 0 {
1249 let total_e6: i64 = active
1250 .iter()
1251 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1252 .sum();
1253 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1254 } else {
1255 None
1256 };
1257
1258 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1259 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1260 skill_sessions.entry(skill).or_default().insert(sid);
1261 }
1262 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1263 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1264 rule_sessions.entry(rule).or_default().insert(sid);
1265 }
1266
1267 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1268
1269 let mut push_row =
1270 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1271 let sessions = sids.len() as u64;
1272 let sessions_pct = if denom > 0 {
1273 sessions as f64 * 100.0 / denom as f64
1274 } else {
1275 0.0
1276 };
1277 let total_cost_usd_e6: i64 = sids
1278 .iter()
1279 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1280 .sum();
1281 let avg_cost_per_session_usd = if sessions > 0 {
1282 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1283 } else {
1284 None
1285 };
1286 let vs_workspace_avg_cost_per_session_usd =
1287 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1288 (Some(avg), Some(w)) => Some(avg - w),
1289 _ => None,
1290 };
1291 rows.push(GuidancePerfRow {
1292 kind,
1293 id,
1294 sessions,
1295 sessions_pct,
1296 total_cost_usd_e6,
1297 avg_cost_per_session_usd,
1298 vs_workspace_avg_cost_per_session_usd,
1299 on_disk,
1300 });
1301 };
1302
1303 let mut seen_skills: HashSet<String> = HashSet::new();
1304 for (id, sids) in &skill_sessions {
1305 seen_skills.insert(id.clone());
1306 push_row(
1307 GuidanceKind::Skill,
1308 id.clone(),
1309 sids,
1310 skill_slugs_on_disk.contains(id),
1311 );
1312 }
1313 for slug in skill_slugs_on_disk {
1314 if seen_skills.contains(slug) {
1315 continue;
1316 }
1317 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1318 }
1319
1320 let mut seen_rules: HashSet<String> = HashSet::new();
1321 for (id, sids) in &rule_sessions {
1322 seen_rules.insert(id.clone());
1323 push_row(
1324 GuidanceKind::Rule,
1325 id.clone(),
1326 sids,
1327 rule_slugs_on_disk.contains(id),
1328 );
1329 }
1330 for slug in rule_slugs_on_disk {
1331 if seen_rules.contains(slug) {
1332 continue;
1333 }
1334 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1335 }
1336
1337 rows.sort_by(|a, b| {
1338 b.sessions
1339 .cmp(&a.sessions)
1340 .then_with(|| a.kind.cmp(&b.kind))
1341 .then_with(|| a.id.cmp(&b.id))
1342 });
1343
1344 Ok(GuidanceReport {
1345 workspace: workspace.to_string(),
1346 window_start_ms,
1347 window_end_ms,
1348 sessions_in_window: denom,
1349 workspace_avg_cost_per_session_usd,
1350 rows,
1351 })
1352 }
1353
1354 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1355 let mut stmt = self.conn.prepare(
1356 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1357 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
1358 FROM sessions WHERE id = ?1",
1359 )?;
1360 let mut rows = stmt.query_map(params![id], |row| {
1361 Ok((
1362 row.get::<_, String>(0)?,
1363 row.get::<_, String>(1)?,
1364 row.get::<_, Option<String>>(2)?,
1365 row.get::<_, String>(3)?,
1366 row.get::<_, i64>(4)?,
1367 row.get::<_, Option<i64>>(5)?,
1368 row.get::<_, String>(6)?,
1369 row.get::<_, String>(7)?,
1370 row.get::<_, Option<String>>(8)?,
1371 row.get::<_, Option<String>>(9)?,
1372 row.get::<_, Option<String>>(10)?,
1373 row.get::<_, Option<i64>>(11)?,
1374 row.get::<_, Option<i64>>(12)?,
1375 row.get::<_, String>(13)?,
1376 ))
1377 })?;
1378
1379 if let Some(row) = rows.next() {
1380 let (
1381 id,
1382 agent,
1383 model,
1384 workspace,
1385 started,
1386 ended,
1387 status_str,
1388 trace,
1389 start_commit,
1390 end_commit,
1391 branch,
1392 dirty_start,
1393 dirty_end,
1394 source,
1395 ) = row?;
1396 Ok(Some(SessionRecord {
1397 id,
1398 agent,
1399 model,
1400 workspace,
1401 started_at_ms: started as u64,
1402 ended_at_ms: ended.map(|v| v as u64),
1403 status: status_from_str(&status_str),
1404 trace_path: trace,
1405 start_commit,
1406 end_commit,
1407 branch,
1408 dirty_start: dirty_start.map(i64_to_bool),
1409 dirty_end: dirty_end.map(i64_to_bool),
1410 repo_binding_source: empty_to_none(source),
1411 }))
1412 } else {
1413 Ok(None)
1414 }
1415 }
1416
1417 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1418 let mut stmt = self.conn.prepare(
1419 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1420 indexed_at_ms, dirty, graph_path
1421 FROM repo_snapshots WHERE workspace = ?1
1422 ORDER BY indexed_at_ms DESC LIMIT 1",
1423 )?;
1424 let mut rows = stmt.query_map(params![workspace], |row| {
1425 Ok(RepoSnapshotRecord {
1426 id: row.get(0)?,
1427 workspace: row.get(1)?,
1428 head_commit: row.get(2)?,
1429 dirty_fingerprint: row.get(3)?,
1430 analyzer_version: row.get(4)?,
1431 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1432 dirty: row.get::<_, i64>(6)? != 0,
1433 graph_path: row.get(7)?,
1434 })
1435 })?;
1436 Ok(rows.next().transpose()?)
1437 }
1438
1439 pub fn save_repo_snapshot(
1440 &self,
1441 snapshot: &RepoSnapshotRecord,
1442 facts: &[FileFact],
1443 edges: &[RepoEdge],
1444 ) -> Result<()> {
1445 self.conn.execute(
1446 "INSERT INTO repo_snapshots (
1447 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1448 indexed_at_ms, dirty, graph_path
1449 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1450 ON CONFLICT(id) DO UPDATE SET
1451 workspace=excluded.workspace,
1452 head_commit=excluded.head_commit,
1453 dirty_fingerprint=excluded.dirty_fingerprint,
1454 analyzer_version=excluded.analyzer_version,
1455 indexed_at_ms=excluded.indexed_at_ms,
1456 dirty=excluded.dirty,
1457 graph_path=excluded.graph_path",
1458 params![
1459 snapshot.id,
1460 snapshot.workspace,
1461 snapshot.head_commit,
1462 snapshot.dirty_fingerprint,
1463 snapshot.analyzer_version,
1464 snapshot.indexed_at_ms as i64,
1465 bool_to_i64(snapshot.dirty),
1466 snapshot.graph_path,
1467 ],
1468 )?;
1469 self.conn.execute(
1470 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1471 params![snapshot.id],
1472 )?;
1473 self.conn.execute(
1474 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1475 params![snapshot.id],
1476 )?;
1477 for fact in facts {
1478 self.conn.execute(
1479 "INSERT INTO file_facts (
1480 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1481 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1482 churn_30d, churn_90d, authors_90d, last_changed_ms
1483 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1484 params![
1485 fact.snapshot_id,
1486 fact.path,
1487 fact.language,
1488 fact.bytes as i64,
1489 fact.loc as i64,
1490 fact.sloc as i64,
1491 fact.complexity_total as i64,
1492 fact.max_fn_complexity as i64,
1493 fact.symbol_count as i64,
1494 fact.import_count as i64,
1495 fact.fan_in as i64,
1496 fact.fan_out as i64,
1497 fact.churn_30d as i64,
1498 fact.churn_90d as i64,
1499 fact.authors_90d as i64,
1500 fact.last_changed_ms.map(|v| v as i64),
1501 ],
1502 )?;
1503 }
1504 for edge in edges {
1505 self.conn.execute(
1506 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1507 VALUES (?1, ?2, ?3, ?4, ?5)
1508 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1509 DO UPDATE SET weight = weight + excluded.weight",
1510 params![
1511 snapshot.id,
1512 edge.from_path,
1513 edge.to_path,
1514 edge.kind,
1515 edge.weight as i64,
1516 ],
1517 )?;
1518 }
1519 Ok(())
1520 }
1521
1522 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1523 let mut stmt = self.conn.prepare(
1524 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1525 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1526 churn_30d, churn_90d, authors_90d, last_changed_ms
1527 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1528 )?;
1529 let rows = stmt.query_map(params![snapshot_id], |row| {
1530 Ok(FileFact {
1531 snapshot_id: row.get(0)?,
1532 path: row.get(1)?,
1533 language: row.get(2)?,
1534 bytes: row.get::<_, i64>(3)? as u64,
1535 loc: row.get::<_, i64>(4)? as u32,
1536 sloc: row.get::<_, i64>(5)? as u32,
1537 complexity_total: row.get::<_, i64>(6)? as u32,
1538 max_fn_complexity: row.get::<_, i64>(7)? as u32,
1539 symbol_count: row.get::<_, i64>(8)? as u32,
1540 import_count: row.get::<_, i64>(9)? as u32,
1541 fan_in: row.get::<_, i64>(10)? as u32,
1542 fan_out: row.get::<_, i64>(11)? as u32,
1543 churn_30d: row.get::<_, i64>(12)? as u32,
1544 churn_90d: row.get::<_, i64>(13)? as u32,
1545 authors_90d: row.get::<_, i64>(14)? as u32,
1546 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1547 })
1548 })?;
1549 Ok(rows.filter_map(|row| row.ok()).collect())
1550 }
1551
1552 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1553 let mut stmt = self.conn.prepare(
1554 "SELECT from_id, to_id, kind, weight
1555 FROM repo_edges WHERE snapshot_id = ?1
1556 ORDER BY kind, from_id, to_id",
1557 )?;
1558 let rows = stmt.query_map(params![snapshot_id], |row| {
1559 Ok(RepoEdge {
1560 from_path: row.get(0)?,
1561 to_path: row.get(1)?,
1562 kind: row.get(2)?,
1563 weight: row.get::<_, i64>(3)? as u32,
1564 })
1565 })?;
1566 Ok(rows.filter_map(|row| row.ok()).collect())
1567 }
1568
1569 pub fn tool_spans_in_window(
1570 &self,
1571 workspace: &str,
1572 start_ms: u64,
1573 end_ms: u64,
1574 ) -> Result<Vec<ToolSpanView>> {
1575 let mut stmt = self.conn.prepare(
1576 "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1577 ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1578 FROM tool_spans ts
1579 JOIN sessions s ON s.id = ts.session_id
1580 WHERE s.workspace = ?1
1581 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1582 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1583 ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1584 )?;
1585 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1586 let paths_json: String = row.get(7)?;
1587 Ok(ToolSpanView {
1588 tool: row
1589 .get::<_, Option<String>>(0)?
1590 .unwrap_or_else(|| "unknown".into()),
1591 status: row.get(1)?,
1592 lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1593 tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1594 tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1595 reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1596 cost_usd_e6: row.get(6)?,
1597 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1598 })
1599 })?;
1600 Ok(rows.filter_map(|row| row.ok()).collect())
1601 }
1602
1603 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1604 let mut stmt = self.conn.prepare(
1605 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1606 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1607 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1608 )?;
1609 let rows = stmt.query_map(params![session_id], |row| {
1610 let paths_json: String = row.get(12)?;
1611 Ok(ToolSpanSyncRow {
1612 span_id: row.get(0)?,
1613 session_id: row.get(1)?,
1614 tool: row.get(2)?,
1615 tool_call_id: row.get(3)?,
1616 status: row.get(4)?,
1617 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1618 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1619 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1620 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1621 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1622 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1623 cost_usd_e6: row.get(11)?,
1624 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1625 })
1626 })?;
1627 Ok(rows.filter_map(|row| row.ok()).collect())
1628 }
1629}
1630
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock set before 1970: same result as a zero duration.
        Err(_) => 0,
    }
}
1637
1638fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1639 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1640}
1641
1642fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1643 let cost: i64 = conn.query_row(
1644 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1645 params![workspace], |r| r.get(0),
1646 )?;
1647 let with_cost: i64 = conn.query_row(
1648 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1649 params![workspace], |r| r.get(0),
1650 )?;
1651 Ok((cost, with_cost as u64))
1652}
1653
/// Maps a day index (whole days since the Unix epoch) to its three-letter
/// weekday label.
///
/// Day `0` (1970-01-01) was a Thursday, hence the offset of 4 into a
/// Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce modulo 7 before adding the offset so the sum cannot overflow for
    // indices close to `u64::MAX`.
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
1657
1658fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
1659 let week_ago = now.saturating_sub(7 * 86_400_000);
1660 let mut stmt = conn
1661 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
1662 let days: Vec<u64> = stmt
1663 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
1664 .filter_map(|r| r.ok())
1665 .map(|v| v as u64 / 86_400_000)
1666 .collect();
1667 let today = now / 86_400_000;
1668 Ok((0u64..7)
1669 .map(|i| {
1670 let d = today.saturating_sub(6 - i);
1671 (
1672 day_label(d).to_string(),
1673 days.iter().filter(|&&x| x == d).count() as u64,
1674 )
1675 })
1676 .collect())
1677}
1678
1679fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
1680 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
1681 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
1682 s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
1683 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
1684 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
1685 let mut stmt = conn.prepare(sql)?;
1686 let out: Vec<(SessionRecord, u64)> = stmt
1687 .query_map(params![workspace], |r| {
1688 let st: String = r.get(6)?;
1689 Ok((
1690 SessionRecord {
1691 id: r.get(0)?,
1692 agent: r.get(1)?,
1693 model: r.get(2)?,
1694 workspace: r.get(3)?,
1695 started_at_ms: r.get::<_, i64>(4)? as u64,
1696 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1697 status: status_from_str(&st),
1698 trace_path: r.get(7)?,
1699 start_commit: r.get(8)?,
1700 end_commit: r.get(9)?,
1701 branch: r.get(10)?,
1702 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
1703 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
1704 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
1705 },
1706 r.get::<_, i64>(14)? as u64,
1707 ))
1708 })?
1709 .filter_map(|r| r.ok())
1710 .collect();
1711 Ok(out)
1712}
1713
1714fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
1715 let mut stmt = conn.prepare(
1716 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
1717 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
1718 )?;
1719 let out: Vec<(String, u64)> = stmt
1720 .query_map(params![workspace], |r| {
1721 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
1722 })?
1723 .filter_map(|r| r.ok())
1724 .collect();
1725 Ok(out)
1726}
1727
1728fn status_from_str(s: &str) -> SessionStatus {
1729 match s {
1730 "Running" => SessionStatus::Running,
1731 "Waiting" => SessionStatus::Waiting,
1732 "Idle" => SessionStatus::Idle,
1733 _ => SessionStatus::Done,
1734 }
1735}
1736
1737fn kind_from_str(s: &str) -> EventKind {
1738 match s {
1739 "ToolCall" => EventKind::ToolCall,
1740 "ToolResult" => EventKind::ToolResult,
1741 "Message" => EventKind::Message,
1742 "Error" => EventKind::Error,
1743 "Cost" => EventKind::Cost,
1744 _ => EventKind::Hook,
1745 }
1746}
1747
1748fn source_from_str(s: &str) -> EventSource {
1749 match s {
1750 "Tail" => EventSource::Tail,
1751 "Hook" => EventSource::Hook,
1752 _ => EventSource::Proxy,
1753 }
1754}
1755
1756fn ensure_schema_columns(conn: &Connection) -> Result<()> {
1757 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
1758 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
1759 ensure_column(conn, "sessions", "branch", "TEXT")?;
1760 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
1761 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
1762 ensure_column(
1763 conn,
1764 "sessions",
1765 "repo_binding_source",
1766 "TEXT NOT NULL DEFAULT ''",
1767 )?;
1768 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
1769 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
1770 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
1771 ensure_column(
1772 conn,
1773 "sync_outbox",
1774 "kind",
1775 "TEXT NOT NULL DEFAULT 'events'",
1776 )?;
1777 ensure_column(
1778 conn,
1779 "experiments",
1780 "state",
1781 "TEXT NOT NULL DEFAULT 'Draft'",
1782 )?;
1783 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
1784 Ok(())
1785}
1786
1787fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
1788 if has_column(conn, table, column)? {
1789 return Ok(());
1790 }
1791 conn.execute(
1792 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
1793 [],
1794 )?;
1795 Ok(())
1796}
1797
1798fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
1799 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1800 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1801 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
1802}
1803
/// Encodes a flag as the integer stored in the database: `true` becomes `1`,
/// `false` becomes `0`.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
1807
/// Decodes a stored integer flag: zero is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
1811
/// Converts an empty string to `None` and wraps any other string in `Some`.
///
/// Whitespace-only strings are not treated as empty.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|value| !value.is_empty())
}
1815
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Opens a fresh store inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned so the directory outlives the store.
    fn open_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        (dir, store)
    }

    /// A finished "cursor" session in workspace `/ws` that started at t=1000.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
        }
    }

    /// A `read_file` tool call with an empty payload, timestamped from `seq`.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: json!({}),
        }
    }

    /// Same as `make_event`, with the payload replaced.
    fn make_event_with_payload(session_id: &str, seq: u64, payload: serde_json::Value) -> Event {
        let mut event = make_event(session_id, seq);
        event.payload = payload;
        event
    }

    #[test]
    fn open_and_wal_mode() {
        let (_dir, store) = open_store();
        let journal_mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal_mode, "wal");
    }

    #[test]
    fn upsert_and_get_session() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s1")).unwrap();

        let loaded = store.get_session("s1").unwrap().unwrap();
        assert_eq!(loaded.id, "s1");
        assert_eq!(loaded.status, SessionStatus::Done);
    }

    #[test]
    fn append_and_list_events_round_trip() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s2")).unwrap();
        for seq in 0..2 {
            store.append_event(&make_event("s2", seq)).unwrap();
        }

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    #[test]
    fn summary_stats_empty() {
        let (_dir, store) = open_store();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    #[test]
    fn summary_stats_counts_sessions() {
        let (_dir, store) = open_store();
        for id in ["a", "b"] {
            store.upsert_session(&make_session(id)).unwrap();
        }
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    #[test]
    fn list_events_for_session_round_trip() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s4")).unwrap();
        for seq in 0..2 {
            store.append_event(&make_event("s4", seq)).unwrap();
        }
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    #[test]
    fn append_event_dedup() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s5")).unwrap();
        // The same (session, seq) is appended twice and must be stored once.
        for _ in 0..2 {
            store.append_event(&make_event("s5", 0)).unwrap();
        }
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn upsert_idempotent() {
        let (_dir, store) = open_store();
        let mut session = make_session("s3");
        store.upsert_session(&session).unwrap();
        session.status = SessionStatus::Running;
        store.upsert_session(&session).unwrap();

        let loaded = store.get_session("s3").unwrap().unwrap();
        assert_eq!(loaded.status, SessionStatus::Running);
    }

    #[test]
    fn append_event_indexes_path_from_payload() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let event = make_event_with_payload("sx", 0, json!({"input": {"path": "src/lib.rs"}}));
        store.append_event(&event).unwrap();

        let touched = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(touched, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    #[test]
    fn update_session_status_changes_status() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();

        let loaded = store.get_session("s6").unwrap().unwrap();
        assert_eq!(loaded.status, SessionStatus::Running);
    }

    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let (_dir, store) = open_store();
        let old = SessionRecord {
            started_at_ms: 1_000,
            ..make_session("old")
        };
        let new = SessionRecord {
            started_at_ms: 9_000_000_000_000,
            ..make_session("new")
        };
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        let expected = PruneStats {
            sessions_removed: 1,
            events_removed: 1,
        };
        assert_eq!(stats, expected);
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());

        let remaining = store.list_sessions("/ws").unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "new");
    }

    #[test]
    fn append_event_indexes_rules_from_payload() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sr")).unwrap();
        let event = make_event_with_payload("sr", 0, json!({"path": ".cursor/rules/my-rule.mdc"}));
        store.append_event(&event).unwrap();

        let rules = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(rules, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let (_dir, store) = open_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut event = make_event_with_payload(
            "sx",
            0,
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"}),
        );
        event.cost_usd_e6 = Some(500_000);
        store.append_event(&event).unwrap();

        let skill_slugs: HashSet<String> = HashSet::from(["tdd".to_string()]);
        let rule_slugs: HashSet<String> = HashSet::from(["style".to_string()]);

        let report = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(report.sessions_in_window, 1);

        let tdd = report
            .rows
            .iter()
            .find(|row| row.id == "tdd" && row.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);

        let style = report
            .rows
            .iter()
            .find(|row| row.id == "style" && row.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let (_dir, store) = open_store();
        let old = SessionRecord {
            started_at_ms: 1_000,
            ..make_session("old_r")
        };
        store.upsert_session(&old).unwrap();
        let event = make_event_with_payload("old_r", 0, json!({"path": ".cursor/rules/x.mdc"}));
        store.append_event(&event).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let leftover: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |row| row.get(0))
            .unwrap();
        assert_eq!(leftover, 0);
    }
}