1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Millisecond timestamp threshold: 1_000_000_000_000 ms ≈ 2001-09-09T01:46:40Z.
///
/// NOTE(review): values below this are presumably treated as synthetic / non-wall-clock
/// timestamps (e.g. sequence-derived) rather than real event times — confirm at the use site,
/// which is not visible in this section.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
21
/// Idempotent schema statements executed, in order, by `Store::open` on every open.
///
/// Every entry uses `IF NOT EXISTS` / `OR IGNORE`, so re-running the list against an existing
/// database leaves already-created objects alone. Because existing tables are never re-created,
/// editing a `CREATE TABLE` here does not add columns to databases that already have the table;
/// append new statements instead.
///
/// NOTE(review): several columns that other code reads or writes are absent from these
/// `CREATE TABLE`s (e.g. `events.ts_exact`, `events.tool_call_id`, `sync_outbox.kind`,
/// `sessions.start_commit`). They are presumably added by `ensure_schema_columns`, which
/// `Store::open` runs right after this list — TODO confirm.
const MIGRATIONS: &[&str] = &[
    // One row per session id.
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    // Per-session event log; `(session_id, seq)` is made unique by `events_session_seq_idx` below.
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    // NOTE(review): likely redundant with the unique `(session_id, seq)` index added below, whose
    // leftmost column already serves `session_id` lookups.
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // Upload queue; `Store::mark_outbox_sent` flips `sent` from 0 to 1.
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    // At most one variant per (experiment, session).
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // Target of `ON CONFLICT(session_id, seq)` in `Store::append_event_with_sync`.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Free-form key/value state; values are stored as text.
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    // Repository analysis: one snapshot row, plus per-file facts and edges keyed by snapshot id.
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // Singleton row (enforced by `CHECK (id = 1)`), seeded by the statement that follows.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    // Remote caches keyed by team id and hashed identifiers; rows hold opaque JSON.
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // Multi-statement entry (table + two indexes); `execute_batch` runs all of them.
    // NOTE(review): `ON DELETE CASCADE` only fires with `PRAGMA foreign_keys=ON`, which
    // `Store::open` does not set, so deleting a session does not cascade here by itself.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    // Multi-statement entry; the same foreign-key caveat as `session_evals` applies.
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
    // At most one outcome row per session.
    "CREATE TABLE IF NOT EXISTS session_outcomes (
        session_id TEXT PRIMARY KEY NOT NULL,
        test_passed INTEGER,
        test_failed INTEGER,
        test_skipped INTEGER,
        build_ok INTEGER,
        lint_errors INTEGER,
        revert_lines_14d INTEGER,
        pr_open INTEGER,
        ci_ok INTEGER,
        measured_at_ms INTEGER NOT NULL,
        measure_error TEXT
    )",
    // Process resource samples, one row per (session, timestamp, pid).
    "CREATE TABLE IF NOT EXISTS session_samples (
        session_id TEXT NOT NULL,
        ts_ms INTEGER NOT NULL,
        pid INTEGER NOT NULL,
        cpu_percent REAL,
        rss_bytes INTEGER,
        PRIMARY KEY (session_id, ts_ms, pid)
    )",
    "CREATE INDEX IF NOT EXISTS session_samples_session_idx ON session_samples(session_id)",
];
254
/// Workspace overview assembled by `Store::insights`.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions recorded for the workspace.
    pub total_sessions: u64,
    /// Workspace sessions whose stored status is `Running`.
    pub running_sessions: u64,
    /// Events belonging to the workspace's sessions.
    pub total_events: u64,
    /// Per-day session counts from `sessions_by_day_7` — presumably the last 7 days keyed by a
    /// date label; TODO confirm the label format.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions from `recent_sessions_3`, each paired with a count — presumably that
    /// session's event count; confirm against the helper.
    pub recent: Vec<(SessionRecord, u64)>,
    /// Tool usage counts from `top_tools_5` (the name suggests at most five entries).
    pub top_tools: Vec<(String, u64)>,
    /// Total cost from `cost_stats`; the `_usd_e6` suffix suggests micro-dollars — confirm.
    pub total_cost_usd_e6: i64,
    /// Number of sessions with cost data, as reported by `cost_stats`.
    pub sessions_with_cost: u64,
}
270
/// Point-in-time view of the sync pipeline, as returned by `Store::sync_status`.
///
/// Derives `Debug`/`Clone` so callers can log or hand the snapshot around freely; every field is
/// a plain std type.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Number of `sync_outbox` rows with `sent = 0`.
    pub pending_outbox: u64,
    /// Millisecond timestamp of the last successful sync, if one has been recorded.
    pub last_success_ms: Option<u64>,
    /// Message from the most recent failed sync; cleared after a success.
    pub last_error: Option<String>,
    /// Failed syncs since the last success (0 when unset or unparsable).
    pub consecutive_failures: u32,
}
278
279#[derive(serde::Serialize)]
281pub struct SummaryStats {
282 pub session_count: u64,
283 pub total_cost_usd_e6: i64,
284 pub by_agent: Vec<(String, u64)>,
285 pub by_model: Vec<(String, u64)>,
286 pub top_tools: Vec<(String, u64)>,
287}
288
/// Category of a guidance item reported in a `GuidancePerfRow`.
///
/// Serialized in lowercase: `"skill"` / `"rule"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill (presumably the values tracked in the `skills_used` table).
    Skill,
    /// A rule (presumably the values tracked in the `rules_used` table).
    Rule,
}
296
/// One guidance item's usage and cost figures inside a `GuidanceReport`.
///
/// NOTE(review): the producer is not visible in this section; the field notes below are
/// inferred from names and should be confirmed against it.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill/rule.
    pub id: String,
    /// Sessions that used this item (presumably within the report window).
    pub sessions: u64,
    /// Share of the window's sessions that used this item — percent vs. fraction unconfirmed.
    pub sessions_pct: f64,
    /// Summed cost of those sessions (suffix suggests micro-dollars).
    pub total_cost_usd_e6: i64,
    /// Average cost per using session in dollars; `None` presumably when it can't be computed.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Presumably the difference between this item's average and the workspace average.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the item's definition currently exists on disk.
    pub on_disk: bool,
}
309
/// Per-workspace guidance performance report over a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Window start, in milliseconds (inclusive/exclusive bounds unconfirmed).
    pub window_start_ms: u64,
    /// Window end, in milliseconds.
    pub window_end_ms: u64,
    /// Sessions that fall within the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in dollars, when computable.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One row per guidance item.
    pub rows: Vec<GuidancePerfRow>,
}
320
/// Row counts removed by `Store::prune_sessions_started_before`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Rows deleted from `sessions`.
    pub sessions_removed: u64,
    /// Rows deleted from `events` that belonged to the pruned sessions.
    pub events_removed: u64,
}
327
/// In-memory form of one `session_outcomes` row (fields mirror the table's columns).
///
/// `build_ok` / `ci_ok` are stored as INTEGER columns; the bool mapping is presumably 0/1 —
/// confirm in the reader/writer.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    pub session_id: String,
    pub test_passed: Option<i64>,
    pub test_failed: Option<i64>,
    pub test_skipped: Option<i64>,
    pub build_ok: Option<bool>,
    pub lint_errors: Option<i64>,
    pub revert_lines_14d: Option<i64>,
    pub pr_open: Option<i64>,
    pub ci_ok: Option<bool>,
    /// When the outcome was measured (milliseconds; NOT NULL in the table).
    pub measured_at_ms: u64,
    /// Error text if measurement failed.
    pub measure_error: Option<String>,
}
343
/// Per-session aggregate of resource samples, presumably computed from `session_samples`
/// (`cpu_percent`, `rss_bytes`) — confirm against the producing query.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage observed.
    pub max_cpu_percent: f64,
    /// Highest resident set size observed, in bytes.
    pub max_rss_bytes: u64,
}
352
/// `sync_state` key for the last agent-scan timestamp (ms); presumably read/written via
/// `Store::sync_state_get_u64` / `Store::sync_state_set_u64`.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key for the last automatic prune timestamp (ms), same storage convention.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
356
/// A `tool_spans` row together with its `tool_span_paths` entries.
///
/// Presumably used when building tool-span sync payloads. Derives `Debug`/`Clone` so rows can be
/// logged and duplicated; all fields are std types.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    pub span_id: String,
    pub session_id: String,
    pub tool: Option<String>,
    pub tool_call_id: Option<String>,
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
372
/// Local SQLite-backed store for sessions, events, sync state and derived metrics.
///
/// All reads and writes go through the single owned connection.
pub struct Store {
    // Opened, switched to WAL, and migrated by `Store::open`.
    conn: Connection,
}
376
377impl Store {
    /// Borrows the underlying SQLite connection for crate-internal code that issues its own SQL.
    pub(crate) fn conn(&self) -> &Connection {
        &self.conn
    }
381
382 pub fn open(path: &Path) -> Result<Self> {
383 if let Some(parent) = path.parent() {
384 std::fs::create_dir_all(parent)?;
385 }
386 let conn =
387 Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
388 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
389 for sql in MIGRATIONS {
390 conn.execute_batch(sql)?;
391 }
392 ensure_schema_columns(&conn)?;
393 Ok(Self { conn })
394 }
395
396 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
397 self.conn.execute(
398 "INSERT INTO sessions (
399 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
400 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
401 prompt_fingerprint, parent_session_id, agent_version, os, arch,
402 repo_file_count, repo_total_loc
403 )
404 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
405 ?16, ?17, ?18, ?19, ?20, ?21)
406 ON CONFLICT(id) DO UPDATE SET
407 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
408 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
409 status=excluded.status, trace_path=excluded.trace_path,
410 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
411 branch=excluded.branch, dirty_start=excluded.dirty_start,
412 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
413 prompt_fingerprint=excluded.prompt_fingerprint,
414 parent_session_id=excluded.parent_session_id,
415 agent_version=excluded.agent_version, os=excluded.os, arch=excluded.arch,
416 repo_file_count=excluded.repo_file_count, repo_total_loc=excluded.repo_total_loc",
417 params![
418 s.id,
419 s.agent,
420 s.model,
421 s.workspace,
422 s.started_at_ms as i64,
423 s.ended_at_ms.map(|v| v as i64),
424 format!("{:?}", s.status),
425 s.trace_path,
426 s.start_commit,
427 s.end_commit,
428 s.branch,
429 s.dirty_start.map(bool_to_i64),
430 s.dirty_end.map(bool_to_i64),
431 s.repo_binding_source.clone().unwrap_or_default(),
432 s.prompt_fingerprint.as_deref(),
433 s.parent_session_id.as_deref(),
434 s.agent_version.as_deref(),
435 s.os.as_deref(),
436 s.arch.as_deref(),
437 s.repo_file_count.map(|v| v as i64),
438 s.repo_total_loc.map(|v| v as i64),
439 ],
440 )?;
441 self.conn.execute(
442 "INSERT INTO session_repo_binding (
443 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
444 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
445 ON CONFLICT(session_id) DO UPDATE SET
446 start_commit=excluded.start_commit,
447 end_commit=excluded.end_commit,
448 branch=excluded.branch,
449 dirty_start=excluded.dirty_start,
450 dirty_end=excluded.dirty_end,
451 repo_binding_source=excluded.repo_binding_source",
452 params![
453 s.id,
454 s.start_commit,
455 s.end_commit,
456 s.branch,
457 s.dirty_start.map(bool_to_i64),
458 s.dirty_end.map(bool_to_i64),
459 s.repo_binding_source.clone().unwrap_or_default(),
460 ],
461 )?;
462 Ok(())
463 }
464
465 pub fn ensure_session_stub(
468 &self,
469 id: &str,
470 agent: &str,
471 workspace: &str,
472 started_at_ms: u64,
473 ) -> Result<()> {
474 self.conn.execute(
475 "INSERT OR IGNORE INTO sessions (
476 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
477 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
478 prompt_fingerprint, parent_session_id, agent_version, os, arch, repo_file_count, repo_total_loc
479 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '',
480 NULL, NULL, NULL, NULL, NULL, NULL, NULL)",
481 params![id, agent, workspace, started_at_ms as i64],
482 )?;
483 Ok(())
484 }
485
486 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
488 let n: i64 = self.conn.query_row(
489 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
490 [session_id],
491 |r| r.get(0),
492 )?;
493 Ok(n as u64)
494 }
495
    /// Stores `e` and refreshes derived indexes without enqueuing anything for sync.
    ///
    /// Equivalent to `append_event_with_sync(e, None)`.
    pub fn append_event(&self, e: &Event) -> Result<()> {
        self.append_event_with_sync(e, None)
    }
