1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord, ToolSpanView};
7use crate::store::event_index::index_event_derived;
8use crate::store::tool_span_index::rebuild_tool_spans_for_session;
9use crate::sync::context::SyncIngestContext;
10use crate::sync::outbound::outbound_event_from_row;
11use crate::sync::redact::redact_payload;
12use crate::sync::smart::enqueue_tool_spans_for_session;
13use anyhow::{Context, Result};
14use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params};
15use std::collections::{HashMap, HashSet};
16use std::path::Path;
17
/// Upper bound (exclusive) used by the window queries to recognise events whose
/// `ts_ms` is not a real wall-clock time: an event with `ts_ms` below this value is
/// matched through its session's `started_at_ms` instead of its own timestamp.
///
/// NOTE(review): if `ts_ms` is Unix-epoch milliseconds, 10^12 ms is September 2001,
/// so genuine timestamps always sit above it — confirm against the ingest code.
const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
21
/// Schema statements executed, in order, every time the store is opened.
///
/// Every statement is idempotent (`IF NOT EXISTS` / `INSERT OR IGNORE`), so re-running
/// the whole list against an existing database is safe. Entries are append-only:
/// columns that the queries in this file use but that are absent here (for example
/// `events.ts_exact` or `sync_outbox.kind`) are not created by this list.
const MIGRATIONS: &[&str] = &[
    // --- Core session / event log -------------------------------------------------
    "CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        agent TEXT NOT NULL,
        model TEXT,
        workspace TEXT NOT NULL,
        started_at_ms INTEGER NOT NULL,
        ended_at_ms INTEGER,
        status TEXT NOT NULL,
        trace_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        seq INTEGER NOT NULL,
        ts_ms INTEGER NOT NULL,
        kind TEXT NOT NULL,
        source TEXT NOT NULL,
        tool TEXT,
        tokens_in INTEGER,
        tokens_out INTEGER,
        cost_usd_e6 INTEGER,
        payload TEXT NOT NULL
    )",
    "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)",
    "CREATE TABLE IF NOT EXISTS files_touched (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS skills_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        skill TEXT NOT NULL
    )",
    // --- Sync outbox and experiments ----------------------------------------------
    "CREATE TABLE IF NOT EXISTS sync_outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    )",
    "CREATE TABLE IF NOT EXISTS experiments (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    )",
    "CREATE TABLE IF NOT EXISTS experiment_tags (
        experiment_id TEXT NOT NULL,
        session_id TEXT NOT NULL,
        variant TEXT NOT NULL,
        PRIMARY KEY (experiment_id, session_id)
    )",
    // The event upsert relies on this unique (session_id, seq) pair as its conflict target.
    "CREATE UNIQUE INDEX IF NOT EXISTS events_session_seq_idx ON events(session_id, seq)",
    // Key/value scratch table (last sync result, scan/prune timestamps, ...).
    "CREATE TABLE IF NOT EXISTS sync_state (
        k TEXT PRIMARY KEY,
        v TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS files_touched_session_path_idx ON files_touched(session_id, path)",
    "CREATE UNIQUE INDEX IF NOT EXISTS skills_used_session_skill_idx ON skills_used(session_id, skill)",
    // --- Tool spans -----------------------------------------------------------------
    "CREATE TABLE IF NOT EXISTS tool_spans (
        span_id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        tool TEXT,
        tool_call_id TEXT,
        status TEXT NOT NULL,
        started_at_ms INTEGER,
        ended_at_ms INTEGER,
        lead_time_ms INTEGER,
        tokens_in INTEGER,
        tokens_out INTEGER,
        reasoning_tokens INTEGER,
        cost_usd_e6 INTEGER,
        paths_json TEXT NOT NULL DEFAULT '[]'
    )",
    "CREATE TABLE IF NOT EXISTS tool_span_paths (
        span_id TEXT NOT NULL,
        path TEXT NOT NULL,
        PRIMARY KEY (span_id, path)
    )",
    // --- Repository binding and analysis snapshots ----------------------------------
    "CREATE TABLE IF NOT EXISTS session_repo_binding (
        session_id TEXT PRIMARY KEY,
        start_commit TEXT,
        end_commit TEXT,
        branch TEXT,
        dirty_start INTEGER,
        dirty_end INTEGER,
        repo_binding_source TEXT NOT NULL DEFAULT ''
    )",
    "CREATE TABLE IF NOT EXISTS repo_snapshots (
        id TEXT PRIMARY KEY,
        workspace TEXT NOT NULL,
        head_commit TEXT,
        dirty_fingerprint TEXT NOT NULL,
        analyzer_version TEXT NOT NULL,
        indexed_at_ms INTEGER NOT NULL,
        dirty INTEGER NOT NULL DEFAULT 0,
        graph_path TEXT NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS file_facts (
        snapshot_id TEXT NOT NULL,
        path TEXT NOT NULL,
        language TEXT NOT NULL,
        bytes INTEGER NOT NULL,
        loc INTEGER NOT NULL,
        sloc INTEGER NOT NULL,
        complexity_total INTEGER NOT NULL,
        max_fn_complexity INTEGER NOT NULL,
        symbol_count INTEGER NOT NULL,
        import_count INTEGER NOT NULL,
        fan_in INTEGER NOT NULL,
        fan_out INTEGER NOT NULL,
        churn_30d INTEGER NOT NULL,
        churn_90d INTEGER NOT NULL,
        authors_90d INTEGER NOT NULL,
        last_changed_ms INTEGER,
        PRIMARY KEY (snapshot_id, path)
    )",
    "CREATE TABLE IF NOT EXISTS repo_edges (
        snapshot_id TEXT NOT NULL,
        from_id TEXT NOT NULL,
        to_id TEXT NOT NULL,
        kind TEXT NOT NULL,
        weight INTEGER NOT NULL,
        PRIMARY KEY (snapshot_id, from_id, to_id, kind)
    )",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_idx ON sessions(workspace)",
    "CREATE INDEX IF NOT EXISTS sessions_workspace_started_idx ON sessions(workspace, started_at_ms)",
    "CREATE TABLE IF NOT EXISTS rules_used (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        rule TEXT NOT NULL
    )",
    "CREATE UNIQUE INDEX IF NOT EXISTS rules_used_session_rule_idx ON rules_used(session_id, rule)",
    // --- Remote (pulled) data --------------------------------------------------------
    // Single-row table: the CHECK pins the only permitted id to 1.
    "CREATE TABLE IF NOT EXISTS remote_pull_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        query_provider TEXT NOT NULL DEFAULT 'none',
        cursor_json TEXT NOT NULL DEFAULT '',
        last_success_ms INTEGER
    )",
    // Seed that single row; a no-op once it exists.
    "INSERT OR IGNORE INTO remote_pull_state (id) VALUES (1)",
    "CREATE TABLE IF NOT EXISTS remote_sessions (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_events (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        session_id_hash TEXT NOT NULL,
        event_seq INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, session_id_hash, event_seq)
    )",
    "CREATE TABLE IF NOT EXISTS remote_tool_spans (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        span_id_hash TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, span_id_hash)
    )",
    "CREATE TABLE IF NOT EXISTS remote_repo_snapshots (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        snapshot_id_hash TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, snapshot_id_hash, chunk_index)
    )",
    "CREATE TABLE IF NOT EXISTS remote_workspace_facts (
        team_id TEXT NOT NULL,
        workspace_hash TEXT NOT NULL,
        fact_key TEXT NOT NULL,
        json TEXT NOT NULL,
        PRIMARY KEY (team_id, workspace_hash, fact_key)
    )",
    // --- Evaluations, prompt snapshots, feedback --------------------------------------
    // Multi-statement entries are fine: migrations are run with `execute_batch`.
    "CREATE TABLE IF NOT EXISTS session_evals (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        judge_model TEXT NOT NULL,
        rubric_id TEXT NOT NULL,
        score REAL NOT NULL CHECK(score BETWEEN 0.0 AND 1.0),
        rationale TEXT NOT NULL,
        flagged INTEGER NOT NULL DEFAULT 0,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_evals_session ON session_evals(session_id);
    CREATE INDEX IF NOT EXISTS session_evals_rubric ON session_evals(rubric_id, score)",
    "CREATE TABLE IF NOT EXISTS prompt_snapshots (
        fingerprint TEXT PRIMARY KEY,
        captured_at_ms INTEGER NOT NULL,
        files_json TEXT NOT NULL,
        total_bytes INTEGER NOT NULL
    )",
    "CREATE TABLE IF NOT EXISTS session_feedback (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
        score INTEGER CHECK(score BETWEEN 1 AND 5),
        label TEXT CHECK(label IN ('good','bad','interesting','bug','regression')),
        note TEXT,
        created_at_ms INTEGER NOT NULL
    );
    CREATE INDEX IF NOT EXISTS session_feedback_session ON session_feedback(session_id);
    CREATE INDEX IF NOT EXISTS session_feedback_label ON session_feedback(label, created_at_ms)",
];
232
233#[derive(Clone)]
235pub struct InsightsStats {
236 pub total_sessions: u64,
237 pub running_sessions: u64,
238 pub total_events: u64,
239 pub sessions_by_day: Vec<(String, u64)>,
241 pub recent: Vec<(SessionRecord, u64)>,
243 pub top_tools: Vec<(String, u64)>,
245 pub total_cost_usd_e6: i64,
246 pub sessions_with_cost: u64,
247}
248
/// Point-in-time view of the outbound sync machinery, as returned by
/// `Store::sync_status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Outbox rows that have not been marked as sent.
    pub pending_outbox: u64,
    /// Timestamp of the last successful sync, if one was ever recorded.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent failure; cleared when a sync succeeds.
    pub last_error: Option<String>,
    /// Failures recorded since the last success.
    pub consecutive_failures: u32,
}
256
257#[derive(serde::Serialize)]
259pub struct SummaryStats {
260 pub session_count: u64,
261 pub total_cost_usd_e6: i64,
262 pub by_agent: Vec<(String, u64)>,
263 pub by_model: Vec<(String, u64)>,
264 pub top_tools: Vec<(String, u64)>,
265}
266
267#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
269#[serde(rename_all = "lowercase")]
270pub enum GuidanceKind {
271 Skill,
272 Rule,
273}
274
275#[derive(Clone, Debug, serde::Serialize)]
277pub struct GuidancePerfRow {
278 pub kind: GuidanceKind,
279 pub id: String,
280 pub sessions: u64,
281 pub sessions_pct: f64,
282 pub total_cost_usd_e6: i64,
283 pub avg_cost_per_session_usd: Option<f64>,
284 pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
285 pub on_disk: bool,
286}
287
288#[derive(Clone, Debug, serde::Serialize)]
290pub struct GuidanceReport {
291 pub workspace: String,
292 pub window_start_ms: u64,
293 pub window_end_ms: u64,
294 pub sessions_in_window: u64,
295 pub workspace_avg_cost_per_session_usd: Option<f64>,
296 pub rows: Vec<GuidancePerfRow>,
297}
298
/// Row counts reported by `Store::prune_sessions_started_before`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Sessions deleted by the prune.
    pub sessions_removed: u64,
    /// Events that belonged to the deleted sessions.
    pub events_removed: u64,
}
305
/// `sync_state` key — by its name, the time of the last agent scan in milliseconds;
/// intended for `Store::sync_state_get_u64` / `Store::sync_state_set_u64`.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key — by its name, the time of the last automatic prune in milliseconds.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
309
/// Owned copy of one `tool_spans` row (same column names), with its paths as a list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// Primary key of the span.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, when known.
    pub tool: Option<String>,
    /// Identifier of the tool call, when known.
    pub tool_call_id: Option<String>,
    /// Span status as stored (free-form text).
    pub status: String,
    /// Start time in milliseconds, when known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, when known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, when known.
    pub lead_time_ms: Option<u64>,
    /// Input tokens attributed to the span.
    pub tokens_in: Option<u32>,
    /// Output tokens attributed to the span.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens attributed to the span.
    pub reasoning_tokens: Option<u32>,
    /// Cost in millionths of a US dollar.
    pub cost_usd_e6: Option<i64>,
    /// File paths associated with the span.
    pub paths: Vec<String>,
}
325
326pub struct Store {
327 conn: Connection,
328}
329
330impl Store {
331 pub(crate) fn conn(&self) -> &Connection {
332 &self.conn
333 }
334
335 pub fn open(path: &Path) -> Result<Self> {
336 if let Some(parent) = path.parent() {
337 std::fs::create_dir_all(parent)?;
338 }
339 let conn =
340 Connection::open(path).with_context(|| format!("open db: {}", path.display()))?;
341 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;
342 for sql in MIGRATIONS {
343 conn.execute_batch(sql)?;
344 }
345 ensure_schema_columns(&conn)?;
346 Ok(Self { conn })
347 }
348
349 pub fn upsert_session(&self, s: &SessionRecord) -> Result<()> {
350 self.conn.execute(
351 "INSERT INTO sessions (
352 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
353 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
354 prompt_fingerprint
355 )
356 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
357 ON CONFLICT(id) DO UPDATE SET
358 agent=excluded.agent, model=excluded.model, workspace=excluded.workspace,
359 started_at_ms=excluded.started_at_ms, ended_at_ms=excluded.ended_at_ms,
360 status=excluded.status, trace_path=excluded.trace_path,
361 start_commit=excluded.start_commit, end_commit=excluded.end_commit,
362 branch=excluded.branch, dirty_start=excluded.dirty_start,
363 dirty_end=excluded.dirty_end, repo_binding_source=excluded.repo_binding_source,
364 prompt_fingerprint=excluded.prompt_fingerprint",
365 params![
366 s.id,
367 s.agent,
368 s.model,
369 s.workspace,
370 s.started_at_ms as i64,
371 s.ended_at_ms.map(|v| v as i64),
372 format!("{:?}", s.status),
373 s.trace_path,
374 s.start_commit,
375 s.end_commit,
376 s.branch,
377 s.dirty_start.map(bool_to_i64),
378 s.dirty_end.map(bool_to_i64),
379 s.repo_binding_source.clone().unwrap_or_default(),
380 s.prompt_fingerprint.as_deref(),
381 ],
382 )?;
383 self.conn.execute(
384 "INSERT INTO session_repo_binding (
385 session_id, start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
386 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
387 ON CONFLICT(session_id) DO UPDATE SET
388 start_commit=excluded.start_commit,
389 end_commit=excluded.end_commit,
390 branch=excluded.branch,
391 dirty_start=excluded.dirty_start,
392 dirty_end=excluded.dirty_end,
393 repo_binding_source=excluded.repo_binding_source",
394 params![
395 s.id,
396 s.start_commit,
397 s.end_commit,
398 s.branch,
399 s.dirty_start.map(bool_to_i64),
400 s.dirty_end.map(bool_to_i64),
401 s.repo_binding_source.clone().unwrap_or_default(),
402 ],
403 )?;
404 Ok(())
405 }
406
407 pub fn ensure_session_stub(
410 &self,
411 id: &str,
412 agent: &str,
413 workspace: &str,
414 started_at_ms: u64,
415 ) -> Result<()> {
416 self.conn.execute(
417 "INSERT OR IGNORE INTO sessions (
418 id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
419 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source
420 ) VALUES (?1, ?2, NULL, ?3, ?4, NULL, 'Running', '', NULL, NULL, NULL, NULL, NULL, '')",
421 params![id, agent, workspace, started_at_ms as i64],
422 )?;
423 Ok(())
424 }
425
426 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
428 let n: i64 = self.conn.query_row(
429 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
430 [session_id],
431 |r| r.get(0),
432 )?;
433 Ok(n as u64)
434 }
435
436 pub fn append_event(&self, e: &Event) -> Result<()> {
437 self.append_event_with_sync(e, None)
438 }
439
440 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
442 let payload = serde_json::to_string(&e.payload)?;
443 self.conn.execute(
444 "INSERT INTO events (
445 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
446 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
447 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
448 ON CONFLICT(session_id, seq) DO UPDATE SET
449 ts_ms = excluded.ts_ms,
450 ts_exact = excluded.ts_exact,
451 kind = excluded.kind,
452 source = excluded.source,
453 tool = excluded.tool,
454 tool_call_id = excluded.tool_call_id,
455 tokens_in = excluded.tokens_in,
456 tokens_out = excluded.tokens_out,
457 reasoning_tokens = excluded.reasoning_tokens,
458 cost_usd_e6 = excluded.cost_usd_e6,
459 payload = excluded.payload",
460 params![
461 e.session_id,
462 e.seq as i64,
463 e.ts_ms as i64,
464 bool_to_i64(e.ts_exact),
465 format!("{:?}", e.kind),
466 format!("{:?}", e.source),
467 e.tool,
468 e.tool_call_id,
469 e.tokens_in.map(|v| v as i64),
470 e.tokens_out.map(|v| v as i64),
471 e.reasoning_tokens.map(|v| v as i64),
472 e.cost_usd_e6,
473 payload,
474 ],
475 )?;
476 if self.conn.changes() == 0 {
477 return Ok(());
478 }
479 index_event_derived(&self.conn, e)?;
480 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
481 let Some(ctx) = ctx else {
482 return Ok(());
483 };
484 let sync = &ctx.sync;
485 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
486 return Ok(());
487 }
488 let Some(salt) = try_team_salt(sync) else {
489 tracing::warn!(
490 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
491 );
492 return Ok(());
493 };
494 if sync.sample_rate < 1.0 {
495 let u: f64 = rand::random();
496 if u > sync.sample_rate {
497 return Ok(());
498 }
499 }
500 let Some(session) = self.get_session(&e.session_id)? else {
501 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
502 return Ok(());
503 };
504 let mut outbound = outbound_event_from_row(e, &session, &salt);
505 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
506 let row = serde_json::to_string(&outbound)?;
507 self.conn.execute(
508 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, 'events', ?2, 0)",
509 params![e.session_id, row],
510 )?;
511 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
512 Ok(())
513 }
514
515 pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
516 let mut stmt = self.conn.prepare(
517 "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
518 )?;
519 let rows = stmt.query_map(params![limit as i64], |row| {
520 Ok((
521 row.get::<_, i64>(0)?,
522 row.get::<_, String>(1)?,
523 row.get::<_, String>(2)?,
524 ))
525 })?;
526 let mut out = Vec::new();
527 for r in rows {
528 out.push(r?);
529 }
530 Ok(out)
531 }
532
533 pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
534 for id in ids {
535 self.conn
536 .execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", params![id])?;
537 }
538 Ok(())
539 }
540
541 pub fn replace_outbox_rows(
542 &self,
543 owner_id: &str,
544 kind: &str,
545 payloads: &[String],
546 ) -> Result<()> {
547 self.conn.execute(
548 "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
549 params![owner_id, kind],
550 )?;
551 for payload in payloads {
552 self.conn.execute(
553 "INSERT INTO sync_outbox (session_id, kind, payload, sent) VALUES (?1, ?2, ?3, 0)",
554 params![owner_id, kind, payload],
555 )?;
556 }
557 Ok(())
558 }
559
560 pub fn outbox_pending_count(&self) -> Result<u64> {
561 let c: i64 =
562 self.conn
563 .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
564 r.get(0)
565 })?;
566 Ok(c as u64)
567 }
568
569 pub fn set_sync_state_ok(&self) -> Result<()> {
570 let now = now_ms().to_string();
571 self.conn.execute(
572 "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
573 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
574 params![now],
575 )?;
576 self.conn.execute(
577 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
578 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
579 [],
580 )?;
581 self.conn
582 .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
583 Ok(())
584 }
585
586 pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
587 let prev: i64 = self
588 .conn
589 .query_row(
590 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
591 [],
592 |r| {
593 let s: String = r.get(0)?;
594 Ok(s.parse::<i64>().unwrap_or(0))
595 },
596 )
597 .optional()?
598 .unwrap_or(0);
599 let next = prev.saturating_add(1);
600 self.conn.execute(
601 "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
602 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
603 params![msg],
604 )?;
605 self.conn.execute(
606 "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
607 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
608 params![next.to_string()],
609 )?;
610 Ok(())
611 }
612
613 pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
614 let pending_outbox = self.outbox_pending_count()?;
615 let last_success_ms = self
616 .conn
617 .query_row(
618 "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
619 [],
620 |r| r.get::<_, String>(0),
621 )
622 .optional()?
623 .and_then(|s| s.parse().ok());
624 let last_error = self
625 .conn
626 .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
627 r.get::<_, String>(0)
628 })
629 .optional()?;
630 let consecutive_failures = self
631 .conn
632 .query_row(
633 "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
634 [],
635 |r| r.get::<_, String>(0),
636 )
637 .optional()?
638 .and_then(|s| s.parse().ok())
639 .unwrap_or(0);
640 Ok(SyncStatusSnapshot {
641 pending_outbox,
642 last_success_ms,
643 last_error,
644 consecutive_failures,
645 })
646 }
647
648 pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
649 let row: Option<String> = self
650 .conn
651 .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
652 r.get::<_, String>(0)
653 })
654 .optional()?;
655 Ok(row.and_then(|s| s.parse().ok()))
656 }
657
658 pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
659 self.conn.execute(
660 "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
661 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
662 params![key, v.to_string()],
663 )?;
664 Ok(())
665 }
666
667 pub fn prune_sessions_started_before(&self, cutoff_ms: i64) -> Result<PruneStats> {
669 let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Deferred)?;
670 let sessions_to_remove: i64 = tx.query_row(
671 "SELECT COUNT(*) FROM sessions WHERE started_at_ms < ?1",
672 params![cutoff_ms],
673 |r| r.get(0),
674 )?;
675 let events_to_remove: i64 = tx.query_row(
676 "SELECT COUNT(*) FROM events WHERE session_id IN \
677 (SELECT id FROM sessions WHERE started_at_ms < ?1)",
678 params![cutoff_ms],
679 |r| r.get(0),
680 )?;
681
682 let sub_old_sessions = "SELECT id FROM sessions WHERE started_at_ms < ?1";
683 tx.execute(
684 &format!(
685 "DELETE FROM tool_span_paths WHERE span_id IN \
686 (SELECT span_id FROM tool_spans WHERE session_id IN ({sub_old_sessions}))"
687 ),
688 params![cutoff_ms],
689 )?;
690 tx.execute(
691 &format!("DELETE FROM tool_spans WHERE session_id IN ({sub_old_sessions})"),
692 params![cutoff_ms],
693 )?;
694 tx.execute(
695 &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"),
696 params![cutoff_ms],
697 )?;
698 tx.execute(
699 &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"),
700 params![cutoff_ms],
701 )?;
702 tx.execute(
703 &format!("DELETE FROM skills_used WHERE session_id IN ({sub_old_sessions})"),
704 params![cutoff_ms],
705 )?;
706 tx.execute(
707 &format!("DELETE FROM rules_used WHERE session_id IN ({sub_old_sessions})"),
708 params![cutoff_ms],
709 )?;
710 tx.execute(
711 &format!("DELETE FROM sync_outbox WHERE session_id IN ({sub_old_sessions})"),
712 params![cutoff_ms],
713 )?;
714 tx.execute(
715 &format!("DELETE FROM session_repo_binding WHERE session_id IN ({sub_old_sessions})"),
716 params![cutoff_ms],
717 )?;
718 tx.execute(
719 &format!("DELETE FROM experiment_tags WHERE session_id IN ({sub_old_sessions})"),
720 params![cutoff_ms],
721 )?;
722 tx.execute(
723 "DELETE FROM sessions WHERE started_at_ms < ?1",
724 params![cutoff_ms],
725 )?;
726 tx.commit()?;
727 Ok(PruneStats {
728 sessions_removed: sessions_to_remove as u64,
729 events_removed: events_to_remove as u64,
730 })
731 }
732
733 pub fn vacuum(&self) -> Result<()> {
735 self.conn.execute_batch("VACUUM;").context("VACUUM")?;
736 Ok(())
737 }
738
739 pub fn list_sessions(&self, workspace: &str) -> Result<Vec<SessionRecord>> {
740 let mut stmt = self.conn.prepare(
741 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
742 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
743 prompt_fingerprint
744 FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC",
745 )?;
746 let rows = stmt.query_map(params![workspace], |row| {
747 Ok((
748 row.get::<_, String>(0)?,
749 row.get::<_, String>(1)?,
750 row.get::<_, Option<String>>(2)?,
751 row.get::<_, String>(3)?,
752 row.get::<_, i64>(4)?,
753 row.get::<_, Option<i64>>(5)?,
754 row.get::<_, String>(6)?,
755 row.get::<_, String>(7)?,
756 row.get::<_, Option<String>>(8)?,
757 row.get::<_, Option<String>>(9)?,
758 row.get::<_, Option<String>>(10)?,
759 row.get::<_, Option<i64>>(11)?,
760 row.get::<_, Option<i64>>(12)?,
761 row.get::<_, String>(13)?,
762 row.get::<_, Option<String>>(14)?,
763 ))
764 })?;
765
766 let mut out = Vec::new();
767 for row in rows {
768 let (
769 id,
770 agent,
771 model,
772 workspace,
773 started,
774 ended,
775 status_str,
776 trace,
777 start_commit,
778 end_commit,
779 branch,
780 dirty_start,
781 dirty_end,
782 source,
783 prompt_fingerprint,
784 ) = row?;
785 out.push(SessionRecord {
786 id,
787 agent,
788 model,
789 workspace,
790 started_at_ms: started as u64,
791 ended_at_ms: ended.map(|v| v as u64),
792 status: status_from_str(&status_str),
793 trace_path: trace,
794 start_commit,
795 end_commit,
796 branch,
797 dirty_start: dirty_start.map(i64_to_bool),
798 dirty_end: dirty_end.map(i64_to_bool),
799 repo_binding_source: empty_to_none(source),
800 prompt_fingerprint,
801 });
802 }
803 Ok(out)
804 }
805
806 pub fn summary_stats(&self, workspace: &str) -> Result<SummaryStats> {
807 let session_count: i64 = self.conn.query_row(
808 "SELECT COUNT(*) FROM sessions WHERE workspace = ?1",
809 params![workspace],
810 |r| r.get(0),
811 )?;
812
813 let total_cost: i64 = self.conn.query_row(
814 "SELECT COALESCE(SUM(e.cost_usd_e6), 0) FROM events e
815 JOIN sessions s ON s.id = e.session_id WHERE s.workspace = ?1",
816 params![workspace],
817 |r| r.get(0),
818 )?;
819
820 let mut stmt = self.conn.prepare(
821 "SELECT agent, COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY agent ORDER BY COUNT(*) DESC",
822 )?;
823 let by_agent: Vec<(String, u64)> = stmt
824 .query_map(params![workspace], |r| {
825 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
826 })?
827 .filter_map(|r| r.ok())
828 .collect();
829
830 let mut stmt = self.conn.prepare(
831 "SELECT COALESCE(model, 'unknown'), COUNT(*) FROM sessions WHERE workspace = ?1 GROUP BY model ORDER BY COUNT(*) DESC",
832 )?;
833 let by_model: Vec<(String, u64)> = stmt
834 .query_map(params![workspace], |r| {
835 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
836 })?
837 .filter_map(|r| r.ok())
838 .collect();
839
840 let mut stmt = self.conn.prepare(
841 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id = e.session_id
842 WHERE s.workspace = ?1 AND tool IS NOT NULL
843 GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
844 )?;
845 let top_tools: Vec<(String, u64)> = stmt
846 .query_map(params![workspace], |r| {
847 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
848 })?
849 .filter_map(|r| r.ok())
850 .collect();
851
852 Ok(SummaryStats {
853 session_count: session_count as u64,
854 total_cost_usd_e6: total_cost,
855 by_agent,
856 by_model,
857 top_tools,
858 })
859 }
860
861 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
862 let mut stmt = self.conn.prepare(
863 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
864 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload
865 FROM events WHERE session_id = ?1 ORDER BY seq ASC",
866 )?;
867 let rows = stmt.query_map(params![session_id], |row| {
868 Ok((
869 row.get::<_, String>(0)?,
870 row.get::<_, i64>(1)?,
871 row.get::<_, i64>(2)?,
872 row.get::<_, i64>(3)?,
873 row.get::<_, String>(4)?,
874 row.get::<_, String>(5)?,
875 row.get::<_, Option<String>>(6)?,
876 row.get::<_, Option<String>>(7)?,
877 row.get::<_, Option<i64>>(8)?,
878 row.get::<_, Option<i64>>(9)?,
879 row.get::<_, Option<i64>>(10)?,
880 row.get::<_, Option<i64>>(11)?,
881 row.get::<_, String>(12)?,
882 ))
883 })?;
884
885 let mut events = Vec::new();
886 for row in rows {
887 let (
888 sid,
889 seq,
890 ts_ms,
891 ts_exact,
892 kind_str,
893 source_str,
894 tool,
895 tool_call_id,
896 tokens_in,
897 tokens_out,
898 reasoning_tokens,
899 cost_usd_e6,
900 payload_str,
901 ) = row?;
902 events.push(Event {
903 session_id: sid,
904 seq: seq as u64,
905 ts_ms: ts_ms as u64,
906 ts_exact: ts_exact != 0,
907 kind: kind_from_str(&kind_str),
908 source: source_from_str(&source_str),
909 tool,
910 tool_call_id,
911 tokens_in: tokens_in.map(|v| v as u32),
912 tokens_out: tokens_out.map(|v| v as u32),
913 reasoning_tokens: reasoning_tokens.map(|v| v as u32),
914 cost_usd_e6,
915 payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
916 });
917 }
918 Ok(events)
919 }
920
921 pub fn update_session_status(&self, id: &str, status: SessionStatus) -> Result<()> {
923 self.conn.execute(
924 "UPDATE sessions SET status = ?1 WHERE id = ?2",
925 params![format!("{:?}", status), id],
926 )?;
927 Ok(())
928 }
929
930 pub fn insights(&self, workspace: &str) -> Result<InsightsStats> {
932 let (total_cost_usd_e6, sessions_with_cost) = cost_stats(&self.conn, workspace)?;
933 Ok(InsightsStats {
934 total_sessions: count_q(
935 &self.conn,
936 "SELECT COUNT(*) FROM sessions WHERE workspace=?1",
937 workspace,
938 )?,
939 running_sessions: count_q(
940 &self.conn,
941 "SELECT COUNT(*) FROM sessions WHERE workspace=?1 AND status='Running'",
942 workspace,
943 )?,
944 total_events: count_q(
945 &self.conn,
946 "SELECT COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
947 workspace,
948 )?,
949 sessions_by_day: sessions_by_day_7(&self.conn, workspace, now_ms())?,
950 recent: recent_sessions_3(&self.conn, workspace)?,
951 top_tools: top_tools_5(&self.conn, workspace)?,
952 total_cost_usd_e6,
953 sessions_with_cost,
954 })
955 }
956
957 pub fn retro_events_in_window(
959 &self,
960 workspace: &str,
961 start_ms: u64,
962 end_ms: u64,
963 ) -> Result<Vec<(SessionRecord, Event)>> {
964 let mut stmt = self.conn.prepare(
965 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
966 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
967 s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, s.status, s.trace_path,
968 s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, s.repo_binding_source
969 FROM events e
970 JOIN sessions s ON s.id = e.session_id
971 WHERE s.workspace = ?1
972 AND (
973 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
974 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
975 )
976 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
977 )?;
978 let rows = stmt.query_map(
979 params![
980 workspace,
981 start_ms as i64,
982 end_ms as i64,
983 SYNTHETIC_TS_CEILING_MS,
984 ],
985 |row| {
986 let payload_str: String = row.get(12)?;
987 let status_str: String = row.get(19)?;
988 Ok((
989 SessionRecord {
990 id: row.get(13)?,
991 agent: row.get(14)?,
992 model: row.get(15)?,
993 workspace: row.get(16)?,
994 started_at_ms: row.get::<_, i64>(17)? as u64,
995 ended_at_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
996 status: status_from_str(&status_str),
997 trace_path: row.get(20)?,
998 start_commit: row.get(21)?,
999 end_commit: row.get(22)?,
1000 branch: row.get(23)?,
1001 dirty_start: row.get::<_, Option<i64>>(24)?.map(i64_to_bool),
1002 dirty_end: row.get::<_, Option<i64>>(25)?.map(i64_to_bool),
1003 repo_binding_source: empty_to_none(row.get::<_, String>(26)?),
1004 prompt_fingerprint: None,
1005 },
1006 Event {
1007 session_id: row.get(0)?,
1008 seq: row.get::<_, i64>(1)? as u64,
1009 ts_ms: row.get::<_, i64>(2)? as u64,
1010 ts_exact: row.get::<_, i64>(3)? != 0,
1011 kind: kind_from_str(&row.get::<_, String>(4)?),
1012 source: source_from_str(&row.get::<_, String>(5)?),
1013 tool: row.get(6)?,
1014 tool_call_id: row.get(7)?,
1015 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1016 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1017 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1018 cost_usd_e6: row.get(11)?,
1019 payload: serde_json::from_str(&payload_str)
1020 .unwrap_or(serde_json::Value::Null),
1021 },
1022 ))
1023 },
1024 )?;
1025
1026 let mut out = Vec::new();
1027 for r in rows {
1028 out.push(r?);
1029 }
1030 Ok(out)
1031 }
1032
1033 pub fn files_touched_in_window(
1035 &self,
1036 workspace: &str,
1037 start_ms: u64,
1038 end_ms: u64,
1039 ) -> Result<Vec<(String, String)>> {
1040 let mut stmt = self.conn.prepare(
1041 "SELECT DISTINCT ft.session_id, ft.path
1042 FROM files_touched ft
1043 JOIN sessions s ON s.id = ft.session_id
1044 WHERE s.workspace = ?1
1045 AND EXISTS (
1046 SELECT 1 FROM events e
1047 JOIN sessions ss ON ss.id = e.session_id
1048 WHERE e.session_id = ft.session_id
1049 AND (
1050 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1051 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1052 )
1053 )
1054 ORDER BY ft.session_id, ft.path",
1055 )?;
1056 let out: Vec<(String, String)> = stmt
1057 .query_map(
1058 params![
1059 workspace,
1060 start_ms as i64,
1061 end_ms as i64,
1062 SYNTHETIC_TS_CEILING_MS,
1063 ],
1064 |r| Ok((r.get(0)?, r.get(1)?)),
1065 )?
1066 .filter_map(|r| r.ok())
1067 .collect();
1068 Ok(out)
1069 }
1070
1071 pub fn skills_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1074 let mut stmt = self.conn.prepare(
1075 "SELECT DISTINCT su.skill
1076 FROM skills_used su
1077 JOIN sessions s ON s.id = su.session_id
1078 WHERE s.workspace = ?1
1079 AND EXISTS (
1080 SELECT 1 FROM events e
1081 JOIN sessions ss ON ss.id = e.session_id
1082 WHERE e.session_id = su.session_id
1083 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1084 )
1085 ORDER BY su.skill",
1086 )?;
1087 let out: Vec<String> = stmt
1088 .query_map(
1089 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1090 |r| r.get::<_, String>(0),
1091 )?
1092 .filter_map(|r| r.ok())
1093 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1094 .collect();
1095 Ok(out)
1096 }
1097
1098 pub fn skills_used_in_window(
1100 &self,
1101 workspace: &str,
1102 start_ms: u64,
1103 end_ms: u64,
1104 ) -> Result<Vec<(String, String)>> {
1105 let mut stmt = self.conn.prepare(
1106 "SELECT DISTINCT su.session_id, su.skill
1107 FROM skills_used su
1108 JOIN sessions s ON s.id = su.session_id
1109 WHERE s.workspace = ?1
1110 AND EXISTS (
1111 SELECT 1 FROM events e
1112 JOIN sessions ss ON ss.id = e.session_id
1113 WHERE e.session_id = su.session_id
1114 AND (
1115 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1116 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1117 )
1118 )
1119 ORDER BY su.session_id, su.skill",
1120 )?;
1121 let out: Vec<(String, String)> = stmt
1122 .query_map(
1123 params![
1124 workspace,
1125 start_ms as i64,
1126 end_ms as i64,
1127 SYNTHETIC_TS_CEILING_MS,
1128 ],
1129 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1130 )?
1131 .filter_map(|r| r.ok())
1132 .filter(|(_, skill): &(String, String)| crate::store::event_index::is_valid_slug(skill))
1133 .collect();
1134 Ok(out)
1135 }
1136
1137 pub fn rules_used_since(&self, workspace: &str, since_ms: u64) -> Result<Vec<String>> {
1139 let mut stmt = self.conn.prepare(
1140 "SELECT DISTINCT ru.rule
1141 FROM rules_used ru
1142 JOIN sessions s ON s.id = ru.session_id
1143 WHERE s.workspace = ?1
1144 AND EXISTS (
1145 SELECT 1 FROM events e
1146 JOIN sessions ss ON ss.id = e.session_id
1147 WHERE e.session_id = ru.session_id
1148 AND (e.ts_ms >= ?2 OR (e.ts_ms < ?3 AND ss.started_at_ms >= ?2))
1149 )
1150 ORDER BY ru.rule",
1151 )?;
1152 let out: Vec<String> = stmt
1153 .query_map(
1154 params![workspace, since_ms as i64, SYNTHETIC_TS_CEILING_MS],
1155 |r| r.get::<_, String>(0),
1156 )?
1157 .filter_map(|r| r.ok())
1158 .filter(|s: &String| crate::store::event_index::is_valid_slug(s))
1159 .collect();
1160 Ok(out)
1161 }
1162
1163 pub fn rules_used_in_window(
1165 &self,
1166 workspace: &str,
1167 start_ms: u64,
1168 end_ms: u64,
1169 ) -> Result<Vec<(String, String)>> {
1170 let mut stmt = self.conn.prepare(
1171 "SELECT DISTINCT ru.session_id, ru.rule
1172 FROM rules_used ru
1173 JOIN sessions s ON s.id = ru.session_id
1174 WHERE s.workspace = ?1
1175 AND EXISTS (
1176 SELECT 1 FROM events e
1177 JOIN sessions ss ON ss.id = e.session_id
1178 WHERE e.session_id = ru.session_id
1179 AND (
1180 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1181 OR (e.ts_ms < ?4 AND ss.started_at_ms >= ?2 AND ss.started_at_ms <= ?3)
1182 )
1183 )
1184 ORDER BY ru.session_id, ru.rule",
1185 )?;
1186 let out: Vec<(String, String)> = stmt
1187 .query_map(
1188 params![
1189 workspace,
1190 start_ms as i64,
1191 end_ms as i64,
1192 SYNTHETIC_TS_CEILING_MS,
1193 ],
1194 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
1195 )?
1196 .filter_map(|r| r.ok())
1197 .filter(|(_, rule): &(String, String)| crate::store::event_index::is_valid_slug(rule))
1198 .collect();
1199 Ok(out)
1200 }
1201
1202 pub fn sessions_active_in_window(
1204 &self,
1205 workspace: &str,
1206 start_ms: u64,
1207 end_ms: u64,
1208 ) -> Result<HashSet<String>> {
1209 let mut stmt = self.conn.prepare(
1210 "SELECT DISTINCT s.id
1211 FROM sessions s
1212 WHERE s.workspace = ?1
1213 AND EXISTS (
1214 SELECT 1 FROM events e
1215 WHERE e.session_id = s.id
1216 AND (
1217 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1218 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1219 )
1220 )",
1221 )?;
1222 let out: HashSet<String> = stmt
1223 .query_map(
1224 params![
1225 workspace,
1226 start_ms as i64,
1227 end_ms as i64,
1228 SYNTHETIC_TS_CEILING_MS,
1229 ],
1230 |r| r.get(0),
1231 )?
1232 .filter_map(|r| r.ok())
1233 .collect();
1234 Ok(out)
1235 }
1236
1237 pub fn session_costs_usd_e6_in_window(
1239 &self,
1240 workspace: &str,
1241 start_ms: u64,
1242 end_ms: u64,
1243 ) -> Result<HashMap<String, i64>> {
1244 let mut stmt = self.conn.prepare(
1245 "SELECT e.session_id, SUM(COALESCE(e.cost_usd_e6, 0))
1246 FROM events e
1247 JOIN sessions s ON s.id = e.session_id
1248 WHERE s.workspace = ?1
1249 AND (
1250 (e.ts_ms >= ?2 AND e.ts_ms <= ?3)
1251 OR (e.ts_ms < ?4 AND s.started_at_ms >= ?2 AND s.started_at_ms <= ?3)
1252 )
1253 GROUP BY e.session_id",
1254 )?;
1255 let rows: Vec<(String, i64)> = stmt
1256 .query_map(
1257 params![
1258 workspace,
1259 start_ms as i64,
1260 end_ms as i64,
1261 SYNTHETIC_TS_CEILING_MS,
1262 ],
1263 |r| Ok((r.get(0)?, r.get(1)?)),
1264 )?
1265 .filter_map(|r| r.ok())
1266 .collect();
1267 Ok(rows.into_iter().collect())
1268 }
1269
    /// Builds a per-skill / per-rule usage and cost report for `workspace` over the window
    /// `[window_start_ms, window_end_ms]`.
    ///
    /// Every skill or rule that was used in the window gets one row, and so does every slug
    /// in `skill_slugs_on_disk` / `rule_slugs_on_disk` that was not used (with zero
    /// sessions). Each row carries the number of sessions that used it, that number as a
    /// percentage of all active sessions, the summed and average session cost, and the
    /// difference between its average and the workspace-wide average.
    ///
    /// Rows are sorted by session count (descending), then kind, then id.
    ///
    /// # Errors
    /// Propagates any error from the underlying window queries.
    pub fn guidance_report(
        &self,
        workspace: &str,
        window_start_ms: u64,
        window_end_ms: u64,
        skill_slugs_on_disk: &HashSet<String>,
        rule_slugs_on_disk: &HashSet<String>,
    ) -> Result<GuidanceReport> {
        // Active sessions form the denominator for every percentage and for the
        // workspace-wide average.
        let active = self.sessions_active_in_window(workspace, window_start_ms, window_end_ms)?;
        let denom = active.len() as u64;
        let costs =
            self.session_costs_usd_e6_in_window(workspace, window_start_ms, window_end_ms)?;

        // Average cost over all active sessions; sessions without a cost entry count as 0.
        // `None` when the window has no active sessions.
        let workspace_avg_cost_per_session_usd = if denom > 0 {
            let total_e6: i64 = active
                .iter()
                .map(|sid| costs.get(sid).copied().unwrap_or(0))
                .sum();
            Some(total_e6 as f64 / denom as f64 / 1_000_000.0)
        } else {
            None
        };

        // Group the (session, slug) pairs into slug -> set of sessions.
        let mut skill_sessions: HashMap<String, HashSet<String>> = HashMap::new();
        for (sid, skill) in self.skills_used_in_window(workspace, window_start_ms, window_end_ms)? {
            skill_sessions.entry(skill).or_default().insert(sid);
        }
        let mut rule_sessions: HashMap<String, HashSet<String>> = HashMap::new();
        for (sid, rule) in self.rules_used_in_window(workspace, window_start_ms, window_end_ms)? {
            rule_sessions.entry(rule).or_default().insert(sid);
        }

        let mut rows: Vec<GuidancePerfRow> = Vec::new();

        // Computes the metrics for one skill/rule and appends the row. The closure holds
        // the mutable borrow of `rows` until its last call below.
        let mut push_row =
            |kind: GuidanceKind, id: String, sids: &HashSet<String>, on_disk: bool| {
                let sessions = sids.len() as u64;
                let sessions_pct = if denom > 0 {
                    sessions as f64 * 100.0 / denom as f64
                } else {
                    0.0
                };
                let total_cost_usd_e6: i64 = sids
                    .iter()
                    .map(|sid| costs.get(sid).copied().unwrap_or(0))
                    .sum();
                let avg_cost_per_session_usd = if sessions > 0 {
                    Some(total_cost_usd_e6 as f64 / sessions as f64 / 1_000_000.0)
                } else {
                    None
                };
                // Only meaningful when both averages exist.
                let vs_workspace_avg_cost_per_session_usd =
                    match (avg_cost_per_session_usd, workspace_avg_cost_per_session_usd) {
                        (Some(avg), Some(w)) => Some(avg - w),
                        _ => None,
                    };
                rows.push(GuidancePerfRow {
                    kind,
                    id,
                    sessions,
                    sessions_pct,
                    total_cost_usd_e6,
                    avg_cost_per_session_usd,
                    vs_workspace_avg_cost_per_session_usd,
                    on_disk,
                });
            };

        // Skills that were used, followed by on-disk skills that were never used.
        let mut seen_skills: HashSet<String> = HashSet::new();
        for (id, sids) in &skill_sessions {
            seen_skills.insert(id.clone());
            push_row(
                GuidanceKind::Skill,
                id.clone(),
                sids,
                skill_slugs_on_disk.contains(id),
            );
        }
        for slug in skill_slugs_on_disk {
            if seen_skills.contains(slug) {
                continue;
            }
            push_row(GuidanceKind::Skill, slug.clone(), &HashSet::new(), true);
        }

        // Same two passes for rules.
        let mut seen_rules: HashSet<String> = HashSet::new();
        for (id, sids) in &rule_sessions {
            seen_rules.insert(id.clone());
            push_row(
                GuidanceKind::Rule,
                id.clone(),
                sids,
                rule_slugs_on_disk.contains(id),
            );
        }
        for slug in rule_slugs_on_disk {
            if seen_rules.contains(slug) {
                continue;
            }
            push_row(GuidanceKind::Rule, slug.clone(), &HashSet::new(), true);
        }

        // Most-used first; kind and id break ties so the order is deterministic despite
        // the hash-map iteration above.
        rows.sort_by(|a, b| {
            b.sessions
                .cmp(&a.sessions)
                .then_with(|| a.kind.cmp(&b.kind))
                .then_with(|| a.id.cmp(&b.id))
        });

        Ok(GuidanceReport {
            workspace: workspace.to_string(),
            window_start_ms,
            window_end_ms,
            sessions_in_window: denom,
            workspace_avg_cost_per_session_usd,
            rows,
        })
    }
