Skip to main content

kaizen/store/sqlite/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Sync SQLite store. WAL mode, schema migrations as ordered SQL strings.
3
4use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8    FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13    clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::sync::context::SyncIngestContext;
16use crate::sync::outbound::outbound_event_from_row;
17use crate::sync::redact::redact_payload;
18use crate::sync::smart::enqueue_tool_spans_for_session;
19use anyhow::{Context, Result};
20use rusqlite::types::Value;
21use rusqlite::{
22    Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
23};
24use std::cell::RefCell;
25use std::collections::{HashMap, HashSet};
26use std::path::{Path, PathBuf};
27
28pub(super) use constants::{DEFAULT_CACHE_KIB, DEFAULT_MMAP_MB, SYNTHETIC_TS_CEILING_MS};
29pub use constants::{
30    SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, SYNC_STATE_SEARCH_DIRTY_MS,
31};
32pub(crate) use contracts::{CaptureQualityRow, TraceSpanQualityRow};
33pub use contracts::{
34    GuidanceKind, GuidancePerfRow, GuidanceReport, InsightsStats, PruneStats, SessionFilter,
35    SessionOutcomeRow, SessionPage, SessionSampleAgg, SessionStatusRow, StoreOpenMode,
36    SummaryStats, SyncStatusSnapshot, ToolSpanSyncRow,
37};
38pub(super) use sql::{PAIN_HOTSPOTS_SQL, SESSION_SELECT, TOOL_RANK_ROWS_SQL};
39
40#[derive(Clone)]
41struct SpanTreeCacheEntry {
42    session_id: String,
43    last_event_seq: Option<u64>,
44    nodes: Vec<crate::store::span_tree::SpanNode>,
45}
46
47pub struct Store {
48    conn: Connection,
49    root: PathBuf,
50    search_writer: RefCell<Option<crate::search::PendingWriter>>,
51    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
52    projector: RefCell<Projector>,
53}
54
55mod artifact_windows;
56mod constants;
57mod contracts;
58mod evals;
59mod event_projector;
60mod event_read;
61mod event_write;
62mod events;
63mod experiment_windows;
64mod feedback;
65mod guidance;
66mod guidance_candidates;
67mod maintenance;
68mod metrics;
69mod outbox_migration;
70mod outcomes;
71mod prompts;
72mod report_windows;
73mod reports;
74mod rows;
75mod samples;
76mod schema;
77mod session_read;
78mod session_window;
79mod sessions;
80mod sql;
81mod sync;
82#[cfg(test)]
83mod tests;
84mod tool_span_sync;
85mod tool_spans;
86mod trace_spans;
87mod visualization;
88
89pub(super) fn now_ms() -> u64 {
90    std::time::SystemTime::now()
91        .duration_since(std::time::UNIX_EPOCH)
92        .unwrap_or_default()
93        .as_millis() as u64
94}
95
96impl Store {
97    pub(crate) fn conn(&self) -> &Connection {
98        &self.conn
99    }
100
101    pub fn open(path: &Path) -> Result<Self> {
102        Self::open_with_mode(path, StoreOpenMode::ReadWrite)
103    }
104
105    pub fn open_read_only(path: &Path) -> Result<Self> {
106        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
107    }
108
109    pub fn open_query(path: &Path) -> Result<Self> {
110        Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
111    }
112
113    pub(crate) fn open_empty(root: &Path) -> Result<Self> {
114        let conn = Connection::open_in_memory().context("open empty in-memory store")?;
115        initialize_empty(&conn)?;
116        Ok(store_from_connection(conn, root.to_path_buf()))
117    }
118
119    pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
120        prepare_parent(path, mode)?;
121        let conn = match mode {
122            StoreOpenMode::ReadWrite => Connection::open(path),
123            StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
124                path,
125                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
126            ),
127        }
128        .with_context(|| format!("open db: {}", path.display()))?;
129        schema::apply_pragmas(&conn, mode)?;
130        if mode == StoreOpenMode::ReadWrite {
131            for sql in schema::MIGRATIONS {
132                conn.execute_batch(sql)?;
133            }
134            schema::ensure_schema_columns(&conn)?;
135            outbox_migration::migrate(&conn, path.parent().unwrap_or_else(|| Path::new(".")))?;
136        }
137        let root = path
138            .parent()
139            .unwrap_or_else(|| Path::new("."))
140            .to_path_buf();
141        Ok(store_from_connection(conn, root))
142    }
143
144    pub(super) fn invalidate_span_tree_cache(&self) {
145        *self.span_tree_cache.borrow_mut() = None;
146    }
147}
148
149fn initialize_empty(conn: &Connection) -> Result<()> {
150    schema::apply_pragmas(conn, StoreOpenMode::ReadWrite)?;
151    schema::MIGRATIONS
152        .iter()
153        .try_for_each(|statement| conn.execute_batch(statement))?;
154    schema::ensure_schema_columns(conn)
155}
156
157fn store_from_connection(conn: Connection, root: PathBuf) -> Store {
158    Store {
159        conn,
160        root,
161        search_writer: RefCell::new(None),
162        span_tree_cache: RefCell::new(None),
163        projector: RefCell::new(Projector::default()),
164    }
165}
166
167fn prepare_parent(path: &Path, mode: StoreOpenMode) -> Result<()> {
168    if mode == StoreOpenMode::ReadOnlyQuery {
169        return Ok(());
170    }
171    if let Some(parent) = path.parent() {
172        std::fs::create_dir_all(parent)?;
173    }
174    Ok(())
175}
176
177impl Drop for Store {
178    fn drop(&mut self) {
179        if let Some(writer) = self.search_writer.get_mut().as_mut() {
180            let _ = writer.commit();
181        }
182    }
183}