// SPDX-License-Identifier: AGPL-3.0-or-later
//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.

4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8    FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
29/// Max `ts_ms` still treated as transcript-only synthetic timing (seq-based fallbacks).
30/// Rows below this use `sessions.started_at_ms` for time-window matching.
31pub(super) const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
32pub(super) const DEFAULT_MMAP_MB: u64 = 256;
/// Shared `SELECT … FROM sessions` prefix that loads every column a session row carries.
///
/// Column order is positional. NOTE(review): the row → `SessionRecord` mapper lives outside
/// this chunk and presumably reads these columns by index, so keep the two in sync.
/// Callers presumably append their own `WHERE` / `ORDER BY` / `LIMIT` clauses.
pub(super) const SESSION_SELECT: &str =
    "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
    status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
    repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
    repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspots" for one repo snapshot: files ranked by tool-span activity
/// weighted by their complexity.
///
/// Bind parameters:
/// - `?1` — `file_facts.snapshot_id` whose files are ranked.
/// - `?2` — workspace; only spans whose session belongs to this workspace are counted.
/// - `?3`, `?4` — inclusive time window in ms. A span matches on `started_at_ms`, or on
///   `ended_at_ms` when it has no start time.
///
/// Result columns: `path`, `value`, `complexity_total`, `churn_30d`, where
/// `value = COUNT(s.id) * complexity_total`. `COUNT(s.id)` counts joined
/// (path, span, session) rows, so a file touched by several spans counts once per span.
/// Every join is `LEFT`, so snapshot files with no matching spans still appear, with a
/// count of 0. Ordered by `value` descending, then `path` ascending.
pub(super) const PAIN_HOTSPOTS_SQL: &str = "
    SELECT f.path,
           COUNT(s.id) * f.complexity_total AS value,
           f.complexity_total,
           f.churn_30d
    FROM file_facts f
    LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
    LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
       AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
         OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
    LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
    WHERE f.snapshot_id = ?1
    GROUP BY f.path, f.complexity_total, f.churn_30d
    ORDER BY value DESC, f.path ASC
    LIMIT 10";
/// Per-tool call statistics for one workspace and time window.
///
/// Bind parameters: `?1` workspace; `?2`/`?3` inclusive window bounds in ms, matched on the
/// span's `started_at_ms`, or on `ended_at_ms` when the start time is NULL. Spans with no
/// tool are grouped under `'unknown'`.
///
/// Result columns: `tool`, `calls`, `p50_ms`, `p95_ms`, `total_tokens`,
/// `total_reasoning_tokens`. `total_tokens` sums tokens_in + tokens_out + reasoning_tokens
/// with NULLs treated as 0. Percentiles use the lower nearest-rank method over non-NULL
/// `lead_time_ms`: row `floor((n - 1) * p / 100) + 1` of the ascending order (`n` is an
/// integer count, so the division truncates). They are NULL for tools with no latency
/// samples. No `ORDER BY` is applied, so row order is unspecified; sort in the caller.
pub(super) const TOOL_RANK_ROWS_SQL: &str = "
    WITH scoped AS (
      SELECT COALESCE(ts.tool, 'unknown') AS tool,
             ts.lead_time_ms,
             COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
                 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
             COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
      FROM tool_spans ts
      JOIN sessions s ON s.id = ts.session_id
      WHERE s.workspace = ?1
        AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
          OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
    ),
    agg AS (
      SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
             SUM(reasoning_tokens) AS total_reasoning_tokens
      FROM scoped GROUP BY tool
    ),
    lat AS (
      SELECT tool, lead_time_ms,
             ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
             COUNT(*) OVER (PARTITION BY tool) AS n
      FROM scoped WHERE lead_time_ms IS NOT NULL
    ),
    pct AS (
      SELECT tool,
             MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
             MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
      FROM lat GROUP BY tool
    )
    SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
           agg.total_tokens, agg.total_reasoning_tokens
    FROM agg LEFT JOIN pct ON pct.tool = agg.tool";

