kaizen/store/sqlite/
mod.rs1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8 FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::sync::context::SyncIngestContext;
16use crate::sync::outbound::outbound_event_from_row;
17use crate::sync::redact::redact_payload;
18use crate::sync::smart::enqueue_tool_spans_for_session;
19use anyhow::{Context, Result};
20use rusqlite::types::Value;
21use rusqlite::{
22 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
23};
24use std::cell::RefCell;
25use std::collections::{HashMap, HashSet};
26use std::path::{Path, PathBuf};
27
28pub(super) use constants::{DEFAULT_CACHE_KIB, DEFAULT_MMAP_MB, SYNTHETIC_TS_CEILING_MS};
29pub use constants::{
30 SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, SYNC_STATE_SEARCH_DIRTY_MS,
31};
32pub(crate) use contracts::{CaptureQualityRow, TraceSpanQualityRow};
33pub use contracts::{
34 GuidanceKind, GuidancePerfRow, GuidanceReport, InsightsStats, PruneStats, SessionFilter,
35 SessionOutcomeRow, SessionPage, SessionSampleAgg, SessionStatusRow, StoreOpenMode,
36 SummaryStats, SyncStatusSnapshot, ToolSpanSyncRow,
37};
38pub(super) use sql::{PAIN_HOTSPOTS_SQL, SESSION_SELECT, TOOL_RANK_ROWS_SQL};
39
40#[derive(Clone)]
41struct SpanTreeCacheEntry {
42 session_id: String,
43 last_event_seq: Option<u64>,
44 nodes: Vec<crate::store::span_tree::SpanNode>,
45}
46
47pub struct Store {
48 conn: Connection,
49 root: PathBuf,
50 search_writer: RefCell<Option<crate::search::PendingWriter>>,
51 span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
52 projector: RefCell<Projector>,
53}
54
55mod artifact_windows;
56mod constants;
57mod contracts;
58mod evals;
59mod event_batch;
60mod event_extensions;
61mod event_projector;
62mod event_read;
63mod event_write;
64mod events;
65mod experiment_windows;
66mod feedback;
67mod guidance;
68mod guidance_candidates;
69mod maintenance;
70mod metrics;
71mod outbox_migration;
72mod outcomes;
73mod prompts;
74mod report_windows;
75mod reports;
76mod rows;
77mod samples;
78mod schema;
79mod session_identity;
80mod session_read;
81mod session_window;
82mod sessions;
83mod sql;
84mod sync;
85#[cfg(test)]
86mod tests;
87mod tool_span_sync;
88mod tool_spans;
89mod trace_spans;
90mod visualization;
91
92pub(super) fn now_ms() -> u64 {
93 std::time::SystemTime::now()
94 .duration_since(std::time::UNIX_EPOCH)
95 .unwrap_or_default()
96 .as_millis() as u64
97}
98
99impl Store {
100 pub(crate) fn conn(&self) -> &Connection {
101 &self.conn
102 }
103
104 pub fn open(path: &Path) -> Result<Self> {
105 Self::open_with_mode(path, StoreOpenMode::ReadWrite)
106 }
107
108 pub fn open_read_only(path: &Path) -> Result<Self> {
109 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
110 }
111
112 pub fn open_query(path: &Path) -> Result<Self> {
113 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
114 }
115
116 pub(crate) fn open_empty(root: &Path) -> Result<Self> {
117 let conn = Connection::open_in_memory().context("open empty in-memory store")?;
118 initialize_empty(&conn)?;
119 Ok(store_from_connection(conn, root.to_path_buf()))
120 }
121
122 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
123 prepare_parent(path, mode)?;
124 let conn = match mode {
125 StoreOpenMode::ReadWrite => Connection::open(path),
126 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
127 path,
128 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
129 ),
130 }
131 .with_context(|| format!("open db: {}", path.display()))?;
132 schema::apply_pragmas(&conn, mode)?;
133 if mode == StoreOpenMode::ReadWrite {
134 for sql in schema::MIGRATIONS {
135 conn.execute_batch(sql)?;
136 }
137 schema::ensure_schema_columns(&conn)?;
138 session_identity::backfill(&conn)?;
139 outbox_migration::migrate(&conn, path.parent().unwrap_or_else(|| Path::new(".")))?;
140 }
141 let root = path
142 .parent()
143 .unwrap_or_else(|| Path::new("."))
144 .to_path_buf();
145 Ok(store_from_connection(conn, root))
146 }
147
148 pub(super) fn invalidate_span_tree_cache(&self) {
149 *self.span_tree_cache.borrow_mut() = None;
150 }
151}
152
153fn initialize_empty(conn: &Connection) -> Result<()> {
154 schema::apply_pragmas(conn, StoreOpenMode::ReadWrite)?;
155 schema::MIGRATIONS
156 .iter()
157 .try_for_each(|statement| conn.execute_batch(statement))?;
158 schema::ensure_schema_columns(conn)
159}
160
161fn store_from_connection(conn: Connection, root: PathBuf) -> Store {
162 Store {
163 conn,
164 root,
165 search_writer: RefCell::new(None),
166 span_tree_cache: RefCell::new(None),
167 projector: RefCell::new(Projector::default()),
168 }
169}
170
171fn prepare_parent(path: &Path, mode: StoreOpenMode) -> Result<()> {
172 if mode == StoreOpenMode::ReadOnlyQuery {
173 return Ok(());
174 }
175 if let Some(parent) = path.parent() {
176 std::fs::create_dir_all(parent)?;
177 }
178 Ok(())
179}
180
181impl Drop for Store {
182 fn drop(&mut self) {
183 if let Some(writer) = self.search_writer.get_mut().as_mut() {
184 let _ = writer.commit();
185 }
186 }
187}