499
    /// Stores `e` (insert, or overwrite on a matching `(session_id, seq)`), refreshes derived
    /// data, and, when `ctx` carries a complete sync configuration, queues a redacted copy for
    /// upload.
    ///
    /// Steps, in order:
    /// 1. Upsert the event row.
    /// 2. If SQLite reports no changed rows, stop.
    /// 3. Re-index derived data (`index_event_derived`) and rebuild the session's tool spans.
    /// 4. Stop if there is no `ctx`, or if its endpoint, team token or team id is empty.
    /// 5. Stop with a warning if no usable team salt is configured.
    /// 6. If `sample_rate < 1.0`, keep the event only when a random draw does not exceed it.
    /// 7. Stop with a warning if the owning session is not in the DB.
    /// 8. Build the outbound event, redact its payload, append it to `sync_outbox` with kind
    ///    `events`, then re-enqueue the session's tool spans.
    ///
    /// # Errors
    /// Fails on payload/outbound serialization errors or any failing SQL statement. The skipped
    /// sync steps are not errors. The event write and the outbox write are separate statements
    /// and are not wrapped in a transaction here.
    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
        let payload = serde_json::to_string(&e.payload)?;
        self.conn.execute(
            "INSERT INTO events (
                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
                stop_reason, latency_ms, ttft_ms, retry_count,
                context_used_tokens, context_max_tokens,
                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
            )
            ON CONFLICT(session_id, seq) DO UPDATE SET
                ts_ms = excluded.ts_ms,
                ts_exact = excluded.ts_exact,
                kind = excluded.kind,
                source = excluded.source,
                tool = excluded.tool,
                tool_call_id = excluded.tool_call_id,
                tokens_in = excluded.tokens_in,
                tokens_out = excluded.tokens_out,
                reasoning_tokens = excluded.reasoning_tokens,
                cost_usd_e6 = excluded.cost_usd_e6,
                payload = excluded.payload,
                stop_reason = excluded.stop_reason,
                latency_ms = excluded.latency_ms,
                ttft_ms = excluded.ttft_ms,
                retry_count = excluded.retry_count,
                context_used_tokens = excluded.context_used_tokens,
                context_max_tokens = excluded.context_max_tokens,
                cache_creation_tokens = excluded.cache_creation_tokens,
                cache_read_tokens = excluded.cache_read_tokens,
                system_prompt_tokens = excluded.system_prompt_tokens",
            params![
                e.session_id,
                e.seq as i64,
                e.ts_ms as i64,
                bool_to_i64(e.ts_exact),
                format!("{:?}", e.kind),
                format!("{:?}", e.source),
                e.tool,
                e.tool_call_id,
                e.tokens_in.map(|v| v as i64),
                e.tokens_out.map(|v| v as i64),
                e.reasoning_tokens.map(|v| v as i64),
                e.cost_usd_e6,
                payload,
                e.stop_reason,
                e.latency_ms.map(|v| v as i64),
                e.ttft_ms.map(|v| v as i64),
                e.retry_count.map(|v| v as i64),
                e.context_used_tokens.map(|v| v as i64),
                e.context_max_tokens.map(|v| v as i64),
                e.cache_creation_tokens.map(|v| v as i64),
                e.cache_read_tokens.map(|v| v as i64),
                e.system_prompt_tokens.map(|v| v as i64),
            ],
        )?;
        // NOTE(review): the upsert's DO UPDATE has no WHERE clause, and SQLite appears to count
        // the update branch as a change. If so, this early return rarely or never fires, and a
        // re-appended duplicate event is re-indexed and re-enqueued. Confirm this is intended.
        if self.conn.changes() == 0 {
            return Ok(());
        }
        index_event_derived(&self.conn, e)?;
        // Rebuilds spans for the whole session, not just this event.
        rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
        let Some(ctx) = ctx else {
            return Ok(());
        };
        let sync = &ctx.sync;
        // Sync is considered disabled unless endpoint, token and team id are all configured.
        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
            return Ok(());
        }
        let Some(salt) = try_team_salt(sync) else {
            tracing::warn!(
                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
            );
            return Ok(());
        };
        // Probabilistic sampling: the event is dropped when the draw exceeds `sample_rate`.
        if sync.sample_rate < 1.0 {
            let u: f64 = rand::random();
            if u > sync.sample_rate {
                return Ok(());
            }
        }
        let Some(session) = self.get_session(&e.session_id)? else {
            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
            return Ok(());
        };
        let mut outbound = outbound_event_from_row(e, &session, &salt);
        // Redaction runs on the outbound copy only; the stored event keeps its original payload.
        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
        let row = serde_json::to_string(&outbound)?;
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
            params![e.session_id, row],
        )?;
        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
        Ok(())
    }