87/// Per-workspace activity dashboard stats.
88#[derive(Clone)]
89pub struct InsightsStats {
90    pub total_sessions: u64,
91    pub running_sessions: u64,
92    pub total_events: u64,
93    /// (day label e.g. "Mon", count) last 7 days oldest first
94    pub sessions_by_day: Vec<(String, u64)>,
95    /// Recent sessions DESC by started_at, max 3; paired with event count
96    pub recent: Vec<(SessionRecord, u64)>,
97    /// Top tools by event count, max 5
98    pub top_tools: Vec<(String, u64)>,
99    pub total_cost_usd_e6: i64,
100    pub sessions_with_cost: u64,
101}
102
/// Sync daemon / outbox status for `kaizen sync status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncStatusSnapshot {
    /// Number of entries still waiting in the outbox.
    pub pending_outbox: u64,
    /// Time of the last successful sync, if any.
    /// NOTE(review): presumably Unix-epoch milliseconds; confirm.
    pub last_success_ms: Option<u64>,
    /// Most recent sync error message, if any.
    pub last_error: Option<String>,
    /// Failures in a row (per the name), presumably reset on success.
    pub consecutive_failures: u32,
}

111/// Aggregate stats across sessions + events for a workspace.
112#[derive(serde::Serialize)]
113pub struct SummaryStats {
114    pub session_count: u64,
115    pub total_cost_usd_e6: i64,
116    pub by_agent: Vec<(String, u64)>,
117    pub by_model: Vec<(String, u64)>,
118    pub top_tools: Vec<(String, u64)>,
119}
120
/// Skill vs Cursor rule for [`GuidancePerfRow`].
///
/// Serialized in lowercase (`"skill"` / `"rule"`). Variant order is significant: the derived
/// `Ord` sorts `Skill` before `Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill; serialized as `"skill"`.
    Skill,
    /// A Cursor rule; serialized as `"rule"`.
    Rule,
}

/// One row for `kaizen guidance` — observed references in payloads (not Cursor auto-load counts).
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill/rule. NOTE(review): presumably its name or path; confirm.
    pub id: String,
    /// Sessions observed referencing this guidance (presumably within the report window).
    pub sessions: u64,
    /// `sessions` as a share of the window's sessions.
    /// NOTE(review): confirm whether the scale is 0–100 or 0–1.
    pub sessions_pct: f64,
    /// Summed cost of those sessions, in millionths of a US dollar.
    pub total_cost_usd_e6: i64,
    /// Mean cost per referencing session, in USD; `None` when it cannot be computed.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison of the average above with the workspace average.
    /// NOTE(review): confirm whether this is a difference or a ratio.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the skill/rule definition is present on disk (per the name).
    pub on_disk: bool,
}

/// Aggregated skill/rule adoption and cost proxy for a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Window start, in ms. NOTE(review): presumably Unix-epoch ms; confirm inclusivity.
    pub window_start_ms: u64,
    /// Window end, in ms.
    pub window_end_ms: u64,
    /// Sessions in the window (presumably the denominator for `sessions_pct`).
    pub sessions_in_window: u64,
    /// Workspace-wide mean cost per session in USD, if computable.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One row per observed skill/rule.
    pub rows: Vec<GuidancePerfRow>,
}

/// Result of [`Store::prune_sessions_started_before`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Sessions removed by the prune.
    pub sessions_removed: u64,
    /// Events removed by the prune.
    pub events_removed: u64,
}

/// Row in `session_outcomes` (Tier C — post-stop test/lint snapshot).
///
/// Every measurement is optional. NOTE(review): `None` presumably means "not measured";
/// confirm against the writer.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session this outcome belongs to.
    pub session_id: String,
    /// Passed test count.
    pub test_passed: Option<i64>,
    /// Failed test count.
    pub test_failed: Option<i64>,
    /// Skipped test count.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded.
    pub build_ok: Option<bool>,
    /// Lint error count.
    pub lint_errors: Option<i64>,
    /// Reverted lines over 14 days (per the `_14d` suffix).
    pub revert_lines_14d: Option<i64>,
    /// NOTE(review): presumably an open-PR count or flag; confirm.
    pub pr_open: Option<i64>,
    /// Whether CI passed.
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, in ms.
    pub measured_at_ms: u64,
    /// Error text if the measurement failed.
    pub measure_error: Option<String>,
}