1389
1390 pub fn get_session(&self, id: &str) -> Result<Option<SessionRecord>> {
1391 let mut stmt = self.conn.prepare(
1392 "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, status, trace_path,
1393 start_commit, end_commit, branch, dirty_start, dirty_end, repo_binding_source,
1394 prompt_fingerprint
1395 FROM sessions WHERE id = ?1",
1396 )?;
1397 let mut rows = stmt.query_map(params![id], |row| {
1398 Ok((
1399 row.get::<_, String>(0)?,
1400 row.get::<_, String>(1)?,
1401 row.get::<_, Option<String>>(2)?,
1402 row.get::<_, String>(3)?,
1403 row.get::<_, i64>(4)?,
1404 row.get::<_, Option<i64>>(5)?,
1405 row.get::<_, String>(6)?,
1406 row.get::<_, String>(7)?,
1407 row.get::<_, Option<String>>(8)?,
1408 row.get::<_, Option<String>>(9)?,
1409 row.get::<_, Option<String>>(10)?,
1410 row.get::<_, Option<i64>>(11)?,
1411 row.get::<_, Option<i64>>(12)?,
1412 row.get::<_, String>(13)?,
1413 row.get::<_, Option<String>>(14)?,
1414 ))
1415 })?;
1416
1417 if let Some(row) = rows.next() {
1418 let (
1419 id,
1420 agent,
1421 model,
1422 workspace,
1423 started,
1424 ended,
1425 status_str,
1426 trace,
1427 start_commit,
1428 end_commit,
1429 branch,
1430 dirty_start,
1431 dirty_end,
1432 source,
1433 prompt_fingerprint,
1434 ) = row?;
1435 Ok(Some(SessionRecord {
1436 id,
1437 agent,
1438 model,
1439 workspace,
1440 started_at_ms: started as u64,
1441 ended_at_ms: ended.map(|v| v as u64),
1442 status: status_from_str(&status_str),
1443 trace_path: trace,
1444 start_commit,
1445 end_commit,
1446 branch,
1447 dirty_start: dirty_start.map(i64_to_bool),
1448 dirty_end: dirty_end.map(i64_to_bool),
1449 repo_binding_source: empty_to_none(source),
1450 prompt_fingerprint,
1451 }))
1452 } else {
1453 Ok(None)
1454 }
1455 }
1456
1457 pub fn latest_repo_snapshot(&self, workspace: &str) -> Result<Option<RepoSnapshotRecord>> {
1458 let mut stmt = self.conn.prepare(
1459 "SELECT id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1460 indexed_at_ms, dirty, graph_path
1461 FROM repo_snapshots WHERE workspace = ?1
1462 ORDER BY indexed_at_ms DESC LIMIT 1",
1463 )?;
1464 let mut rows = stmt.query_map(params![workspace], |row| {
1465 Ok(RepoSnapshotRecord {
1466 id: row.get(0)?,
1467 workspace: row.get(1)?,
1468 head_commit: row.get(2)?,
1469 dirty_fingerprint: row.get(3)?,
1470 analyzer_version: row.get(4)?,
1471 indexed_at_ms: row.get::<_, i64>(5)? as u64,
1472 dirty: row.get::<_, i64>(6)? != 0,
1473 graph_path: row.get(7)?,
1474 })
1475 })?;
1476 Ok(rows.next().transpose()?)
1477 }
1478
1479 pub fn save_repo_snapshot(
1480 &self,
1481 snapshot: &RepoSnapshotRecord,
1482 facts: &[FileFact],
1483 edges: &[RepoEdge],
1484 ) -> Result<()> {
1485 self.conn.execute(
1486 "INSERT INTO repo_snapshots (
1487 id, workspace, head_commit, dirty_fingerprint, analyzer_version,
1488 indexed_at_ms, dirty, graph_path
1489 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1490 ON CONFLICT(id) DO UPDATE SET
1491 workspace=excluded.workspace,
1492 head_commit=excluded.head_commit,
1493 dirty_fingerprint=excluded.dirty_fingerprint,
1494 analyzer_version=excluded.analyzer_version,
1495 indexed_at_ms=excluded.indexed_at_ms,
1496 dirty=excluded.dirty,
1497 graph_path=excluded.graph_path",
1498 params![
1499 snapshot.id,
1500 snapshot.workspace,
1501 snapshot.head_commit,
1502 snapshot.dirty_fingerprint,
1503 snapshot.analyzer_version,
1504 snapshot.indexed_at_ms as i64,
1505 bool_to_i64(snapshot.dirty),
1506 snapshot.graph_path,
1507 ],
1508 )?;
1509 self.conn.execute(
1510 "DELETE FROM file_facts WHERE snapshot_id = ?1",
1511 params![snapshot.id],
1512 )?;
1513 self.conn.execute(
1514 "DELETE FROM repo_edges WHERE snapshot_id = ?1",
1515 params![snapshot.id],
1516 )?;
1517 for fact in facts {
1518 self.conn.execute(
1519 "INSERT INTO file_facts (
1520 snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1521 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1522 churn_30d, churn_90d, authors_90d, last_changed_ms
1523 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
1524 params![
1525 fact.snapshot_id,
1526 fact.path,
1527 fact.language,
1528 fact.bytes as i64,
1529 fact.loc as i64,
1530 fact.sloc as i64,
1531 fact.complexity_total as i64,
1532 fact.max_fn_complexity as i64,
1533 fact.symbol_count as i64,
1534 fact.import_count as i64,
1535 fact.fan_in as i64,
1536 fact.fan_out as i64,
1537 fact.churn_30d as i64,
1538 fact.churn_90d as i64,
1539 fact.authors_90d as i64,
1540 fact.last_changed_ms.map(|v| v as i64),
1541 ],
1542 )?;
1543 }
1544 for edge in edges {
1545 self.conn.execute(
1546 "INSERT INTO repo_edges (snapshot_id, from_id, to_id, kind, weight)
1547 VALUES (?1, ?2, ?3, ?4, ?5)
1548 ON CONFLICT(snapshot_id, from_id, to_id, kind)
1549 DO UPDATE SET weight = weight + excluded.weight",
1550 params![
1551 snapshot.id,
1552 edge.from_path,
1553 edge.to_path,
1554 edge.kind,
1555 edge.weight as i64,
1556 ],
1557 )?;
1558 }
1559 Ok(())
1560 }
1561
1562 pub fn file_facts_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<FileFact>> {
1563 let mut stmt = self.conn.prepare(
1564 "SELECT snapshot_id, path, language, bytes, loc, sloc, complexity_total,
1565 max_fn_complexity, symbol_count, import_count, fan_in, fan_out,
1566 churn_30d, churn_90d, authors_90d, last_changed_ms
1567 FROM file_facts WHERE snapshot_id = ?1 ORDER BY path ASC",
1568 )?;
1569 let rows = stmt.query_map(params![snapshot_id], |row| {
1570 Ok(FileFact {
1571 snapshot_id: row.get(0)?,
1572 path: row.get(1)?,
1573 language: row.get(2)?,
1574 bytes: row.get::<_, i64>(3)? as u64,
1575 loc: row.get::<_, i64>(4)? as u32,
1576 sloc: row.get::<_, i64>(5)? as u32,
1577 complexity_total: row.get::<_, i64>(6)? as u32,
1578 max_fn_complexity: row.get::<_, i64>(7)? as u32,
1579 symbol_count: row.get::<_, i64>(8)? as u32,
1580 import_count: row.get::<_, i64>(9)? as u32,
1581 fan_in: row.get::<_, i64>(10)? as u32,
1582 fan_out: row.get::<_, i64>(11)? as u32,
1583 churn_30d: row.get::<_, i64>(12)? as u32,
1584 churn_90d: row.get::<_, i64>(13)? as u32,
1585 authors_90d: row.get::<_, i64>(14)? as u32,
1586 last_changed_ms: row.get::<_, Option<i64>>(15)?.map(|v| v as u64),
1587 })
1588 })?;
1589 Ok(rows.filter_map(|row| row.ok()).collect())
1590 }
1591
1592 pub fn repo_edges_for_snapshot(&self, snapshot_id: &str) -> Result<Vec<RepoEdge>> {
1593 let mut stmt = self.conn.prepare(
1594 "SELECT from_id, to_id, kind, weight
1595 FROM repo_edges WHERE snapshot_id = ?1
1596 ORDER BY kind, from_id, to_id",
1597 )?;
1598 let rows = stmt.query_map(params![snapshot_id], |row| {
1599 Ok(RepoEdge {
1600 from_path: row.get(0)?,
1601 to_path: row.get(1)?,
1602 kind: row.get(2)?,
1603 weight: row.get::<_, i64>(3)? as u32,
1604 })
1605 })?;
1606 Ok(rows.filter_map(|row| row.ok()).collect())
1607 }
1608
1609 pub fn tool_spans_in_window(
1610 &self,
1611 workspace: &str,
1612 start_ms: u64,
1613 end_ms: u64,
1614 ) -> Result<Vec<ToolSpanView>> {
1615 let mut stmt = self.conn.prepare(
1616 "SELECT ts.tool, ts.status, ts.lead_time_ms, ts.tokens_in, ts.tokens_out,
1617 ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json
1618 FROM tool_spans ts
1619 JOIN sessions s ON s.id = ts.session_id
1620 WHERE s.workspace = ?1
1621 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
1622 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3
1623 ORDER BY COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) DESC",
1624 )?;
1625 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
1626 let paths_json: String = row.get(7)?;
1627 Ok(ToolSpanView {
1628 tool: row
1629 .get::<_, Option<String>>(0)?
1630 .unwrap_or_else(|| "unknown".into()),
1631 status: row.get(1)?,
1632 lead_time_ms: row.get::<_, Option<i64>>(2)?.map(|v| v as u64),
1633 tokens_in: row.get::<_, Option<i64>>(3)?.map(|v| v as u32),
1634 tokens_out: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
1635 reasoning_tokens: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
1636 cost_usd_e6: row.get(6)?,
1637 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1638 })
1639 })?;
1640 Ok(rows.filter_map(|row| row.ok()).collect())
1641 }
1642
1643 pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
1644 let mut stmt = self.conn.prepare(
1645 "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
1646 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
1647 FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
1648 )?;
1649 let rows = stmt.query_map(params![session_id], |row| {
1650 let paths_json: String = row.get(12)?;
1651 Ok(ToolSpanSyncRow {
1652 span_id: row.get(0)?,
1653 session_id: row.get(1)?,
1654 tool: row.get(2)?,
1655 tool_call_id: row.get(3)?,
1656 status: row.get(4)?,
1657 started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
1658 ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
1659 lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
1660 tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1661 tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
1662 reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1663 cost_usd_e6: row.get(11)?,
1664 paths: serde_json::from_str(&paths_json).unwrap_or_default(),
1665 })
1666 })?;
1667 Ok(rows.filter_map(|row| row.ok()).collect())
1668 }
1669
1670 pub fn upsert_eval(&self, eval: &crate::eval::types::EvalRow) -> rusqlite::Result<()> {
1671 self.conn.execute(
1672 "INSERT OR REPLACE INTO session_evals
1673 (id, session_id, judge_model, rubric_id, score, rationale, flagged, created_at_ms)
1674 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1675 rusqlite::params![
1676 eval.id,
1677 eval.session_id,
1678 eval.judge_model,
1679 eval.rubric_id,
1680 eval.score,
1681 eval.rationale,
1682 eval.flagged as i64,
1683 eval.created_at_ms as i64,
1684 ],
1685 )?;
1686 Ok(())
1687 }
1688
1689 pub fn list_evals_in_window(
1690 &self,
1691 start_ms: u64,
1692 end_ms: u64,
1693 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1694 let mut stmt = self.conn.prepare(
1695 "SELECT id, session_id, judge_model, rubric_id, score,
1696 rationale, flagged, created_at_ms
1697 FROM session_evals
1698 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1699 ORDER BY created_at_ms ASC",
1700 )?;
1701 let rows = stmt.query_map(rusqlite::params![start_ms as i64, end_ms as i64], |r| {
1702 Ok(crate::eval::types::EvalRow {
1703 id: r.get(0)?,
1704 session_id: r.get(1)?,
1705 judge_model: r.get(2)?,
1706 rubric_id: r.get(3)?,
1707 score: r.get(4)?,
1708 rationale: r.get(5)?,
1709 flagged: r.get::<_, i64>(6)? != 0,
1710 created_at_ms: r.get::<_, i64>(7)? as u64,
1711 })
1712 })?;
1713 rows.collect()
1714 }
1715
1716 pub fn list_evals_for_session(
1717 &self,
1718 session_id: &str,
1719 ) -> rusqlite::Result<Vec<crate::eval::types::EvalRow>> {
1720 let mut stmt = self.conn.prepare(
1721 "SELECT id, session_id, judge_model, rubric_id, score,
1722 rationale, flagged, created_at_ms
1723 FROM session_evals
1724 WHERE session_id = ?1
1725 ORDER BY created_at_ms DESC",
1726 )?;
1727 let rows = stmt.query_map(rusqlite::params![session_id], |r| {
1728 Ok(crate::eval::types::EvalRow {
1729 id: r.get(0)?,
1730 session_id: r.get(1)?,
1731 judge_model: r.get(2)?,
1732 rubric_id: r.get(3)?,
1733 score: r.get(4)?,
1734 rationale: r.get(5)?,
1735 flagged: r.get::<_, i64>(6)? != 0,
1736 created_at_ms: r.get::<_, i64>(7)? as u64,
1737 })
1738 })?;
1739 rows.collect()
1740 }
1741
1742 pub fn upsert_feedback(&self, r: &crate::feedback::types::FeedbackRecord) -> Result<()> {
1743 use crate::feedback::types::FeedbackLabel;
1744 self.conn.execute(
1745 "INSERT OR REPLACE INTO session_feedback
1746 (id, session_id, score, label, note, created_at_ms)
1747 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1748 rusqlite::params![
1749 r.id,
1750 r.session_id,
1751 r.score.as_ref().map(|s| s.0 as i64),
1752 r.label.as_ref().map(FeedbackLabel::to_db_str),
1753 r.note,
1754 r.created_at_ms as i64,
1755 ],
1756 )?;
1757 let payload = serde_json::to_string(r).unwrap_or_default();
1758 self.conn.execute(
1759 "INSERT INTO sync_outbox (session_id, kind, payload, sent)
1760 VALUES (?1, 'session_feedback', ?2, 0)",
1761 rusqlite::params![r.session_id, payload],
1762 )?;
1763 Ok(())
1764 }
1765
1766 pub fn list_feedback_in_window(
1767 &self,
1768 start_ms: u64,
1769 end_ms: u64,
1770 ) -> Result<Vec<crate::feedback::types::FeedbackRecord>> {
1771 let mut stmt = self.conn.prepare(
1772 "SELECT id, session_id, score, label, note, created_at_ms
1773 FROM session_feedback
1774 WHERE created_at_ms >= ?1 AND created_at_ms < ?2
1775 ORDER BY created_at_ms ASC",
1776 )?;
1777 let rows = stmt.query_map(
1778 rusqlite::params![start_ms as i64, end_ms as i64],
1779 feedback_row,
1780 )?;
1781 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
1782 }
1783
1784 pub fn feedback_for_sessions(
1785 &self,
1786 ids: &[String],
1787 ) -> Result<std::collections::HashMap<String, crate::feedback::types::FeedbackRecord>> {
1788 if ids.is_empty() {
1789 return Ok(std::collections::HashMap::new());
1790 }
1791 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1792 let sql = format!(
1793 "SELECT id, session_id, score, label, note, created_at_ms
1794 FROM session_feedback WHERE session_id IN ({placeholders})
1795 ORDER BY created_at_ms DESC"
1796 );
1797 let mut stmt = self.conn.prepare(&sql)?;
1798 let params: Vec<&dyn rusqlite::ToSql> =
1799 ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
1800 let rows = stmt.query_map(params.as_slice(), feedback_row)?;
1801 let mut map = std::collections::HashMap::new();
1802 for row in rows {
1803 let r = row?;
1804 map.entry(r.session_id.clone()).or_insert(r);
1805 }
1806 Ok(map)
1807 }
1808
1809 pub fn list_sessions_for_eval(
1810 &self,
1811 since_ms: u64,
1812 min_cost_usd: f64,
1813 ) -> Result<Vec<crate::core::event::SessionRecord>> {
1814 let min_cost_e6 = (min_cost_usd * 1_000_000.0) as i64;
1815 let mut stmt = self.conn.prepare(
1816 "SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms,
1817 s.status, s.trace_path, s.start_commit, s.end_commit, s.branch,
1818 s.dirty_start, s.dirty_end, s.repo_binding_source, s.prompt_fingerprint
1819 FROM sessions s
1820 WHERE s.started_at_ms >= ?1
1821 AND COALESCE((SELECT SUM(e.cost_usd_e6) FROM events e WHERE e.session_id = s.id), 0) >= ?2
1822 AND NOT EXISTS (SELECT 1 FROM session_evals ev WHERE ev.session_id = s.id)
1823 ORDER BY s.started_at_ms DESC",
1824 )?;
1825 let rows = stmt.query_map(params![since_ms as i64, min_cost_e6], |r| {
1826 Ok((
1827 r.get::<_, String>(0)?,
1828 r.get::<_, String>(1)?,
1829 r.get::<_, Option<String>>(2)?,
1830 r.get::<_, String>(3)?,
1831 r.get::<_, i64>(4)?,
1832 r.get::<_, Option<i64>>(5)?,
1833 r.get::<_, String>(6)?,
1834 r.get::<_, String>(7)?,
1835 r.get::<_, Option<String>>(8)?,
1836 r.get::<_, Option<String>>(9)?,
1837 r.get::<_, Option<String>>(10)?,
1838 r.get::<_, Option<i64>>(11)?,
1839 r.get::<_, Option<i64>>(12)?,
1840 r.get::<_, Option<String>>(13)?,
1841 r.get::<_, Option<String>>(14)?,
1842 ))
1843 })?;
1844 let mut out = Vec::new();
1845 for row in rows {
1846 let (
1847 id,
1848 agent,
1849 model,
1850 workspace,
1851 started,
1852 ended,
1853 status_str,
1854 trace,
1855 start_commit,
1856 end_commit,
1857 branch,
1858 dirty_start,
1859 dirty_end,
1860 source,
1861 prompt_fingerprint,
1862 ) = row?;
1863 out.push(crate::core::event::SessionRecord {
1864 id,
1865 agent,
1866 model,
1867 workspace,
1868 started_at_ms: started as u64,
1869 ended_at_ms: ended.map(|v| v as u64),
1870 status: status_from_str(&status_str),
1871 trace_path: trace,
1872 start_commit,
1873 end_commit,
1874 branch,
1875 dirty_start: dirty_start.map(i64_to_bool),
1876 dirty_end: dirty_end.map(i64_to_bool),
1877 repo_binding_source: source.and_then(|s| if s.is_empty() { None } else { Some(s) }),
1878 prompt_fingerprint,
1879 });
1880 }
1881 Ok(out)
1882 }
1883
1884 pub fn upsert_prompt_snapshot(&self, snap: &crate::prompt::PromptSnapshot) -> Result<()> {
1885 self.conn.execute(
1886 "INSERT OR IGNORE INTO prompt_snapshots
1887 (fingerprint, captured_at_ms, files_json, total_bytes)
1888 VALUES (?1, ?2, ?3, ?4)",
1889 params![
1890 snap.fingerprint,
1891 snap.captured_at_ms as i64,
1892 snap.files_json,
1893 snap.total_bytes as i64
1894 ],
1895 )?;
1896 Ok(())
1897 }
1898
1899 pub fn get_prompt_snapshot(
1900 &self,
1901 fingerprint: &str,
1902 ) -> Result<Option<crate::prompt::PromptSnapshot>> {
1903 self.conn
1904 .query_row(
1905 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1906 FROM prompt_snapshots WHERE fingerprint = ?1",
1907 params![fingerprint],
1908 |r| {
1909 Ok(crate::prompt::PromptSnapshot {
1910 fingerprint: r.get(0)?,
1911 captured_at_ms: r.get::<_, i64>(1)? as u64,
1912 files_json: r.get(2)?,
1913 total_bytes: r.get::<_, i64>(3)? as u64,
1914 })
1915 },
1916 )
1917 .optional()
1918 .map_err(Into::into)
1919 }
1920
1921 pub fn list_prompt_snapshots(&self) -> Result<Vec<crate::prompt::PromptSnapshot>> {
1922 let mut stmt = self.conn.prepare(
1923 "SELECT fingerprint, captured_at_ms, files_json, total_bytes
1924 FROM prompt_snapshots ORDER BY captured_at_ms DESC",
1925 )?;
1926 let rows = stmt.query_map([], |r| {
1927 Ok(crate::prompt::PromptSnapshot {
1928 fingerprint: r.get(0)?,
1929 captured_at_ms: r.get::<_, i64>(1)? as u64,
1930 files_json: r.get(2)?,
1931 total_bytes: r.get::<_, i64>(3)? as u64,
1932 })
1933 })?;
1934 Ok(rows.filter_map(|r| r.ok()).collect())
1935 }
1936
1937 pub fn sessions_with_prompt_fingerprint(
1939 &self,
1940 workspace: &str,
1941 start_ms: u64,
1942 end_ms: u64,
1943 ) -> Result<Vec<(String, String)>> {
1944 let mut stmt = self.conn.prepare(
1945 "SELECT id, prompt_fingerprint FROM sessions
1946 WHERE workspace = ?1
1947 AND started_at_ms >= ?2 AND started_at_ms < ?3
1948 AND prompt_fingerprint IS NOT NULL",
1949 )?;
1950 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |r| {
1951 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1952 })?;
1953 Ok(rows.filter_map(|r| r.ok()).collect())
1954 }
1955}
1956
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1963
1964fn count_q(conn: &Connection, sql: &str, workspace: &str) -> Result<u64> {
1965 Ok(conn.query_row(sql, params![workspace], |r| r.get::<_, i64>(0))? as u64)
1966}
1967
1968fn cost_stats(conn: &Connection, workspace: &str) -> Result<(i64, u64)> {
1969 let cost: i64 = conn.query_row(
1970 "SELECT COALESCE(SUM(e.cost_usd_e6),0) FROM events e JOIN sessions s ON s.id=e.session_id WHERE s.workspace=?1",
1971 params![workspace], |r| r.get(0),
1972 )?;
1973 let with_cost: i64 = conn.query_row(
1974 "SELECT COUNT(DISTINCT s.id) FROM sessions s JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 AND e.cost_usd_e6 IS NOT NULL",
1975 params![workspace], |r| r.get(0),
1976 )?;
1977 Ok((cost, with_cost as u64))
1978}
1979
1980fn feedback_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<crate::feedback::types::FeedbackRecord> {
1981 use crate::feedback::types::{FeedbackLabel, FeedbackRecord, FeedbackScore};
1982 let score = r
1983 .get::<_, Option<i64>>(2)?
1984 .and_then(|v| FeedbackScore::new(v as u8));
1985 let label = r
1986 .get::<_, Option<String>>(3)?
1987 .and_then(|s| FeedbackLabel::from_str_opt(&s));
1988 Ok(FeedbackRecord {
1989 id: r.get(0)?,
1990 session_id: r.get(1)?,
1991 score,
1992 label,
1993 note: r.get(4)?,
1994 created_at_ms: r.get::<_, i64>(5)? as u64,
1995 })
1996}
1997
/// Three-letter weekday name for `day_idx`, the number of whole days since the Unix
/// epoch. Day 0 (1970-01-01) was a Thursday, hence the offset of 4 into the
/// Sunday-first table.
fn day_label(day_idx: u64) -> &'static str {
    const LABELS: [&str; 7] = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
    // Reduce before adding the offset so the sum cannot overflow for huge indices.
    LABELS[((day_idx % 7 + 4) % 7) as usize]
}
2001
2002fn sessions_by_day_7(conn: &Connection, workspace: &str, now: u64) -> Result<Vec<(String, u64)>> {
2003 let week_ago = now.saturating_sub(7 * 86_400_000);
2004 let mut stmt = conn
2005 .prepare("SELECT started_at_ms FROM sessions WHERE workspace=?1 AND started_at_ms>=?2")?;
2006 let days: Vec<u64> = stmt
2007 .query_map(params![workspace, week_ago as i64], |r| r.get::<_, i64>(0))?
2008 .filter_map(|r| r.ok())
2009 .map(|v| v as u64 / 86_400_000)
2010 .collect();
2011 let today = now / 86_400_000;
2012 Ok((0u64..7)
2013 .map(|i| {
2014 let d = today.saturating_sub(6 - i);
2015 (
2016 day_label(d).to_string(),
2017 days.iter().filter(|&&x| x == d).count() as u64,
2018 )
2019 })
2020 .collect())
2021}
2022
2023fn recent_sessions_3(conn: &Connection, workspace: &str) -> Result<Vec<(SessionRecord, u64)>> {
2024 let sql = "SELECT s.id,s.agent,s.model,s.workspace,s.started_at_ms,s.ended_at_ms,\
2025 s.status,s.trace_path,s.start_commit,s.end_commit,s.branch,s.dirty_start,\
2026 s.dirty_end,s.repo_binding_source,COUNT(e.id) FROM sessions s \
2027 LEFT JOIN events e ON e.session_id=s.id WHERE s.workspace=?1 \
2028 GROUP BY s.id ORDER BY s.started_at_ms DESC LIMIT 3";
2029 let mut stmt = conn.prepare(sql)?;
2030 let out: Vec<(SessionRecord, u64)> = stmt
2031 .query_map(params![workspace], |r| {
2032 let st: String = r.get(6)?;
2033 Ok((
2034 SessionRecord {
2035 id: r.get(0)?,
2036 agent: r.get(1)?,
2037 model: r.get(2)?,
2038 workspace: r.get(3)?,
2039 started_at_ms: r.get::<_, i64>(4)? as u64,
2040 ended_at_ms: r.get::<_, Option<i64>>(5)?.map(|v| v as u64),
2041 status: status_from_str(&st),
2042 trace_path: r.get(7)?,
2043 start_commit: r.get(8)?,
2044 end_commit: r.get(9)?,
2045 branch: r.get(10)?,
2046 dirty_start: r.get::<_, Option<i64>>(11)?.map(i64_to_bool),
2047 dirty_end: r.get::<_, Option<i64>>(12)?.map(i64_to_bool),
2048 repo_binding_source: empty_to_none(r.get::<_, String>(13)?),
2049 prompt_fingerprint: None,
2050 },
2051 r.get::<_, i64>(14)? as u64,
2052 ))
2053 })?
2054 .filter_map(|r| r.ok())
2055 .collect();
2056 Ok(out)
2057}
2058
2059fn top_tools_5(conn: &Connection, workspace: &str) -> Result<Vec<(String, u64)>> {
2060 let mut stmt = conn.prepare(
2061 "SELECT tool, COUNT(*) FROM events e JOIN sessions s ON s.id=e.session_id \
2062 WHERE s.workspace=?1 AND tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 5",
2063 )?;
2064 let out: Vec<(String, u64)> = stmt
2065 .query_map(params![workspace], |r| {
2066 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as u64))
2067 })?
2068 .filter_map(|r| r.ok())
2069 .collect();
2070 Ok(out)
2071}
2072
2073fn status_from_str(s: &str) -> SessionStatus {
2074 match s {
2075 "Running" => SessionStatus::Running,
2076 "Waiting" => SessionStatus::Waiting,
2077 "Idle" => SessionStatus::Idle,
2078 _ => SessionStatus::Done,
2079 }
2080}
2081
2082fn kind_from_str(s: &str) -> EventKind {
2083 match s {
2084 "ToolCall" => EventKind::ToolCall,
2085 "ToolResult" => EventKind::ToolResult,
2086 "Message" => EventKind::Message,
2087 "Error" => EventKind::Error,
2088 "Cost" => EventKind::Cost,
2089 _ => EventKind::Hook,
2090 }
2091}
2092
2093fn source_from_str(s: &str) -> EventSource {
2094 match s {
2095 "Tail" => EventSource::Tail,
2096 "Hook" => EventSource::Hook,
2097 _ => EventSource::Proxy,
2098 }
2099}
2100
2101fn ensure_schema_columns(conn: &Connection) -> Result<()> {
2102 ensure_column(conn, "sessions", "start_commit", "TEXT")?;
2103 ensure_column(conn, "sessions", "end_commit", "TEXT")?;
2104 ensure_column(conn, "sessions", "branch", "TEXT")?;
2105 ensure_column(conn, "sessions", "dirty_start", "INTEGER")?;
2106 ensure_column(conn, "sessions", "dirty_end", "INTEGER")?;
2107 ensure_column(
2108 conn,
2109 "sessions",
2110 "repo_binding_source",
2111 "TEXT NOT NULL DEFAULT ''",
2112 )?;
2113 ensure_column(conn, "events", "ts_exact", "INTEGER NOT NULL DEFAULT 0")?;
2114 ensure_column(conn, "events", "tool_call_id", "TEXT")?;
2115 ensure_column(conn, "events", "reasoning_tokens", "INTEGER")?;
2116 ensure_column(
2117 conn,
2118 "sync_outbox",
2119 "kind",
2120 "TEXT NOT NULL DEFAULT 'events'",
2121 )?;
2122 ensure_column(
2123 conn,
2124 "experiments",
2125 "state",
2126 "TEXT NOT NULL DEFAULT 'Draft'",
2127 )?;
2128 ensure_column(conn, "experiments", "concluded_at_ms", "INTEGER")?;
2129 ensure_column(conn, "sessions", "prompt_fingerprint", "TEXT")?;
2130 Ok(())
2131}
2132
2133fn ensure_column(conn: &Connection, table: &str, column: &str, sql_type: &str) -> Result<()> {
2134 if has_column(conn, table, column)? {
2135 return Ok(());
2136 }
2137 conn.execute(
2138 &format!("ALTER TABLE {table} ADD COLUMN {column} {sql_type}"),
2139 [],
2140 )?;
2141 Ok(())
2142}
2143
2144fn has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
2145 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2146 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2147 Ok(rows.filter_map(|r| r.ok()).any(|name| name == column))
2148}
2149
/// Converts a flag to an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(v: bool) -> i64 {
    i64::from(v)
}
2153
/// Converts an integer to a flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(v: i64) -> bool {
    !matches!(v, 0)
}
2157
/// Maps the empty string to `None` and returns any other string unchanged
/// inside `Some`. Whitespace-only strings are not considered empty.
fn empty_to_none(s: String) -> Option<String> {
    Some(s).filter(|text| !text.is_empty())
}
2161
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Opens a store at `kaizen.db` inside a brand-new temporary directory.
    ///
    /// The `TempDir` is returned with the store so the caller can keep the
    /// directory alive for the duration of the test.
    fn fresh_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let store = Store::open(&dir.path().join("kaizen.db")).unwrap();
        (dir, store)
    }

    /// Builds a finished `cursor` session in workspace `/ws` that started at
    /// 1000 ms, with all optional fields unset.
    fn make_session(id: &str) -> SessionRecord {
        SessionRecord {
            id: id.to_owned(),
            agent: String::from("cursor"),
            model: None,
            workspace: String::from("/ws"),
            started_at_ms: 1000,
            ended_at_ms: None,
            status: SessionStatus::Done,
            trace_path: String::from("/trace"),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
        }
    }

    /// Builds a `read_file` tool-call event with an empty payload; the
    /// timestamp and tool call id are derived from `seq`.
    fn make_event(session_id: &str, seq: u64) -> Event {
        Event {
            session_id: session_id.to_owned(),
            seq,
            ts_ms: 1000 + seq * 100,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some(String::from("read_file")),
            tool_call_id: Some(format!("call_{seq}")),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: json!({}),
        }
    }

    /// A freshly opened store reports the `wal` journal mode.
    #[test]
    fn open_and_wal_mode() {
        let (_dir, store) = fresh_store();
        let journal_mode: String = store
            .conn
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal_mode, "wal");
    }

    /// An upserted session can be fetched back by id.
    #[test]
    fn upsert_and_get_session() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("s1")).unwrap();

        let fetched = store.get_session("s1").unwrap().unwrap();
        assert_eq!(fetched.id, "s1");
        assert_eq!(fetched.status, SessionStatus::Done);
    }

    /// After appending events, the session is still listed once for its workspace.
    #[test]
    fn append_and_list_events_round_trip() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("s2")).unwrap();
        store.append_event(&make_event("s2", 0)).unwrap();
        store.append_event(&make_event("s2", 1)).unwrap();

        let listed = store.list_sessions("/ws").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "s2");
    }

    /// Summary stats on an empty store are all zero.
    #[test]
    fn summary_stats_empty() {
        let (_dir, store) = fresh_store();
        let summary = store.summary_stats("/ws").unwrap();
        assert_eq!(summary.session_count, 0);
        assert_eq!(summary.total_cost_usd_e6, 0);
    }

    /// Two sessions from the same agent are counted under one agent entry.
    #[test]
    fn summary_stats_counts_sessions() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("a")).unwrap();
        store.upsert_session(&make_session("b")).unwrap();

        let summary = store.summary_stats("/ws").unwrap();
        assert_eq!(summary.session_count, 2);
        assert_eq!(summary.by_agent.len(), 1);
        assert_eq!(summary.by_agent[0].0, "cursor");
        assert_eq!(summary.by_agent[0].1, 2);
    }

    /// Events come back for their session in `seq` order.
    #[test]
    fn list_events_for_session_round_trip() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("s4")).unwrap();
        store.append_event(&make_event("s4", 0)).unwrap();
        store.append_event(&make_event("s4", 1)).unwrap();

        let stored = store.list_events_for_session("s4").unwrap();
        let seqs: Vec<_> = stored.iter().map(|event| event.seq).collect();
        assert_eq!(seqs, [0, 1]);
    }

    /// Appending the same event twice stores it only once.
    #[test]
    fn append_event_dedup() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("s5")).unwrap();
        let event = make_event("s5", 0);
        store.append_event(&event).unwrap();
        store.append_event(&event).unwrap();

        let stored = store.list_events_for_session("s5").unwrap();
        assert_eq!(stored.len(), 1);
    }

    /// A second upsert with the same id overwrites the stored status.
    #[test]
    fn upsert_idempotent() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("s3")).unwrap();
        let running = SessionRecord {
            status: SessionStatus::Running,
            ..make_session("s3")
        };
        store.upsert_session(&running).unwrap();

        let fetched = store.get_session("s3").unwrap().unwrap();
        assert_eq!(fetched.status, SessionStatus::Running);
    }

    /// A path nested under `input.path` in the payload is recorded as a touched file.
    #[test]
    fn append_event_indexes_path_from_payload() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let event = Event {
            payload: json!({"input": {"path": "src/lib.rs"}}),
            ..make_event("sx", 0)
        };
        store.append_event(&event).unwrap();

        let touched = store.files_touched_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(touched, vec![("sx".to_string(), "src/lib.rs".to_string())]);
    }

    /// `update_session_status` changes the status returned by `get_session`.
    #[test]
    fn update_session_status_changes_status() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("s6")).unwrap();
        store
            .update_session_status("s6", SessionStatus::Running)
            .unwrap();

        let fetched = store.get_session("s6").unwrap().unwrap();
        assert_eq!(fetched.status, SessionStatus::Running);
    }

    /// Pruning removes sessions that started before the cutoff, together with
    /// their events, and leaves later sessions in place.
    #[test]
    fn prune_sessions_removes_old_rows_and_keeps_recent() {
        let (_dir, store) = fresh_store();
        let stale = SessionRecord {
            started_at_ms: 1_000,
            ..make_session("old")
        };
        let recent = SessionRecord {
            started_at_ms: 9_000_000_000_000,
            ..make_session("new")
        };
        store.upsert_session(&stale).unwrap();
        store.upsert_session(&recent).unwrap();
        store.append_event(&make_event("old", 0)).unwrap();

        let pruned = store.prune_sessions_started_before(5_000).unwrap();
        let expected = PruneStats {
            sessions_removed: 1,
            events_removed: 1,
        };
        assert_eq!(pruned, expected);
        assert!(store.get_session("old").unwrap().is_none());
        assert!(store.get_session("new").unwrap().is_some());

        let remaining = store.list_sessions("/ws").unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "new");
    }

    /// A `.cursor/rules/*.mdc` path in the payload is recorded as a used rule.
    #[test]
    fn append_event_indexes_rules_from_payload() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("sr")).unwrap();
        let event = Event {
            payload: json!({"path": ".cursor/rules/my-rule.mdc"}),
            ..make_event("sr", 0)
        };
        store.append_event(&event).unwrap();

        let rules = store.rules_used_in_window("/ws", 0, 10_000).unwrap();
        assert_eq!(rules, vec![("sr".to_string(), "my-rule".to_string())]);
    }

    /// The guidance report counts one session for both the skill and the rule
    /// mentioned in the event text, and marks both as present on disk.
    #[test]
    fn guidance_report_counts_skill_and_rule_sessions() {
        let (_dir, store) = fresh_store();
        store.upsert_session(&make_session("sx")).unwrap();
        let payload =
            json!({"text": "read .cursor/skills/tdd/SKILL.md and .cursor/rules/style.mdc"});
        let event = Event {
            payload,
            cost_usd_e6: Some(500_000),
            ..make_event("sx", 0)
        };
        store.append_event(&event).unwrap();

        let mut skill_slugs = HashSet::new();
        skill_slugs.insert("tdd".into());
        let mut rule_slugs = HashSet::new();
        rule_slugs.insert("style".into());

        let report = store
            .guidance_report("/ws", 0, 10_000, &skill_slugs, &rule_slugs)
            .unwrap();
        assert_eq!(report.sessions_in_window, 1);

        let skill_row = report
            .rows
            .iter()
            .find(|row| row.id == "tdd" && row.kind == GuidanceKind::Skill)
            .unwrap();
        assert_eq!(skill_row.sessions, 1);
        assert!(skill_row.on_disk);

        let rule_row = report
            .rows
            .iter()
            .find(|row| row.id == "style" && row.kind == GuidanceKind::Rule)
            .unwrap();
        assert_eq!(rule_row.sessions, 1);
        assert!(rule_row.on_disk);
    }

    /// Pruning a session leaves no rows behind in `rules_used`.
    #[test]
    fn prune_sessions_removes_rules_used_rows() {
        let (_dir, store) = fresh_store();
        let stale = SessionRecord {
            started_at_ms: 1_000,
            ..make_session("old_r")
        };
        store.upsert_session(&stale).unwrap();
        let event = Event {
            payload: json!({"path": ".cursor/rules/x.mdc"}),
            ..make_event("old_r", 0)
        };
        store.append_event(&event).unwrap();

        store.prune_sessions_started_before(5_000).unwrap();
        let leftover: i64 = store
            .conn
            .query_row("SELECT COUNT(*) FROM rules_used", [], |row| row.get(0))
            .unwrap();
        assert_eq!(leftover, 0);
    }
}