598
599 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
600 let mut stmt = self.conn.prepare(
601 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
602 )?;
603 let rows = stmt.query_map(params![limit as i64], |row| {
604 Ok((
605 row.get::<_, i64>(0)?,
606 row.get::<_, String>(1)?,
607 row.get::<_, String>(2)?,
608 ))
609 })?;
610 let mut out = Vec::new();
611 for r in rows {
612 out.push(r?);
613 }
614 Ok(out)
615 }
616
617 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
618 for id in ids {
619 self.conn
620 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
621 }
622 Ok(())
623 }
624
625 pub fn replace_outbox_rows(
626 &self,
627 owner_id: &str,
628 kind: &str,
629 payloads: &[String],
630 ) -> Result<()> {
631 self.conn.execute(
632 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
633 params![owner_id, kind],
634 )?;
635 for payload in payloads {
636 self.conn.execute(
637 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
638 params![owner_id, kind, payload],
639 )?;
640 }
641 Ok(())
642 }
643
644 pub fn outbox_pending_count(&self) -> Result<u64> {
645 let c: i64 =
646 self.conn
647 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
648 r.get(0)
649 })?;
650 Ok(c as u64)
651 }
652
653 pub fn set_sync_state_ok(&self) -> Result<()> {
654 let now = now_ms().to_string();
655 self.conn.execute(
656 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
657 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
658 params![now],
659 )?;
660 self.conn.execute(
661 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
662 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
663 [],
664 )?;
665 self.conn
666 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
667 Ok(())
668 }
669
670 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
671 let prev: i64 = self
672 .conn
673 .query_row(
674 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
675 [],
676 |r| {
677 let s: String = r.get(0)?;
678 Ok(s.parse::<i64>().unwrap_or(0))
679 },
680 )
681 .optional()?
682 .unwrap_or(0);
683 let next = prev.saturating_add(1);
684 self.conn.execute(
685 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
686 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
687 params![msg],
688 )?;
689 self.conn.execute(
690 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
691 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
692 params![next.to_string()],
693 )?;
694 Ok(())
695 }
696
697 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
698 let pending_outbox = self.outbox_pending_count()?;
699 let last_success_ms = self
700 .conn
701 .query_row(
702 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
703 [],
704 |r| r.get::<_, String>(0),
705 )
706 .optional()?
707 .and_then(|s| s.parse().ok());
708 let last_error = self
709 .conn
710 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
711 r.get::<_, String>(0)
712 })
713 .optional()?;
714 let consecutive_failures = self
715 .conn
716 .query_row(
717 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
718 [],
719 |r| r.get::<_, String>(0),
720 )
721 .optional()?
722 .and_then(|s| s.parse().ok())
723 .unwrap_or(0);
724 Ok(SyncStatusSnapshot {
725 pending_outbox,
726 last_success_ms,
727 last_error,
728 consecutive_failures,
729 })
730 }
731
732 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
733 let row: Option<String> = self
734 .conn
735 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
736 r.get::<_, String>(0)
737 })
738 .optional()?;
739 Ok(row.and_then(|s| s.parse().ok()))
740 }
741
742 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
743 self.conn.execute(
744 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
745 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
746 params![key, v.to_string()],
747 )?;
748 Ok(())
749 }
750
751 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
753 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
754 let sessions_to_remove: i64 = tx.query_row(
755 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
756 params![cutoff_ms],
757 |r| r.get(0),
758 )?;
759 let events_to_remove: i64 = tx.query_row(
760 "SELECT COUNT(*) FROM events WHERE session_id IN \
761 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
762 params![cutoff_ms],
763 |r| r.get(0),
764 )?;
765
766 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
767 tx.execute(
768 &format!(
769 "DELETE FROM tool_span_paths WHERE span_id IN \
770 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
771 ),
772 params![cutoff_ms],
773 )?;
774 tx.execute(
775 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
776 params![cutoff_ms],
777 )?;
778 tx.execute(
779 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
780 params![cutoff_ms],
781 )?;
782 tx.execute(
783 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
784 params![cutoff_ms],
785 )?;
786 tx.execute(
787 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
788 params![cutoff_ms],
789 )?;
790 tx.execute(
791 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
792 params![cutoff_ms],
793 )?;
794 tx.execute(
795 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
796 params![cutoff_ms],
797 )?;
798 tx.execute(
799 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
800 params![cutoff_ms],
801 )?;
802 tx.execute(
803 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
804 params![cutoff_ms],
805 )?;
806 tx.execute(
807 &format!("DELETE FROM session_outcomes WHERE session_id IN ({sub_old_sessions})"),
808 params![cutoff_ms],
809 )?;
810 tx.execute(
811 &format!("DELETE FROM session_samples WHERE session_id IN ({sub_old_sessions})"),
812 params![cutoff_ms],
813 )?;
814 tx.execute(
815 "DELETE FROM sessions WHERE started_at_ms < ?1",
816 params![cutoff_ms],
817 )?;
818 tx.commit()?;
819 Ok(PruneStats {
820 sessions_removed: sessions_to_remove as u64,
821 events_removed: events_to_remove as u64,
822 })
823 }
824
825 pub fn vacuum(&self) -> Result<()> {
827 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
828 Ok(())
829 }
830
831 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
832 let mut stmt = self.conn.prepare(
833 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
834 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
835 prompt_fingerprint, parent_session_id, agent_version, os, arch,
836 repo_file_count, repo_total_loc
837 FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
838 )?;
839 let rows = stmt.query_map(params![workspace], |row| {
840 Ok((
841 row.get::<_, String>(0)?,
842 row.get::<_, String>(1)?,
843 row.get::<_, Option<String>>(2)?,
844 row.get::<_, String>(3)?,
845 row.get::<_, i64>(4)?,
846 row.get::<_, Option<i64>>(5)?,
847 row.get::<_, String>(6)?,
848 row.get::<_, String>(7)?,
849 row.get::<_, Option<String>>(8)?,
850 row.get::<_, Option<String>>(9)?,
851 row.get::<_, Option<String>>(10)?,
852 row.get::<_, Option<i64>>(11)?,
853 row.get::<_, Option<i64>>(12)?,
854 row.get::<_, String>(13)?,
855 row.get::<_, Option<String>>(14)?,
856 row.get::<_, Option<String>>(15)?,
857 row.get::<_, Option<String>>(16)?,
858 row.get::<_, Option<String>>(17)?,
859 row.get::<_, Option<String>>(18)?,
860 row.get::<_, Option<i64>>(19)?,
861 row.get::<_, Option<i64>>(20)?,
862 ))
863 })?;
864
865 let mut out = Vec::new();
866 for row in rows {
867 let (
868 id,
869 agent,
870 model,
871 workspace,
872 started,
873 ended,
874 status_str,
875 trace,
876 start_commit,
877 end_commit,
878 branch,
879 dirty_start,
880 dirty_end,
881 source,
882 prompt_fingerprint,
883 parent_session_id,
884 agent_version,
885 os,
886 arch,
887 repo_file_count,
888 repo_total_loc,
889 ) = row?;
890 out.push(SessionRecord {
891 id,
892 agent,
893 model,
894 workspace,
895 started_at_ms: started as u64,
896 ended_at_ms: ended.map(|v| v as u64),
897 status: status_from_str(&status_str),
898 trace_path: trace,
899 start_commit,
900 end_commit,
901 branch,
902 dirty_start: dirty_start.map(i64_to_bool),
903 dirty_end: dirty_end.map(i64_to_bool),
904 repo_binding_source: empty_to_none(source),
905 prompt_fingerprint,
906 parent_session_id,
907 agent_version,
908 os,
909 arch,
910 repo_file_count: repo_file_count.map(|v| v as u32),
911 repo_total_loc: repo_total_loc.map(|v| v as u64),
912 });
913 }
914 Ok(out)
915 }
916
917 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
918 let session_count: i64 = self.conn.query_row(
919 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
920 params![workspace],
921 |r| r.get(0),
922 )?;
923
924 let total_cost: i64 = self.conn.query_row(
925 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
926 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
927 params![workspace],
928 |r| r.get(0),
929 )?;
930
931 let mut stmt = self.conn.prepare(
932 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
933 )?;
934 let by_agent: Vec<(String, u64)> = stmt
935 .query_map(params![workspace], |r| {
936 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
937 })?
938 .filter_map(|r| r.ok())
939 .collect();
940
941 let mut stmt = self.conn.prepare(
942 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
943 )?;
944 let by_model: Vec<(String, u64)> = stmt
945 .query_map(params![workspace], |r| {
946 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
947 })?
948 .filter_map(|r| r.ok())
949 .collect();
950
951 let mut stmt = self.conn.prepare(
952 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
953 WHERE s.workspace = ?1 AND tool IS NOT NULL
954 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
955 )?;
956 let top_tools: Vec<(String, u64)> = stmt
957 .query_map(params![workspace], |r| {
958 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
959 })?
960 .filter_map(|r| r.ok())
961 .collect();
962
963 Ok(SummaryStats {
964 session_count: session_count as u64,
965 total_cost_usd_e6: total_cost,
966 by_agent,
967 by_model,
968 top_tools,
969 })
970 }
971
972 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
973 let mut stmt = self.conn.prepare(
974 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
975 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
976 stop_reason, latency_ms, ttft_ms, retry_count,
977 context_used_tokens, context_max_tokens,
978 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
979 FROM events WHERE session_id = ?1 ORDER BY seq ASC",
980 )?;
981 let rows = stmt.query_map(params![session_id], |row| {
982 let payload_str: String = row.get(12)?;
983 Ok(Event {
984 session_id: row.get(0)?,
985 seq: row.get::<_, i64>(1)? as u64,
986 ts_ms: row.get::<_, i64>(2)? as u64,
987 ts_exact: row.get::<_, i64>(3)? != 0,
988 kind: kind_from_str(&row.get::<_, String>(4)?),
989 source: source_from_str(&row.get::<_, String>(5)?),
990 tool: row.get(6)?,
991 tool_call_id: row.get(7)?,
992 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
993 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
994 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
995 cost_usd_e6: row.get(11)?,
996 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
997 stop_reason: row.get(13)?,
998 latency_ms: row.get::<_, Option<i64>>(14)?.map(|v| v as u32),
999 ttft_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u32),
1000 retry_count: row.get::<_, Option<i64>>(16)?.map(|v| v as u16),
1001 context_used_tokens: row.get::<_, Option<i64>>(17)?.map(|v| v as u32),
1002 context_max_tokens: row.get::<_, Option<i64>>(18)?.map(|v| v as u32),
1003 cache_creation_tokens: row.get::<_, Option<i64>>(19)?.map(|v| v as u32),
1004 cache_read_tokens: row.get::<_, Option<i64>>(20)?.map(|v| v as u32),
1005 system_prompt_tokens: row.get::<_, Option<i64>>(21)?.map(|v| v as u32),
1006 })
1007 })?;
1008
1009 let mut events = Vec::new();
1010 for row in rows {
1011 events.push(row?);
1012 }
1013 Ok(events)
1014 }
1015
1016 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
1018 self.conn.execute(
1019 "UPDATE sessions SET status = ?1 WHERE id = ?2",
1020 params![format!("{:?}", status), id],
1021 )?;
1022 Ok(())
1023 }
1024
1025 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
1027 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
1028 Ok(InsightsStats {
1029 total_sessions: count_q(
1030 &self.conn,
1031 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
1032 workspace,
1033 )?,
1034 running_sessions: count_q(
1035 &self.conn,
1036 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
1037 workspace,
1038 )?,
1039 total_events: count_q(
1040 &self.conn,
1041 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1042 workspace,
1043 )?,
1044 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
1045 recent: recent_sessions_3(&self.conn, workspace)?,
1046 top_tools: top_tools_5(&self.conn, workspace)?,
1047 total_cost_usd_e6,
1048 sessions_with_cost,
1049 })
1050 }
1051
1052 pub fn retro_events_in_window(
1054 &self,
1055 workspace: &str,
1056 start_ms: u64,
1057 end_ms: u64,
1058 ) -> Result<Vec<(SessionRecord, Event)>> {
1059 let mut stmt = self.conn.prepare(
1060 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
1061 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
1062 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
1063 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source,
1064 s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch,
1065 s.repo_file_count, s.repo_total_loc,
1066 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
1067 e.context_used_tokens, e.context_max_tokens,
1068 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
1069 FROM events e
1070 JOIN sessions s ON s.id = e.session_id
1071 WHERE s.workspace = ?1
1072 AND (
1073 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1074 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1075 )
1076 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
1077 )?;
1078 let rows = stmt.query_map(
1079 params![
1080 workspace,
1081 start_ms as i64,
1082 end_ms as i64,
1083 SYNTHETIC_TS_CEILING_MS,
1084 ],
1085 |row| {
1086 let payload_str: String = row.get(12)?;
1087 let status_str: String = row.get(19)?;
1088 Ok((
1089 SessionRecord {
1090 id: row.get(13)?,
1091 agent: row.get(14)?,
1092 model: row.get(15)?,
1093 workspace: row.get(16)?,
1094 started_at_ms: row.get::<_, i64>(17)? as u64,
1095 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
1096 status: status_from_str(&status_str),
1097 trace_path: row.get(20)?,
1098 start_commit: row.get(21)?,
1099 end_commit: row.get(22)?,
1100 branch: row.get(23)?,
1101 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1102 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1103 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1104 prompt_fingerprint: row.get(27)?,
1105 parent_session_id: row.get(28)?,
1106 agent_version: row.get(29)?,
1107 os: row.get(30)?,
1108 arch: row.get(31)?,
1109 repo_file_count: row.get::<_, Option<i64>>(32)?.map(|v| v as u32),
1110 repo_total_loc: row.get::<_, Option<i64>>(33)?.map(|v| v as u64),
1111 },
1112 Event {
1113 session_id: row.get(0)?,
1114 seq: row.get::<_, i64>(1)? as u64,
1115 ts_ms: row.get::<_, i64>(2)? as u64,
1116 ts_exact: row.get::<_, i64>(3)? != 0,
1117 kind: kind_from_str(&row.get::<_, String>(4)?),
1118 source: source_from_str(&row.get::<_, String>(5)?),
1119 tool: row.get(6)?,
1120 tool_call_id: row.get(7)?,
1121 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1122 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1123 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1124 cost_usd_e6: row.get(11)?,
1125 payload: serde_json::from_str(&payload_str)
1126 .unwrap_or(serde_json::Value::Null),
1127 stop_reason: row.get(34)?,
1128 latency_ms: row.get::<_, Option<i64>>(35)?.map(|v| v as u32),
1129 ttft_ms: row.get::<_, Option<i64>>(36)?.map(|v| v as u32),
1130 retry_count: row.get::<_, Option<i64>>(37)?.map(|v| v as u16),
1131 context_used_tokens: row.get::<_, Option<i64>>(38)?.map(|v| v as u32),
1132 context_max_tokens: row.get::<_, Option<i64>>(39)?.map(|v| v as u32),
1133 cache_creation_tokens: row.get::<_, Option<i64>>(40)?.map(|v| v as u32),
1134 cache_read_tokens: row.get::<_, Option<i64>>(41)?.map(|v| v as u32),
1135 system_prompt_tokens: row.get::<_, Option<i64>>(42)?.map(|v| v as u32),
1136 },
1137 ))
1138 },
1139 )?;
1140
1141 let mut out = Vec::new();
1142 for r in rows {
1143 out.push(r?);
1144 }
1145 Ok(out)
1146 }
1147
1148 pub fn files_touched_in_window(
1150 &self,
1151 workspace: &str,
1152 start_ms: u64,
1153 end_ms: u64,
1154 ) -> Result<Vec<(String, String)>> {
1155 let mut stmt = self.conn.prepare(
1156 "SELECT DISTINCT ft.session_id, ft.path
1157 FROM files_touched ft
1158 JOIN sessions s ON s.id = ft.session_id
1159 WHERE s.workspace = ?1
1160 AND EXISTS (
1161 SELECT 1 FROM events e
1162 JOIN sessions ss ON ss.id = e.session_id
1163 WHERE e.session_id = ft.session_id
1164 AND (
1165 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1166 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1167 )
1168 )
1169 ORDER BY ft.session_id, ft.path",
1170 )?;
1171 let out: Vec<(String, String)> = stmt
1172 .query_map(
1173 params![
1174 workspace,
1175 start_ms as i64,
1176 end_ms as i64,
1177 SYNTHETIC_TS_CEILING_MS,
1178 ],
1179 |r| Ok((r.get(0)?, r.get(1)?)),
1180 )?
1181 .filter_map(|r| r.ok())
1182 .collect();
1183 Ok(out)
1184 }
1185
1186 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1189 let mut stmt = self.conn.prepare(
1190 "SELECT DISTINCT su.skill
1191 FROM skills_used su
1192 JOIN sessions s ON s.id = su.session_id
1193 WHERE s.workspace = ?1
1194 AND EXISTS (
1195 SELECT 1 FROM events e
1196 JOIN sessions ss ON ss.id = e.session_id
1197 WHERE e.session_id = su.session_id
1198 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1199 )
1200 ORDER BY su.skill",
1201 )?;
1202 let out: Vec<String> = stmt
1203 .query_map(
1204 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1205 |r| r.get::<_, String>(0),
1206 )?
1207 .filter_map(|r| r.ok())
1208 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1209 .collect();
1210 Ok(out)
1211 }
1212
1213 pub fn skills_used_in_window(
1215 &self,
1216 workspace: &str,
1217 start_ms: u64,
1218 end_ms: u64,
1219 ) -> Result<Vec<(String, String)>> {
1220 let mut stmt = self.conn.prepare(
1221 "SELECT DISTINCT su.session_id, su.skill
1222 FROM skills_used su
1223 JOIN sessions s ON s.id = su.session_id
1224 WHERE s.workspace = ?1
1225 AND EXISTS (
1226 SELECT 1 FROM events e
1227 JOIN sessions ss ON ss.id = e.session_id
1228 WHERE e.session_id = su.session_id
1229 AND (
1230 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1231 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1232 )
1233 )
1234 ORDER BY su.session_id, su.skill",
1235 )?;
1236 let out: Vec<(String, String)> = stmt
1237 .query_map(
1238 params![
1239 workspace,
1240 start_ms as i64,
1241 end_ms as i64,
1242 SYNTHETIC_TS_CEILING_MS,
1243 ],
1244 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1245 )?
1246 .filter_map(|r| r.ok())
1247 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1248 .collect();
1249 Ok(out)
1250 }
1251
1252 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1254 let mut stmt = self.conn.prepare(
1255 "SELECT DISTINCT ru.rule
1256 FROM rules_used ru
1257 JOIN sessions s ON s.id = ru.session_id
1258 WHERE s.workspace = ?1
1259 AND EXISTS (
1260 SELECT 1 FROM events e
1261 JOIN sessions ss ON ss.id = e.session_id
1262 WHERE e.session_id = ru.session_id
1263 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1264 )
1265 ORDER BY ru.rule",
1266 )?;
1267 let out: Vec<String> = stmt
1268 .query_map(
1269 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1270 |r| r.get::<_, String>(0),
1271 )?
1272 .filter_map(|r| r.ok())
1273 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1274 .collect();
1275 Ok(out)
1276 }
1277
1278 pub fn rules_used_in_window(
1280 &self,
1281 workspace: &str,
1282 start_ms: u64,
1283 end_ms: u64,
1284 ) -> Result<Vec<(String, String)>> {
1285 let mut stmt = self.conn.prepare(
1286 "SELECT DISTINCT ru.session_id, ru.rule
1287 FROM rules_used ru
1288 JOIN sessions s ON s.id = ru.session_id
1289 WHERE s.workspace = ?1
1290 AND EXISTS (
1291 SELECT 1 FROM events e
1292 JOIN sessions ss ON ss.id = e.session_id
1293 WHERE e.session_id = ru.session_id
1294 AND (
1295 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1296 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1297 )
1298 )
1299 ORDER BY ru.session_id, ru.rule",
1300 )?;
1301 let out: Vec<(String, String)> = stmt
1302 .query_map(
1303 params![
1304 workspace,
1305 start_ms as i64,
1306 end_ms as i64,
1307 SYNTHETIC_TS_CEILING_MS,
1308 ],
1309 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1310 )?
1311 .filter_map(|r| r.ok())
1312 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1313 .collect();
1314 Ok(out)
1315 }
1316
1317 pub fn sessions_active_in_window(
1319 &self,
1320 workspace: &str,
1321 start_ms: u64,
1322 end_ms: u64,
1323 ) -> Result<HashSet<String>> {
1324 let mut stmt = self.conn.prepare(
1325 "SELECT DISTINCT s.id
1326 FROM sessions s
1327 WHERE s.workspace = ?1
1328 AND EXISTS (
1329 SELECT 1 FROM events e
1330 WHERE e.session_id = s.id
1331 AND (
1332 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1333 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1334 )
1335 )",
1336 )?;
1337 let out: HashSet<String> = stmt
1338 .query_map(
1339 params![
1340 workspace,
1341 start_ms as i64,
1342 end_ms as i64,
1343 SYNTHETIC_TS_CEILING_MS,
1344 ],
1345 |r| r.get(0),
1346 )?
1347 .filter_map(|r| r.ok())
1348 .collect();
1349 Ok(out)
1350 }
1351
1352 pub fn session_costs_usd_e6_in_window(
1354 &self,
1355 workspace: &str,
1356 start_ms: u64,
1357 end_ms: u64,
1358 ) -> Result<HashMap<String, i64>> {
1359 let mut stmt = self.conn.prepare(
1360 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1361 FROM events e
1362 JOIN sessions s ON s.id = e.session_id
1363 WHERE s.workspace = ?1
1364 AND (
1365 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1366 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1367 )
1368 GROUP BY e.session_id",
1369 )?;
1370 let rows: Vec<(String, i64)> = stmt
1371 .query_map(
1372 params![
1373 workspace,
1374 start_ms as i64,
1375 end_ms as i64,
1376 SYNTHETIC_TS_CEILING_MS,
1377 ],
1378 |r| Ok((r.get(0)?, r.get(1)?)),
1379 )?
1380 .filter_map(|r| r.ok())
1381 .collect();
1382 Ok(rows.into_iter().collect())
1383 }
1384
1385 pub fn guidance_report(
1387 &self,
1388 workspace: &str,
1389 window_start_ms: u64,
1390 window_end_ms: u64,
1391 skill_slugs_on_disk: &HashSet<String>,
1392 rule_slugs_on_disk: &HashSet<String>,
1393 ) -> Result<GuidanceReport> {
1394 let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
1395 let denom = active.len() as u64;
1396 let costs =
1397 self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;
1398
1399 let workspace_avg_cost_per_session_usd = if denom > 0 {
1400 let total_e6: i64 = active
1401 .iter()
1402 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1403 .sum();
1404 Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
1405 } else {
1406 None
1407 };
1408
1409 let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1410 for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
1411 skill_sessions.entry(skill).or_default().insert(sid);
1412 }
1413 let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
1414 for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
1415 rule_sessions.entry(rule).or_default().insert(sid);
1416 }
1417
1418 let mut rows: Vec<GuidancePerfRow> = Vec::new();
1419
1420 let mut push_row =
1421 |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
1422 let sessions = sids.len() as u64;
1423 let sessions_pct = if denom > 0 {
1424 sessions as f64 * 100.0 / denom as f64
1425 } else {
1426 0.0
1427 };
1428 let total_cost_usd_e6: i64 = sids
1429 .iter()
1430 .map(|sid| costs.get(sid).copied().unwrap_or(0))
1431 .sum();
1432 let avg_cost_per_session_usd = if sessions > 0 {
1433 Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
1434 } else {
1435 None
1436 };
1437 let vs_workspace_avg_cost_per_session_usd =
1438 match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
1439 (Some(avg), Some(w)) => Some(avg - w),
1440 _ => None,
1441 };
1442 rows.push(GuidancePerfRow {
1443 kind,
1444 id,
1445 sessions,
1446 sessions_pct,
1447 total_cost_usd_e6,
1448 avg_cost_per_session_usd,
1449 vs_workspace_avg_cost_per_session_usd,
1450 on_disk,
1451 });
1452 };
1453
1454 let mut seen_skills: HashSet<String> = HashSet::new();
1455 for (id, sids) in &skill_sessions {
1456 seen_skills.insert(id.clone());
1457 push_row(
1458 GuidanceKind::Skill,
1459 id.clone(),
1460 sids,
1461 skill_slugs_on_disk.contains(id),
1462 );
1463 }
1464 for slug in skill_slugs_on_disk {
1465 if seen_skills.contains(slug) {
1466 continue;
1467 }
1468 push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
1469 }
1470
1471 let mut seen_rules: HashSet<String> = HashSet::new();
1472 for (id, sids) in &rule_sessions {
1473 seen_rules.insert(id.clone());
1474 push_row(
1475 GuidanceKind::Rule,
1476 id.clone(),
1477 sids,
1478 rule_slugs_on_disk.contains(id),
1479 );
1480 }
1481 for slug in rule_slugs_on_disk {
1482 if seen_rules.contains(slug) {
1483 continue;
1484 }
1485 push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
1486 }
1487
1488 rows.sort_by(|a, b| {
1489 b.sessions
1490 .cmp(&a.sessions)
1491 .then_with(|| a.kind.cmp(&b.kind))
1492 .then_with(|| a.id.cmp(&b.id))
1493 });
1494
1495 Ok(GuidanceReport {
1496 workspace: workspace.to_string(),
1497 window_start_ms,
1498 window_end_ms,
1499 sessions_in_window: denom,
1500 workspace_avg_cost_per_session_usd,
1501 rows,
1502 })
1503 }
1504
1505 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1506 let mut stmt = self.conn.prepare(
1507 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1508 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1509 prompt_fingerprint, parent_session_id, agent_version, os, arch,
1510 repo_file_count, repo_total_loc
1511 FROM sessions WHERE id = ?1",
1512 )?;
1513 let mut rows = stmt.query_map(params![id], |row| {
1514 Ok((
1515 row.get::<_, String>(0)?,
1516 row.get::<_, String>(1)?,
1517 row.get::<_, Option<String>>(2)?,
1518 row.get::<_, String>(3)?,
1519 row.get::<_, i64>(4)?,
1520 row.get::<_, Option<i64>>(5)?,
1521 row.get::<_, String>(6)?,
1522 row.get::<_, String>(7)?,
1523 row.get::<_, Option<String>>(8)?,
1524 row.get::<_, Option<String>>(9)?,
1525 row.get::<_, Option<String>>(10)?,
1526 row.get::<_, Option<i64>>(11)?,
1527 row.get::<_, Option<i64>>(12)?,
1528 row.get::<_, String>(13)?,
1529 row.get::<_, Option<String>>(14)?,
1530 row.get::<_, Option<String>>(15)?,
1531 row.get::<_, Option<String>>(16)?,
1532 row.get::<_, Option<String>>(17)?,
1533 row.get::<_, Option<String>>(18)?,
1534 row.get::<_, Option<i64>>(19)?,
1535 row.get::<_, Option<i64>>(20)?,
1536 ))
1537 })?;
1538
1539 if let Some(row) = rows.next() {
1540 let (
1541 id,
1542 agent,
1543 model,
1544 workspace,
1545 started,
1546 ended,
1547 status_str,
1548 trace,
1549 start_commit,
1550 end_commit,
1551 branch,
1552 dirty_start,
1553 dirty_end,
1554 source,
1555 prompt_fingerprint,
1556 parent_session_id,
1557 agent_version,
1558 os,
1559 arch,
1560 repo_file_count,
1561 repo_total_loc,
1562 ) = row?;
1563 Ok(Some(SessionRecord {
1564 id,
1565 agent,
1566 model,
1567 workspace,
1568 started_at_ms: started as u64,
1569 ended_at_ms: ended.map(|v| v as u64),
1570 status: status_from_str(&status_str),
1571 trace_path: trace,
1572 start_commit,
1573 end_commit,
1574 branch,
1575 dirty_start: dirty_start.map(i64_to_bool),
1576 dirty_end: dirty_end.map(i64_to_bool),
1577 repo_binding_source: empty_to_none(source),
1578 prompt_fingerprint,
1579 parent_session_id,
1580 agent_version,
1581 os,
1582 arch,
1583 repo_file_count: repo_file_count.map(|v| v as u32),
1584 repo_total_loc: repo_total_loc.map(|v| v as u64),
1585 }))
1586 } else {
1587 Ok(None)
1588 }
1589 }
1590
1591 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1592 let mut stmt = self.conn.prepare(
1593 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1594 indexed_at_ms, dirty, graph_path
1595 FROM repo_snapshots WHERE workspace = ?1
1596 ORDER BY indexed_at_ms DESC LIMIT 1",
1597 )?;
1598 let mut rows = stmt.query_map(params![workspace], |row| {
1599 Ok(RepoSnapshotRecord {
1600 id: row.get(0)?,
1601 workspace: row.get(1)?,
1602 head_commit: row.get(2)?,
1603 dirty_fingerprint: row.get(3)?,
1604 analyzer_version: row.get(4)?,
1605 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1606 dirty: row.get::<_, i64>(6)? != 0,
1607 graph_path: row.get(7)?,
1608 })
1609 })?;
1610 Ok(rows.next().transpose()?)
1611 }
1612
1613 pub fn save_repo_snapshot(
1614 &self,
1615 snapshot: &RepoSnapshotRecord,
1616 facts: &[FileFact],
1617 edges: &[RepoEdge],
1618 ) -> Result<()> {
1619 self.conn.execute(
1620 "INSERT INTO repo_snapshots (
1621 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1622 indexed_at_ms, dirty, graph_path
1623 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1624 ON CONFLICT(id) DO UPDATE SET
1625 workspace=excluded.workspace,
1626 head_commit=excluded.head_commit,
1627 dirty_fingerprint=excluded.dirty_fingerprint,
1628 analyzer_version=excluded.analyzer_version,
1629 indexed_at_ms=excluded.indexed_at_ms,
1630 dirty=excluded.dirty,
1631 graph_path=excluded.graph_path",
1632 params![
1633 snapshot.id,
1634 snapshot.workspace,
1635 snapshot.head_commit,
1636 snapshot.dirty_fingerprint,
1637 snapshot.analyzer_version,
1638 snapshot.indexed_at_ms as i64,
1639 bool_to_i64(snapshot.dirty),
1640 snapshot.graph_path,
1641 ],
1642 )?;
1643 self.conn.execute(
1644 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1645 params![snapshot.id],
1646 )?;
1647 self.conn.execute(
1648 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1649 params![snapshot.id],
1650 )?;
1651 for fact in facts {
1652 self.conn.execute(
1653 "INSERT INTO file_facts (
1654 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1655 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1656 churn_30d, churn_90d, authors_90d, last_changed_ms
1657 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1658 params![
1659 fact.snapshot_id,
1660 fact.path,
1661 fact.language,
1662 fact.bytes as i64,
1663 fact.loc as i64,
1664 fact.sloc as i64,
1665 fact.complexity_total as i64,
1666 fact.max_fn_complexity as i64,
1667 fact.symbol_count as i64,
1668 fact.import_count as i64,
1669 fact.fan_in as i64,
1670 fact.fan_out as i64,
1671 fact.churn_30d as i64,
1672 fact.churn_90d as i64,
1673 fact.authors_90d as i64,
1674 fact.last_changed_ms.map(|v| v as i64),
1675 ],
1676 )?;
1677 }
1678 for edge in edges {
1679 self.conn.execute(
1680 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1681 VALUES (?1, ?2, ?3, ?4, ?5)
1682 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1683 DO UPDATE SET weight = weight + excluded.weight",
1684 params![
1685 snapshot.id,
1686 edge.from_path,
1687 edge.to_path,
1688 edge.kind,
1689 edge.weight as i64,
1690 ],
1691 )?;
1692 }
1693 Ok(())
1694 }
1695
1696 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1697 let mut stmt = self.conn.prepare(
1698 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1699 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1700 churn_30d, churn_90d, authors_90d, last_changed_ms
1701 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1702 )?;
1703 let rows = stmt.query_map(params![snapshot_id], |row| {
1704 Ok(FileFact {
1705 snapshot_id: row.get(0)?,
1706 path: row.get(1)?,
1707 language: row.get(2)?,
1708 bytes: row.get::<_, i64>(3)? as u64,
1709 loc: row.get::<_, i64>(4)? as u32,
1710 sloc: row.get::<_, i64>(5)? as u32,
1711 complexity_total: row.get::<_, i64>(6)? as u32,
1712 max_fn_complexity: row.get::<_, i64>(7)? as u32,
1713 symbol_count: row.get::<_, i64>(8)? as u32,
1714 import_count: row.get::<_, i64>(9)? as u32,
1715 fan_in: row.get::<_, i64>(10)? as u32,
1716 fan_out: row.get::<_, i64>(11)? as u32,
1717 churn_30d: row.get::<_, i64>(12)? as u32,
1718 churn_90d: row.get::<_, i64>(13)? as u32,
1719 authors_90d: row.get::<_, i64>(14)? as u32,
1720 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1721 })
1722 })?;
1723 Ok(rows.filter_map(|row| row.ok()).collect())
1724 }
1725
1726 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1727 let mut stmt = self.conn.prepare(
1728 "SELECT from_id, to_id, kind, weight
1729 FROM repo_edges WHERE snapshot_id = ?1
1730 ORDER BY kind, from_id, to_id",
1731 )?;
1732 let rows = stmt.query_map(params![snapshot_id], |row| {
1733 Ok(RepoEdge {
1734 from_path: row.get(0)?,
1735 to_path: row.get(1)?,
1736 kind: row.get(2)?,
1737 weight: row.get::<_, i64>(3)? as u32,
1738 })
1739 })?;
1740 Ok(rows.filter_map(|row| row.ok()).collect())
1741 }
1742
1743 pub fn tool_spans_in_window(
1744 &self,
1745 workspace: &str,
1746 start_ms: u64,
1747 end_ms: u64,
1748 ) -> Result<Vec<ToolSpanView>> {
1749 let mut stmt = self.conn.prepare(
1750 "SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1751 ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
1752 ts.parent_span_id, ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count
1753 FROM tool_spans ts
1754 JOIN sessions s ON s.id = ts.session_id
1755 WHERE s.workspace = ?1
1756 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1757 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1758 ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1759 )?;
1760 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1761 let paths_json: String = row.get(8)?;
1762 Ok(ToolSpanView {
1763 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
1764 tool: row
1765 .get::<_, Option<String>>(1)?
1766 .unwrap_or_else(|| "unknown".into()),
1767 status: row.get(2)?,
1768 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
1769 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1770 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1771 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
1772 cost_usd_e6: row.get(7)?,
1773 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1774 parent_span_id: row.get(9)?,
1775 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
1776 subtree_cost_usd_e6: row.get(11)?,
1777 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
1778 })
1779 })?;
1780 Ok(rows.filter_map(|row| row.ok()).collect())
1781 }
1782
1783 pub fn session_span_tree(
1784 &self,
1785 session_id: &str,
1786 ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
1787 let mut stmt = self.conn.prepare(
1788 "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
1789 reasoning_tokens, cost_usd_e6, paths_json,
1790 parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
1791 FROM tool_spans
1792 WHERE session_id = ?1
1793 ORDER BY depth ASC, started_at_ms ASC",
1794 )?;
1795 let rows = stmt.query_map(params![session_id], |row| {
1796 let paths_json: String = row.get(8)?;
1797 Ok(crate::metrics::types::ToolSpanView {
1798 span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
1799 tool: row
1800 .get::<_, Option<String>>(1)?
1801 .unwrap_or_else(|| "unknown".into()),
1802 status: row.get(2)?,
1803 lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
1804 tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1805 tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1806 reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
1807 cost_usd_e6: row.get(7)?,
1808 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1809 parent_span_id: row.get(9)?,
1810 depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
1811 subtree_cost_usd_e6: row.get(11)?,
1812 subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
1813 })
1814 })?;
1815 let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1816 Ok(crate::store::span_tree::build_tree(spans))
1817 }
1818
1819 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1820 let mut stmt = self.conn.prepare(
1821 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1822 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1823 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1824 )?;
1825 let rows = stmt.query_map(params![session_id], |row| {
1826 let paths_json: String = row.get(12)?;
1827 Ok(ToolSpanSyncRow {
1828 span_id: row.get(0)?,
1829 session_id: row.get(1)?,
1830 tool: row.get(2)?,
1831 tool_call_id: row.get(3)?,
1832 status: row.get(4)?,
1833 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1834 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1835 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1836 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1837 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1838 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1839 cost_usd_e6: row.get(11)?,
1840 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1841 })
1842 })?;
1843 Ok(rows.filter_map(|row| row.ok()).collect())
1844 }
1845
1846 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1847 self.conn.execute(
1848 "INSERT OR REPLACE INTO session_evals
1849 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1850 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1851 rusqlite::params![
1852 eval.id,
1853 eval.session_id,
1854 eval.judge_model,
1855 eval.rubric_id,
1856 eval.score,
1857 eval.rationale,
1858 eval.flagged as i64,
1859 eval.created_at_ms as i64,
1860 ],
1861 )?;
1862 Ok(())
1863 }
1864
1865 pub fn list_evals_in_window(
1866 &self,
1867 start_ms: u64,
1868 end_ms: u64,
1869 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1870 let mut stmt = self.conn.prepare(
1871 "SELECT id, session_id, judge_model, rubric_id, score,
1872 rationale, flagged, created_at_ms
1873 FROM session_evals
1874 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1875 ORDER BY created_at_ms ASC",
1876 )?;
1877 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1878 Ok(crate::eval::types::EvalRow {
1879 id: r.get(0)?,
1880 session_id: r.get(1)?,
1881 judge_model: r.get(2)?,
1882 rubric_id: r.get(3)?,
1883 score: r.get(4)?,
1884 rationale: r.get(5)?,
1885 flagged: r.get::<_, i64>(6)? != 0,
1886 created_at_ms: r.get::<_, i64>(7)? as u64,
1887 })
1888 })?;
1889 rows.collect()
1890 }
1891
1892 pub fn list_evals_for_session(
1893 &self,
1894 session_id: &str,
1895 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1896 let mut stmt = self.conn.prepare(
1897 "SELECT id, session_id, judge_model, rubric_id, score,
1898 rationale, flagged, created_at_ms
1899 FROM session_evals
1900 WHERE session_id = ?1
1901 ORDER BY created_at_ms DESC",
1902 )?;
1903 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1904 Ok(crate::eval::types::EvalRow {
1905 id: r.get(0)?,
1906 session_id: r.get(1)?,
1907 judge_model: r.get(2)?,
1908 rubric_id: r.get(3)?,
1909 score: r.get(4)?,
1910 rationale: r.get(5)?,
1911 flagged: r.get::<_, i64>(6)? != 0,
1912 created_at_ms: r.get::<_, i64>(7)? as u64,
1913 })
1914 })?;
1915 rows.collect()
1916 }
1917
1918 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
1919 use crate::feedback::types::FeedbackLabel;
1920 self.conn.execute(
1921 "INSERT OR REPLACE INTO session_feedback
1922 (id, session_id, score, label, note, created_at_ms)
1923 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1924 rusqlite::params![
1925 r.id,
1926 r.session_id,
1927 r.score.as_ref().map(|s| s.0 as i64),
1928 r.label.as_ref().map(FeedbackLabel::to_db_str),
1929 r.note,
1930 r.created_at_ms as i64,
1931 ],
1932 )?;
1933 let payload = serde_json::to_string(r).unwrap_or_default();
1934 self.conn.execute(
1935 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
1936 VALUES (?1, 'session_feedback', ?2, 0)",
1937 rusqlite::params![r.session_id, payload],
1938 )?;
1939 Ok(())
1940 }
1941
1942 pub fn list_feedback_in_window(
1943 &self,
1944 start_ms: u64,
1945 end_ms: u64,
1946 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
1947 let mut stmt = self.conn.prepare(
1948 "SELECT id, session_id, score, label, note, created_at_ms
1949 FROM session_feedback
1950 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1951 ORDER BY created_at_ms ASC",
1952 )?;
1953 let rows = stmt.query_map(
1954 rusqlite::params![start_ms as i64, end_ms as i64],
1955 feedback_row,
1956 )?;
1957 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1958 }
1959
1960 pub fn feedback_for_sessions(
1961 &self,
1962 ids: &[String],
1963 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
1964 if ids.is_empty() {
1965 return Ok(std::collections::HashMap::new());
1966 }
1967 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1968 let sql = format!(
1969 "SELECT id, session_id, score, label, note, created_at_ms
1970 FROM session_feedback WHERE session_id IN ({placeholders})
1971 ORDER BY created_at_ms DESC"
1972 );
1973 let mut stmt = self.conn.prepare(&sql)?;
1974 let params: Vec<&dyn rusqlite::ToSql> =
1975 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1976 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
1977 let mut map = std::collections::HashMap::new();
1978 for row in rows {
1979 let r = row?;
1980 map.entry(r.session_id.clone()).or_insert(r);
1981 }
1982 Ok(map)
1983 }
1984
1985 pub fn upsert_session_outcome(&self, row: &SessionOutcomeRow) -> Result<()> {
1986 self.conn.execute(
1987 "INSERT INTO session_outcomes (
1988 session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
1989 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
1990 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
1991 ON CONFLICT(session_id) DO UPDATE SET
1992 test_passed=excluded.test_passed,
1993 test_failed=excluded.test_failed,
1994 test_skipped=excluded.test_skipped,
1995 build_ok=excluded.build_ok,
1996 lint_errors=excluded.lint_errors,
1997 revert_lines_14d=excluded.revert_lines_14d,
1998 pr_open=excluded.pr_open,
1999 ci_ok=excluded.ci_ok,
2000 measured_at_ms=excluded.measured_at_ms,
2001 measure_error=excluded.measure_error",
2002 params![
2003 row.session_id,
2004 row.test_passed,
2005 row.test_failed,
2006 row.test_skipped,
2007 row.build_ok.map(bool_to_i64),
2008 row.lint_errors,
2009 row.revert_lines_14d,
2010 row.pr_open,
2011 row.ci_ok.map(bool_to_i64),
2012 row.measured_at_ms as i64,
2013 row.measure_error.as_deref(),
2014 ],
2015 )?;
2016 Ok(())
2017 }
2018
2019 pub fn get_session_outcome(&self, session_id: &str) -> Result<Option<SessionOutcomeRow>> {
2020 let mut stmt = self.conn.prepare(
2021 "SELECT session_id, test_passed, test_failed, test_skipped, build_ok, lint_errors,
2022 revert_lines_14d, pr_open, ci_ok, measured_at_ms, measure_error
2023 FROM session_outcomes WHERE session_id = ?1",
2024 )?;
2025 let row = stmt
2026 .query_row(params![session_id], outcome_row)
2027 .optional()?;
2028 Ok(row)
2029 }
2030
2031 pub fn list_session_outcomes_in_window(
2033 &self,
2034 workspace: &str,
2035 start_ms: u64,
2036 end_ms: u64,
2037 ) -> Result<Vec<SessionOutcomeRow>> {
2038 let mut stmt = self.conn.prepare(
2039 "SELECT o.session_id, o.test_passed, o.test_failed, o.test_skipped, o.build_ok, o.lint_errors,
2040 o.revert_lines_14d, o.pr_open, o.ci_ok, o.measured_at_ms, o.measure_error
2041 FROM session_outcomes o
2042 JOIN sessions s ON s.id = o.session_id
2043 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2044 ORDER BY o.measured_at_ms ASC",
2045 )?;
2046 let rows = stmt.query_map(
2047 params![workspace, start_ms as i64, end_ms as i64],
2048 outcome_row,
2049 )?;
2050 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2051 }
2052
2053 pub fn append_session_sample(
2054 &self,
2055 session_id: &str,
2056 ts_ms: u64,
2057 pid: u32,
2058 cpu_percent: Option<f64>,
2059 rss_bytes: Option<u64>,
2060 ) -> Result<()> {
2061 self.conn.execute(
2062 "INSERT OR REPLACE INTO session_samples (session_id, ts_ms, pid, cpu_percent, rss_bytes)
2063 VALUES (?1, ?2, ?3, ?4, ?5)",
2064 params![
2065 session_id,
2066 ts_ms as i64,
2067 pid as i64,
2068 cpu_percent,
2069 rss_bytes.map(|b| b as i64)
2070 ],
2071 )?;
2072 Ok(())
2073 }
2074
2075 pub fn list_session_sample_aggs_in_window(
2077 &self,
2078 workspace: &str,
2079 start_ms: u64,
2080 end_ms: u64,
2081 ) -> Result<Vec<SessionSampleAgg>> {
2082 let mut stmt = self.conn.prepare(
2083 "SELECT ss.session_id, COUNT(*) AS n,
2084 MAX(ss.cpu_percent), MAX(ss.rss_bytes)
2085 FROM session_samples ss
2086 JOIN sessions s ON s.id = ss.session_id
2087 WHERE s.workspace = ?1 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3
2088 GROUP BY ss.session_id",
2089 )?;
2090 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2091 let sid: String = r.get(0)?;
2092 let n: i64 = r.get(1)?;
2093 let max_cpu: Option<f64> = r.get(2)?;
2094 let max_rss: Option<i64> = r.get(3)?;
2095 Ok(SessionSampleAgg {
2096 session_id: sid,
2097 sample_count: n as u64,
2098 max_cpu_percent: max_cpu.unwrap_or(0.0),
2099 max_rss_bytes: max_rss.map(|x| x as u64).unwrap_or(0),
2100 })
2101 })?;
2102 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
2103 }
2104
2105 pub fn list_sessions_for_eval(
2106 &self,
2107 since_ms: u64,
2108 min_cost_usd: f64,
2109 ) -> Result<Vec<crate::core::event::SessionRecord>> {
2110 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
2111 let mut stmt = self.conn.prepare(
2112 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
2113 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
2114 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint,
2115 s.parent_session_id, s.agent_version, s.os, s.arch, s.repo_file_count, s.repo_total_loc
2116 FROM sessions s
2117 WHERE s.started_at_ms >= ?1
2118 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
2119 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
2120 ORDER BY s.started_at_ms DESC",
2121 )?;
2122 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
2123 Ok((
2124 r.get::<_, String>(0)?,
2125 r.get::<_, String>(1)?,
2126 r.get::<_, Option<String>>(2)?,
2127 r.get::<_, String>(3)?,
2128 r.get::<_, i64>(4)?,
2129 r.get::<_, Option<i64>>(5)?,
2130 r.get::<_, String>(6)?,
2131 r.get::<_, String>(7)?,
2132 r.get::<_, Option<String>>(8)?,
2133 r.get::<_, Option<String>>(9)?,
2134 r.get::<_, Option<String>>(10)?,
2135 r.get::<_, Option<i64>>(11)?,
2136 r.get::<_, Option<i64>>(12)?,
2137 r.get::<_, Option<String>>(13)?,
2138 r.get::<_, Option<String>>(14)?,
2139 r.get::<_, Option<String>>(15)?,
2140 r.get::<_, Option<String>>(16)?,
2141 r.get::<_, Option<String>>(17)?,
2142 r.get::<_, Option<String>>(18)?,
2143 r.get::<_, Option<i64>>(19)?,
2144 r.get::<_, Option<i64>>(20)?,
2145 ))
2146 })?;
2147 let mut out = Vec::new();
2148 for row in rows {
2149 let (
2150 id,
2151 agent,
2152 model,
2153 workspace,
2154 started,
2155 ended,
2156 status_str,
2157 trace,
2158 start_commit,
2159 end_commit,
2160 branch,
2161 dirty_start,
2162 dirty_end,
2163 source,
2164 prompt_fingerprint,
2165 parent_session_id,
2166 agent_version,
2167 os,
2168 arch,
2169 repo_file_count,
2170 repo_total_loc,
2171 ) = row?;
2172 out.push(crate::core::event::SessionRecord {
2173 id,
2174 agent,
2175 model,
2176 workspace,
2177 started_at_ms: started as u64,
2178 ended_at_ms: ended.map(|v| v as u64),
2179 status: status_from_str(&status_str),
2180 trace_path: trace,
2181 start_commit,
2182 end_commit,
2183 branch,
2184 dirty_start: dirty_start.map(i64_to_bool),
2185 dirty_end: dirty_end.map(i64_to_bool),
2186 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
2187 prompt_fingerprint,
2188 parent_session_id,
2189 agent_version,
2190 os,
2191 arch,
2192 repo_file_count: repo_file_count.map(|v| v as u32),
2193 repo_total_loc: repo_total_loc.map(|v| v as u64),
2194 });
2195 }
2196 Ok(out)
2197 }
2198
2199 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
2200 self.conn.execute(
2201 "INSERT OR IGNORE INTO prompt_snapshots
2202 (fingerprint, captured_at_ms, files_json, total_bytes)
2203 VALUES (?1, ?2, ?3, ?4)",
2204 params![
2205 snap.fingerprint,
2206 snap.captured_at_ms as i64,
2207 snap.files_json,
2208 snap.total_bytes as i64
2209 ],
2210 )?;
2211 Ok(())
2212 }
2213
2214 pub fn get_prompt_snapshot(
2215 &self,
2216 fingerprint: &str,
2217 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
2218 self.conn
2219 .query_row(
2220 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2221 FROM prompt_snapshots WHERE fingerprint = ?1",
2222 params![fingerprint],
2223 |r| {
2224 Ok(crate::prompt::PromptSnapshot {
2225 fingerprint: r.get(0)?,
2226 captured_at_ms: r.get::<_, i64>(1)? as u64,
2227 files_json: r.get(2)?,
2228 total_bytes: r.get::<_, i64>(3)? as u64,
2229 })
2230 },
2231 )
2232 .optional()
2233 .map_err(Into::into)
2234 }
2235
2236 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
2237 let mut stmt = self.conn.prepare(
2238 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
2239 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
2240 )?;
2241 let rows = stmt.query_map([], |r| {
2242 Ok(crate::prompt::PromptSnapshot {
2243 fingerprint: r.get(0)?,
2244 captured_at_ms: r.get::<_, i64>(1)? as u64,
2245 files_json: r.get(2)?,
2246 total_bytes: r.get::<_, i64>(3)? as u64,
2247 })
2248 })?;
2249 Ok(rows.filter_map(|r| r.ok()).collect())
2250 }
2251
2252 pub fn sessions_with_prompt_fingerprint(
2254 &self,
2255 workspace: &str,
2256 start_ms: u64,
2257 end_ms: u64,
2258 ) -> Result<Vec<(String, String)>> {
2259 let mut stmt = self.conn.prepare(
2260 "SELECT id, prompt_fingerprint FROM sessions
2261 WHERE workspace = ?1
2262 AND started_at_ms >= ?2 AND started_at_ms < ?3
2263 AND prompt_fingerprint IS NOT NULL",
2264 )?;
2265 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
2266 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
2267 })?;
2268 Ok(rows.filter_map(|r| r.ok()).collect())
2269 }
2270}
2271
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0` instead of an error.
fn now_ms() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
2278
2279fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
2280 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
2281}
2282
2283fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
2284 let cost: i64 = conn.query_row(
2285 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
2286 params![workspace], |r| r.get(0),
2287 )?;
2288 let with_cost: i64 = conn.query_row(
2289 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
2290 params![workspace], |r| r.get(0),
2291 )?;
2292 Ok((cost, with_cost as u64))
2293}
2294
2295fn outcome_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionOutcomeRow> {
2296 let build_raw: Option<i64> = r.get(4)?;
2297 let ci_raw: Option<i64> = r.get(8)?;
2298 Ok(SessionOutcomeRow {
2299 session_id: r.get(0)?,
2300 test_passed: r.get(1)?,
2301 test_failed: r.get(2)?,
2302 test_skipped: r.get(3)?,
2303 build_ok: build_raw.map(|v| v != 0),
2304 lint_errors: r.get(5)?,
2305 revert_lines_14d: r.get(6)?,
2306 pr_open: r.get(7)?,
2307 ci_ok: ci_raw.map(|v| v != 0),
2308 measured_at_ms: r.get::<_, i64>(9)? as u64,
2309 measure_error: r.get(10)?,
2310 })
2311}
2312
2313fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
2314 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
2315 let score = r
2316 .get::<_, Option<i64>>(2)?
2317 .and_then(|v| FeedbackScore::new(v as u8));
2318 let label = r
2319 .get::<_, Option<String>>(3)?
2320 .and_then(|s| FeedbackLabel::from_str_opt(&s));
2321 Ok(FeedbackRecord {
2322 id: r.get(0)?,
2323 session_id: r.get(1)?,
2324 score,
2325 label,
2326 note: r.get(4)?,
2327 created_at_ms: r.get::<_, i64>(5)? as u64,
2328 })
2329}
2330
/// Returns the three-letter English weekday name for `day_idx`, a count of whole days
/// since the Unix epoch (day 0, 1970-01-01, was a Thursday).
///
/// `day_idx` is reduced modulo 7 before the Thursday offset is added, so no input can
/// overflow.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Day 0 is Thursday, i.e. index 4 in this Sunday-first table.
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2334
2335fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2336 let week_ago = now.saturating_sub(7 * 86_400_000);
2337 let mut stmt = conn
2338 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2339 let days: Vec<u64> = stmt
2340 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2341 .filter_map(|r| r.ok())
2342 .map(|v| v as u64 / 86_400_000)
2343 .collect();
2344 let today = now / 86_400_000;
2345 Ok((0u64..7)
2346 .map(|i| {
2347 let d = today.saturating_sub(6 - i);
2348 (
2349 day_label(d).to_string(),
2350 days.iter().filter(|&&x| x == d).count() as u64,
2351 )
2352 })
2353 .collect())
2354}
2355
2356fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2357 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2358 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2359 s.dirty_end,s.repo_binding_source,s.prompt_fingerprint,s.parent_session_id,\
2360 s.agent_version,s.os,s.arch,s.repo_file_count,s.repo_total_loc,\
2361 COUNT(e.id) FROM sessions s \
2362 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2363 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2364 let mut stmt = conn.prepare(sql)?;
2365 let out: Vec<(SessionRecord, u64)> = stmt
2366 .query_map(params![workspace], |r| {
2367 let st: String = r.get(6)?;
2368 Ok((
2369 SessionRecord {
2370 id: r.get(0)?,
2371 agent: r.get(1)?,
2372 model: r.get(2)?,
2373 workspace: r.get(3)?,
2374 started_at_ms: r.get::<_, i64>(4)? as u64,
2375 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2376 status: status_from_str(&st),
2377 trace_path: r.get(7)?,
2378 start_commit: r.get(8)?,
2379 end_commit: r.get(9)?,
2380 branch: r.get(10)?,
2381 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2382 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2383 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2384 prompt_fingerprint: r.get(14)?,
2385 parent_session_id: r.get(15)?,
2386 agent_version: r.get(16)?,
2387 os: r.get(17)?,
2388 arch: r.get(18)?,
2389 repo_file_count: r.get::<_, Option<i64>>(19)?.map(|v| v as u32),
2390 repo_total_loc: r.get::<_, Option<i64>>(20)?.map(|v| v as u64),
2391 },
2392 r.get::<_, i64>(21)? as u64,
2393 ))
2394 })?
2395 .filter_map(|r| r.ok())
2396 .collect();
2397 Ok(out)
2398}
2399
2400fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2401 let mut stmt = conn.prepare(
2402 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2403 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2404 )?;
2405 let out: Vec<(String, u64)> = stmt
2406 .query_map(params![workspace], |r| {
2407 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2408 })?
2409 .filter_map(|r| r.ok())
2410 .collect();
2411 Ok(out)
2412}
2413
2414fn status_from_str(s: &str) -> SessionStatus {
2415 match s {
2416 "Running" => SessionStatus::Running,
2417 "Waiting" => SessionStatus::Waiting,
2418 "Idle" => SessionStatus::Idle,
2419 _ => SessionStatus::Done,
2420 }
2421}
2422
2423fn kind_from_str(s: &str) -> EventKind {
2424 match s {
2425 "ToolCall" => EventKind::ToolCall,
2426 "ToolResult" => EventKind::ToolResult,
2427 "Message" => EventKind::Message,
2428 "Error" => EventKind::Error,
2429 "Cost" => EventKind::Cost,
2430 "Hook" => EventKind::Hook,
2431 "Lifecycle" => EventKind::Lifecycle,
2432 _ => EventKind::Hook,
2433 }
2434}
2435
2436fn source_from_str(s: &str) -> EventSource {
2437 match s {
2438 "Tail" => EventSource::Tail,
2439 "Hook" => EventSource::Hook,
2440 _ => EventSource::Proxy,
2441 }
2442}
2443
2444fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2445 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2446 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2447 ensure_column(conn, "sessions", "branch", "TEXT")?;
2448 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2449 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2450 ensure_column(
2451 conn,
2452 "sessions",
2453 "repo_binding_source",
2454 "TEXT NOT NULL DEFAULT ''",
2455 )?;
2456 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2457 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2458 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2459 ensure_column(conn, "events", "stop_reason", "TEXT")?;
2460 ensure_column(conn, "events", "latency_ms", "INTEGER")?;
2461 ensure_column(conn, "events", "ttft_ms", "INTEGER")?;
2462 ensure_column(conn, "events", "retry_count", "INTEGER")?;
2463 ensure_column(conn, "events", "context_used_tokens", "INTEGER")?;
2464 ensure_column(conn, "events", "context_max_tokens", "INTEGER")?;
2465 ensure_column(conn, "events", "cache_creation_tokens", "INTEGER")?;
2466 ensure_column(conn, "events", "cache_read_tokens", "INTEGER")?;
2467 ensure_column(conn, "events", "system_prompt_tokens", "INTEGER")?;
2468 ensure_column(
2469 conn,
2470 "sync_outbox",
2471 "kind",
2472 "TEXT NOT NULL DEFAULT 'events'",
2473 )?;
2474 ensure_column(
2475 conn,
2476 "experiments",
2477 "state",
2478 "TEXT NOT NULL DEFAULT 'Draft'",
2479 )?;
2480 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2481 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2482 ensure_column(conn, "sessions", "parent_session_id", "TEXT")?;
2483 ensure_column(conn, "sessions", "agent_version", "TEXT")?;
2484 ensure_column(conn, "sessions", "os", "TEXT")?;
2485 ensure_column(conn, "sessions", "arch", "TEXT")?;
2486 ensure_column(conn, "sessions", "repo_file_count", "INTEGER")?;
2487 ensure_column(conn, "sessions", "repo_total_loc", "INTEGER")?;
2488 ensure_column(conn, "tool_spans", "parent_span_id", "TEXT")?;
2489 ensure_column(conn, "tool_spans", "depth", "INTEGER NOT NULL DEFAULT 0")?;
2490 ensure_column(conn, "tool_spans", "subtree_cost_usd_e6", "INTEGER")?;
2491 ensure_column(conn, "tool_spans", "subtree_token_count", "INTEGER")?;
2492 conn.execute_batch(
2493 "CREATE INDEX IF NOT EXISTS tool_spans_parent ON tool_spans(parent_span_id);
2494 CREATE INDEX IF NOT EXISTS tool_spans_session_depth ON tool_spans(session_id, depth);",
2495 )?;
2496 Ok(())
2497}
2498
2499fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2500 if has_column(conn, table, column)? {
2501 return Ok(());
2502 }
2503 conn.execute(
2504 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2505 [],
2506 )?;
2507 Ok(())
2508}
2509
2510fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2511 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2512 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2513 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2514}
2515
/// Encodes a boolean as the SQLite integer `1` (`true`) or `0` (`false`).
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2519
/// Decodes a SQLite integer flag: zero is `false`, any other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    0 != v
}
2523
/// Maps an empty string to `None` and any non-empty string to `Some`.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2527
// Unit tests for the store. Each test opens a fresh SQLite database file inside its
// own temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Builds a minimal `Done` session in workspace `/ws`, started at 1000 ms, with every
    /// optional field set to `None`.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_string(),
            agent: "cursor".to_string(),
            model: None,
            workspace: "/ws".to_string(),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: "/trace".to_string(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Builds a `ToolCall` event for the `read_file` tool. It has sequence number `seq`,
    /// timestamp `1000 + seq * 100` ms, tool-call id `call_{seq}`, no token/cost metrics,
    /// and an empty JSON payload.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_string(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("read_file".to_string()),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    // Opening a store puts the database into WAL journal mode.
    #[test]
    fn open_and_wal_mode() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |r| r.get(0))
            .unwrap();
        assert_eq!(mode, "wal");
    }

    // A session written with `upsert_session` can be read back by id.
    #[test]
    fn upsert_and_get_session() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s1");
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s1").unwrap().unwrap();
        assert_eq!(got.id, "s1");
        assert_eq!(got.status, SessionStatus::Done);
    }

    // Appending events does not create extra sessions; the workspace still lists one.
    #[test]
    fn append_and_list_events_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let s = make_session("s2");
        store.upsert_session(&s).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "s2");
    }

    // An empty workspace reports zero sessions and zero cost.
    #[test]
    fn summary_stats_empty() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 0);
        assert_eq!(stats.total_cost_usd_e6, 0);
    }

    // Two sessions from the same agent are counted together under that agent.
    #[test]
    fn summary_stats_counts_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();
        let stats = store.summary_stats("/ws").unwrap();
        assert_eq!(stats.session_count, 2);
        assert_eq!(stats.by_agent.len(), 1);
        assert_eq!(stats.by_agent[0].0, "cursor");
        assert_eq!(stats.by_agent[0].1, 2);
    }

    // Events for a session come back in sequence order.
    #[test]
    fn list_events_for_session_round_trip() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();
        let events = store.list_events_for_session("s4").unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].seq, 0);
        assert_eq!(events[1].seq, 1);
    }

    // Appending the same (session, seq) event twice leaves a single stored event.
    #[test]
    fn append_event_dedup() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s5")).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        store.append_event(&make_event("s5", 0)).unwrap();
        let events = store.list_events_for_session("s5").unwrap();
        assert_eq!(events.len(), 1);
    }

    // Re-upserting a session with a changed status overwrites the stored status.
    #[test]
    fn upsert_idempotent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut s = make_session("s3");
        store.upsert_session(&s).unwrap();
        s.status = SessionStatus::Running;
        store.upsert_session(&s).unwrap();

        let got = store.get_session("s3").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    // A path nested under `input.path` in the payload is recorded as a touched file.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload = json!({"input": {"path": "src/lib.rs"}});
        store.append_event(&ev).unwrap();
        let ft = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(ft, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    // `update_session_status` changes only the status of an existing session.
    #[test]
    fn update_session_status_changes_status() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();
        let got = store.get_session("s6").unwrap().unwrap();
        assert_eq!(got.status, SessionStatus::Running);
    }

    // Pruning before 5000 ms removes the old session and its one event, but keeps the
    // session that started far in the future.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old");
        old.started_at_ms = 1_000;
        let mut new = make_session("new");
        new.started_at_ms = 9_000_000_000_000;
        store.upsert_session(&old).unwrap();
        store.upsert_session(&new).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let stats = store.prune_sessions_started_before(5_000).unwrap();
        assert_eq!(
            stats,
            PruneStats {
                sessions_removed: 1,
                events_removed: 1,
            }
        );
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());
        let sessions = store.list_sessions("/ws").unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "new");
    }

    // A `.cursor/rules/<slug>.mdc` path in the payload is recorded as use of rule `<slug>`.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sr")).unwrap();
        let mut ev = make_event("sr", 0);
        ev.payload = json!({"path": ".cursor/rules/my-rule.mdc"});
        store.append_event(&ev).unwrap();
        let r = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(r, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    // Free text that mentions a skill path and a rule path attributes the session to both
    // the `tdd` skill and the `style` rule. Both are reported as present on disk because
    // their slugs are passed in the known-slug sets.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        store.upsert_session(&make_session("sx")).unwrap();
        let mut ev = make_event("sx", 0);
        ev.payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        ev.cost_usd_e6 = Some(500_000);
        store.append_event(&ev).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let rep = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(rep.sessions_in_window, 1);
        let tdd = rep
            .rows
            .iter()
            .find(|r| r.id == "tdd" && r.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(tdd.sessions, 1);
        assert!(tdd.on_disk);
        let style = rep
            .rows
            .iter()
            .find(|r| r.id == "style" && r.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(style.sessions, 1);
        assert!(style.on_disk);
    }

    // Pruning a session also deletes the `rules_used` rows derived from its events.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        let mut old = make_session("old_r");
        old.started_at_ms = 1_000;
        store.upsert_session(&old).unwrap();
        let mut ev = make_event("old_r", 0);
        ev.payload = json!({"path": ".cursor/rules/x.mdc"});
        store.append_event(&ev).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let n: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
}