/// Aggregated process samples for retro (Tier D).
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU usage seen across samples, in percent.
    pub max_cpu_percent: f64,
    /// Highest resident set size seen across samples, in bytes.
    pub max_rss_bytes: u64,
}

// Keys in the `sync_state` table. Each constant carries its own `///` doc, because a doc
// comment only attaches to the single item directly below it.

/// `sync_state` key for agent rescan throttling (time of the last agent scan, per the name).
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// `sync_state` key for auto-prune (time of the last automatic prune, per the name).
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// `sync_state` key presumably recording when the search index was marked dirty.
/// NOTE(review): inferred from the name; confirm against its readers/writers.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";

/// One tool span together with the paths it touched, as a flat owned row.
///
/// Fields mirror `tool_spans` columns (the queries in this module read `span_id`,
/// `session_id`, `tool`, `started_at_ms`, `ended_at_ms`, `lead_time_ms` and the token
/// counts), plus the span's `tool_span_paths` entries.
/// NOTE(review): presumably consumed by the `tool_span_sync` submodule; confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolSpanSyncRow {
    /// Span identifier.
    pub span_id: String,
    /// Owning session.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Tool-call identifier, if known.
    pub tool_call_id: Option<String>,
    /// Span status text.
    pub status: String,
    /// Start time in ms, if known.
    pub started_at_ms: Option<u64>,
    /// End time in ms, if known.
    pub ended_at_ms: Option<u64>,
    /// Latency in ms, if known.
    pub lead_time_ms: Option<u64>,
    /// Input tokens, if known.
    pub tokens_in: Option<u32>,
    /// Output tokens, if known.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens, if known.
    pub reasoning_tokens: Option<u32>,
    /// Cost in millionths of a US dollar (per the `_usd_e6` suffix).
    pub cost_usd_e6: Option<i64>,
    /// Paths the span touched.
    pub paths: Vec<String>,
}

/// Capture-quality flags for one captured record, tagged with its source.
/// NOTE(review): the record granularity (event vs span) is not visible here; confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CaptureQualityRow {
    pub source: String,
    pub has_tokens: bool,
    pub has_latency: bool,
    pub has_context: bool,
}

/// Kind and orphan flag of one trace span, for quality reporting (per the name).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TraceSpanQualityRow {
    pub kind: String,
    pub is_orphan: bool,
}

/// How [`Store::open_with_mode`] opens the database file.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Read-write connection. Schema migrations and column fix-ups run, and the projector is
    /// warmed from running sessions, when opened in this mode.
    ReadWrite,
    /// Connection opened with `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX`. No migrations
    /// run and the projector is not warmed.
    ReadOnlyQuery,
}

/// Minimal status view of one session.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session id.
    pub id: String,
    /// Current status.
    pub status: SessionStatus,
    /// End time in ms. NOTE(review): presumably `None` while the session is still open.
    pub ended_at_ms: Option<u64>,
}

/// Optional filters for session listings.
/// NOTE(review): presumably a `None` field leaves that dimension unconstrained; confirm.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Match sessions whose agent starts with this prefix (per the name).
    pub agent_prefix: Option<String>,
    /// Match sessions with exactly this status.
    pub status: Option<SessionStatus>,
    /// Lower time bound in ms. NOTE(review): confirm which column it is compared against.
    pub since_ms: Option<u64>,
}

/// One page of session rows.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// NOTE(review): presumably the total matching rows across all pages; confirm.
    pub total: usize,
    /// Offset of the next page. NOTE(review): presumably `None` when no rows remain.
    pub next_offset: Option<usize>,
}

/// Memoized span tree for a single session.
/// NOTE(review): `last_event_seq` presumably records the newest event the tree was built
/// from, so a newer sequence marks the entry stale; confirm against the reader.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    session_id: String,
    last_event_seq: Option<u64>,
    nodes: Vec<crate::store::span_tree::SpanNode>,
}

