1#![forbid(unsafe_code)]
9#![warn(missing_docs)]
10
11pub mod api;
12pub mod cjk;
13
14pub use api::{
15 ChunkHit, ChunkLookup, LineageChain, PendingEmbeddingJob, RecordSummary, SearchFilter,
16 SourceRow, SourceWithCounts, StoreStats, MAX_LIST_LIMIT,
17};
18
19use std::path::Path;
20
21use rusqlite::functions::FunctionFlags;
22use rusqlite::Connection;
23use thiserror::Error;
24
25const MIGRATIONS: &[(&str, &str)] = &[
28 ("0001_init", include_str!("migrations/0001_init.sql")),
29 ("0002_phase1", include_str!("migrations/0002_phase1.sql")),
30 ("0003_cjk_fts", include_str!("migrations/0003_cjk_fts.sql")),
31 (
32 "0004_provenance_derived_from",
33 include_str!("migrations/0004_provenance_derived_from.sql"),
34 ),
35];
36
37fn register_cjk_function(conn: &Connection) -> rusqlite::Result<()> {
45 conn.create_scalar_function(
46 "tokenize_cjk",
47 1,
48 FunctionFlags::SQLITE_DETERMINISTIC | FunctionFlags::SQLITE_UTF8,
49 |ctx| {
50 let text: String = ctx.get(0).unwrap_or_default();
51 Ok(crate::cjk::tokenize_indexing(&text))
52 },
53 )
54}
55
56#[derive(Debug, Error)]
58pub enum StoreError {
59 #[error("sqlite: {0}")]
61 Sqlite(#[from] rusqlite::Error),
62
63 #[error("database schema is newer than this binary supports (found {found})")]
65 SchemaTooNew {
66 found: u32,
68 },
69
70 #[error("store corruption: {0}")]
74 Corruption(String),
75}
76
77pub type Result<T> = std::result::Result<T, StoreError>;
79
80pub struct Store {
86 pub(crate) conn: parking_lot::Mutex<Connection>,
87}
88
89impl Store {
90 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
92 let conn = Connection::open(path)?;
93 conn.pragma_update(None, "journal_mode", "WAL")?;
94 conn.pragma_update(None, "foreign_keys", "ON")?;
95 conn.pragma_update(None, "synchronous", "NORMAL")?;
96 register_cjk_function(&conn)?;
97 let store = Self {
98 conn: parking_lot::Mutex::new(conn),
99 };
100 store.run_migrations()?;
101 store.reindex_fts_if_pending()?;
102 Ok(store)
103 }
104
105 pub fn open_in_memory() -> Result<Self> {
107 let conn = Connection::open_in_memory()?;
108 register_cjk_function(&conn)?;
109 let store = Self {
110 conn: parking_lot::Mutex::new(conn),
111 };
112 store.run_migrations()?;
113 store.reindex_fts_if_pending()?;
114 Ok(store)
115 }
116
117 fn reindex_fts_if_pending(&self) -> Result<()> {
127 let pending: Option<String> = {
128 let conn = self.conn.lock();
129 conn.query_row(
130 "SELECT value FROM meta WHERE key = 'chunks_fts_rebuild_pending'",
131 [],
132 |r| r.get(0),
133 )
134 .ok()
135 };
136 if pending.as_deref() != Some("1") {
137 return Ok(());
138 }
139 tracing::info!("0003_cjk_fts: re-tokenising existing record_chunks into chunks_fts");
140 let mut conn = self.conn.lock();
141 let tx = conn.transaction()?;
142 tx.execute(
145 "INSERT INTO chunks_fts(chunks_fts) VALUES('delete-all')",
146 [],
147 )?;
148 let n: usize = tx.execute(
153 "INSERT INTO chunks_fts(rowid, content)
154 SELECT rowid, tokenize_cjk(content) FROM record_chunks",
155 [],
156 )?;
157 tx.execute(
158 "DELETE FROM meta WHERE key = 'chunks_fts_rebuild_pending'",
159 [],
160 )?;
161 tx.commit()?;
162 tracing::info!(reindexed_rows = n, "0003_cjk_fts: chunks_fts rebuilt");
163 Ok(())
164 }
165
166 fn run_migrations(&self) -> Result<()> {
167 let mut conn = self.conn.lock();
168 conn.execute_batch(
170 "CREATE TABLE IF NOT EXISTS _migrations (
171 id TEXT PRIMARY KEY,
172 applied_at INTEGER NOT NULL
173 );",
174 )?;
175
176 for (id, sql) in MIGRATIONS {
177 let already: i64 = conn.query_row(
178 "SELECT COUNT(1) FROM _migrations WHERE id = ?1",
179 [id],
180 |r| r.get(0),
181 )?;
182 if already == 0 {
183 let tx = conn.transaction()?;
184 tx.execute_batch(sql)?;
185 tx.execute(
186 "INSERT INTO _migrations(id, applied_at) VALUES (?1, strftime('%s','now'))",
187 [id],
188 )?;
189 tx.commit()?;
190 tracing::info!(migration = id, "applied migration");
191 }
192 }
193 Ok(())
194 }
195
196 pub fn conn(&self) -> parking_lot::MutexGuard<'_, Connection> {
200 self.conn.lock()
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs a parameterless single-value query (a `SELECT COUNT(1) ...`) and
    /// returns the integer it yields, panicking if the query fails.
    fn scalar(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Inserts record `r1` (content `'p'`) and its only chunk `r1:0`
    /// (content `'x'`, hash `h0`). Several tests below share this fixture.
    fn seed_record_and_chunk(conn: &Connection) {
        conn.execute(
            "INSERT INTO records(id, adapter, instance, content, scope, kind, \
             created_at, native_id, captured_at, raw_hash) \
             VALUES('r1','claude-code',NULL,'p','user','fact',0,'n1',0,'h')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
             VALUES('r1:0','r1',0,'x','h0',1)",
            [],
        )
        .unwrap();
    }

    #[test]
    fn open_in_memory_runs_migrations() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();

        assert_eq!(scalar(&conn, "SELECT COUNT(1) FROM records"), 0);

        let version: String = conn
            .query_row(
                "SELECT value FROM meta WHERE key = 'schema_version'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(version, "3");
    }

    #[test]
    fn phase1_tables_exist() {
        const EXPECTED: [&str; 7] = [
            "sources",
            "raw_artifacts",
            "record_chunks",
            "chunks_fts",
            "chunk_embeddings",
            "embedding_jobs",
            "import_errors",
        ];

        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        for table in EXPECTED {
            let present: i64 = conn
                .query_row(
                    "SELECT COUNT(1) FROM sqlite_master WHERE name = ?1",
                    [table],
                    |r| r.get(0),
                )
                .unwrap_or_else(|_| panic!("query failed for {table}"));
            assert_eq!(present, 1, "expected table/view {table} to exist");
        }
    }

    #[test]
    fn record_level_fts_was_dropped() {
        let store = Store::open_in_memory().unwrap();
        let leftovers = scalar(
            &store.conn(),
            "SELECT COUNT(1) FROM sqlite_master WHERE name = 'records_fts'",
        );
        assert_eq!(leftovers, 0, "records_fts should not exist after 0002");
    }

    #[test]
    fn chunks_fts_is_maintained_by_triggers() {
        const MATCH_HELLO: &str =
            "SELECT COUNT(1) FROM chunks_fts WHERE chunks_fts MATCH 'hello'";

        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();

        conn.execute(
            "INSERT INTO records(id, adapter, instance, content, scope, kind, \
             created_at, native_id, captured_at, raw_hash) \
             VALUES('r1','claude-code',NULL,'parent','user','fact',0,'n1',0,'h')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
             VALUES('r1:0','r1',0,'hello world','h0',2)",
            [],
        )
        .unwrap();
        assert_eq!(
            scalar(&conn, MATCH_HELLO),
            1,
            "FTS should index inserted chunk content"
        );

        conn.execute("DELETE FROM record_chunks WHERE id = 'r1:0'", [])
            .unwrap();
        assert_eq!(
            scalar(&conn, MATCH_HELLO),
            0,
            "FTS should drop entry on chunk delete"
        );
    }

    #[test]
    fn embedding_jobs_unique_per_chunk_and_model() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        seed_record_and_chunk(&conn);

        let first_job = conn.execute(
            "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
             VALUES('r1:0','h0','local:e5:1','pending',0)",
            [],
        );
        assert!(first_job.is_ok());

        let duplicate_job = conn.execute(
            "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
             VALUES('r1:0','h0','local:e5:1','pending',1)",
            [],
        );
        assert!(duplicate_job.is_err());

        let other_model_job = conn.execute(
            "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
             VALUES('r1:0','h0','local:bge-m3:1','pending',2)",
            [],
        );
        assert!(other_model_job.is_ok());
    }

    #[test]
    fn cascade_delete_record_clears_chunks_and_artifacts() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        seed_record_and_chunk(&conn);
        conn.execute(
            "INSERT INTO raw_artifacts(record_id, payload_json, captured_at) \
             VALUES('r1','{}',0)",
            [],
        )
        .unwrap();

        conn.execute("DELETE FROM records WHERE id = 'r1'", [])
            .unwrap();

        assert_eq!(
            scalar(&conn, "SELECT COUNT(1) FROM record_chunks"),
            0,
            "chunks should cascade-delete with parent record"
        );
        assert_eq!(
            scalar(&conn, "SELECT COUNT(1) FROM raw_artifacts"),
            0,
            "artifacts should cascade-delete with parent record"
        );
    }
}