/// Handle to the SQLite-backed store.
///
/// Mutable state lives in `RefCell`s so `&self` methods can update it; as a consequence the
/// type is not `Sync`.
pub struct Store {
    /// Open SQLite connection.
    conn: Connection,
    /// Parent directory of the database file (see `open_with_mode`); the outbox is opened here.
    root: PathBuf,
    /// Starts as `None`. NOTE(review): presumably opened lazily by the write path; confirm.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search-index writer, if any; `Drop` makes a best-effort `commit()` on it.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// At most one cached span tree; cleared by `invalidate_span_tree_cache`.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory event projector, warmed from running sessions on a read-write open.
    projector: RefCell<Projector>,
}

// Submodules of the SQLite store. `schema` supplies `apply_pragmas`, `MIGRATIONS` and
// `ensure_schema_columns`, which are used by `Store::open_with_mode` below.
// NOTE(review): the others presumably hold further `impl Store` blocks split by concern; confirm.
mod artifact_windows;
mod evals;
mod event_projector;
mod event_read;
mod event_write;
mod events;
mod experiment_windows;
mod feedback;
mod guidance;
mod maintenance;
mod metrics;
mod outcomes;
mod prompts;
mod report_windows;
mod reports;
mod rows;
mod samples;
mod schema;
mod session_read;
mod session_window;
mod sessions;
mod sync;
// Compiled only for test builds.
#[cfg(test)]
mod tests;
mod tool_span_sync;
mod tool_spans;
mod trace_spans;

289pub(super) fn now_ms() -> u64 {
290    std::time::SystemTime::now()
291        .duration_since(std::time::UNIX_EPOCH)
292        .unwrap_or_default()
293        .as_millis() as u64
294}
295
296impl Store {
297    pub(crate) fn conn(&self) -> &Connection {
298        &self.conn
299    }
300
301    pub fn open(path: &Path) -> Result<Self> {
302        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
303    }
304
305    pub fn open_read_only(path: &Path) -> Result<Self> {
306        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
307    }
308
309    pub fn open_query(path: &Path) -> Result<Self> {
310        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
311    }
312
313    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
314        if let Some(parent) = path.parent() {
315            std::fs::create_dir_all(parent)?;
316        }
317        let conn = match mode {
318            StoreOpenMode::ReadWrite => Connection::open(path),
319            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
320                path,
321                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
322            ),
323        }
324        .with_context(|| format!("open db: {}", path.display()))?;
325        schema::apply_pragmas(&conn, mode)?;
326        if mode == StoreOpenMode::ReadWrite {
327            for sql in schema::MIGRATIONS {
328                conn.execute_batch(sql)?;
329            }
330            schema::ensure_schema_columns(&conn)?;
331        }
332        let store = Self {
333            conn,
334            root: path
335                .parent()
336                .unwrap_or_else(|| Path::new("."))
337                .to_path_buf(),
338            hot_log: RefCell::new(None),
339            search_writer: RefCell::new(None),
340            span_tree_cache: RefCell::new(None),
341            projector: RefCell::new(Projector::default()),
342        };
343        if mode == StoreOpenMode::ReadWrite {
344            store.warm_projector()?;
345        }
346        Ok(store)
347    }
348
349    pub(super) fn invalidate_span_tree_cache(&self) {
350        *self.span_tree_cache.borrow_mut() = None;
351    }
352
353    pub(super) fn warm_projector(&self) -> Result<()> {
354        let ids = self.running_session_ids()?;
355        let mut projector = self.projector.borrow_mut();
356        for id in ids {
357            for event in self.list_events_for_session(&id)? {
358                let _ = projector.apply(&event);
359            }
360        }
361        Ok(())
362    }
363
364    pub(super) fn outbox(&self) -> Result<Outbox> {
365        Outbox::open(&self.root)
366    }
367}
368
369impl Drop for Store {
370    fn drop(&mut self) {
371        if let Some(writer) = self.search_writer.get_mut().as_mut() {
372            let _ = writer.commit();
373        }
374    }
375}