Skip to main content

anamnesis_store/
lib.rs

//! SQLite-backed storage for Anamnesis records.
//!
//! The crate exposes `Store::open` / `Store::open_in_memory` plus a typed
//! API in `api` for records, chunks, embeddings, jobs, and sources. Callers
//! should go through that typed API; `Store::conn()` hands out the raw
//! `Connection` and is intended only for tests and ad-hoc reads.
7
8#![forbid(unsafe_code)]
9#![warn(missing_docs)]
10
11pub mod api;
12pub mod cjk;
13
14pub use api::{
15    ChunkHit, ChunkLookup, LineageChain, PendingEmbeddingJob, RecordSummary, SearchFilter,
16    SourceRow, SourceWithCounts, StoreStats, MAX_LIST_LIMIT,
17};
18
19use std::path::Path;
20
21use rusqlite::functions::FunctionFlags;
22use rusqlite::Connection;
23use thiserror::Error;
24
25/// Embedded SQL migrations. Add new files in `migrations/` and list them here
26/// in order.
27const MIGRATIONS: &[(&str, &str)] = &[
28    ("0001_init", include_str!("migrations/0001_init.sql")),
29    ("0002_phase1", include_str!("migrations/0002_phase1.sql")),
30    ("0003_cjk_fts", include_str!("migrations/0003_cjk_fts.sql")),
31    (
32        "0004_provenance_derived_from",
33        include_str!("migrations/0004_provenance_derived_from.sql"),
34    ),
35];
36
37/// Register the `tokenize_cjk(text)` SQLite scalar function on `conn`.
38///
39/// The function is called by the `chunks_fts` triggers (`0003_cjk_fts`)
40/// to turn record content into a jieba-segmented token stream before it
41/// hits the FTS index. Must be installed on EVERY connection before any
42/// trigger fires — the migration itself sets it up, and `Store::open`
43/// re-registers because each fresh `Connection` starts without it.
44fn register_cjk_function(conn: &Connection) -> rusqlite::Result<()> {
45    conn.create_scalar_function(
46        "tokenize_cjk",
47        1,
48        FunctionFlags::SQLITE_DETERMINISTIC | FunctionFlags::SQLITE_UTF8,
49        |ctx| {
50            let text: String = ctx.get(0).unwrap_or_default();
51            Ok(crate::cjk::tokenize_indexing(&text))
52        },
53    )
54}
55
56/// Store-layer errors.
57#[derive(Debug, Error)]
58pub enum StoreError {
59    /// SQLite error.
60    #[error("sqlite: {0}")]
61    Sqlite(#[from] rusqlite::Error),
62
63    /// Schema version on disk is newer than this binary supports.
64    #[error("database schema is newer than this binary supports (found {found})")]
65    SchemaTooNew {
66        /// Version found on disk.
67        found: u32,
68    },
69
70    /// Invariant we expect SQLite + the migration set to uphold was
71    /// violated — e.g. a `provenance.derived_from` chain cycle. These
72    /// are loud rather than silent so corruption surfaces fast.
73    #[error("store corruption: {0}")]
74    Corruption(String),
75}
76
77/// Crate result.
78pub type Result<T> = std::result::Result<T, StoreError>;
79
80/// Anamnesis storage handle. The underlying SQLite connection is wrapped
81/// in a `parking_lot::Mutex` so the type is `Send + Sync` and can be
82/// shared across async tasks (the MCP server holds an `Arc<Store>`).
83/// All methods take `&self`; the mutex enforces serialised access to the
84/// connection.
85pub struct Store {
86    pub(crate) conn: parking_lot::Mutex<Connection>,
87}
88
89impl Store {
90    /// Open (or create) a store at the given path and run pending migrations.
91    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
92        let conn = Connection::open(path)?;
93        conn.pragma_update(None, "journal_mode", "WAL")?;
94        conn.pragma_update(None, "foreign_keys", "ON")?;
95        conn.pragma_update(None, "synchronous", "NORMAL")?;
96        register_cjk_function(&conn)?;
97        let store = Self {
98            conn: parking_lot::Mutex::new(conn),
99        };
100        store.run_migrations()?;
101        store.reindex_fts_if_pending()?;
102        Ok(store)
103    }
104
105    /// Open an in-memory store (useful for tests).
106    pub fn open_in_memory() -> Result<Self> {
107        let conn = Connection::open_in_memory()?;
108        register_cjk_function(&conn)?;
109        let store = Self {
110            conn: parking_lot::Mutex::new(conn),
111        };
112        store.run_migrations()?;
113        store.reindex_fts_if_pending()?;
114        Ok(store)
115    }
116
117    /// If migration 0003 set the `chunks_fts_rebuild_pending` flag,
118    /// re-tokenize and re-insert every row from `record_chunks` into
119    /// `chunks_fts`, then clear the flag.
120    ///
121    /// This is the second half of the 0003 migration: the SQL part can
122    /// only drop the FTS data, because `tokenize_cjk` is per-connection
123    /// and not guaranteed to be installed at migration time on a fresh
124    /// DB. Doing the rebuild here keeps the work idempotent (no flag →
125    /// no-op) and bounded (flagged once per DB lifetime).
126    fn reindex_fts_if_pending(&self) -> Result<()> {
127        let pending: Option<String> = {
128            let conn = self.conn.lock();
129            conn.query_row(
130                "SELECT value FROM meta WHERE key = 'chunks_fts_rebuild_pending'",
131                [],
132                |r| r.get(0),
133            )
134            .ok()
135        };
136        if pending.as_deref() != Some("1") {
137            return Ok(());
138        }
139        tracing::info!("0003_cjk_fts: re-tokenising existing record_chunks into chunks_fts");
140        let mut conn = self.conn.lock();
141        let tx = conn.transaction()?;
142        // Wipe whatever's in chunks_fts (external-content mode means
143        // there's no automatic clear when triggers re-insert).
144        tx.execute(
145            "INSERT INTO chunks_fts(chunks_fts) VALUES('delete-all')",
146            [],
147        )?;
148        // Re-insert each chunk through the new tokenize_cjk trigger.
149        // We do it via UPDATE-noop on record_chunks so the AFTER UPDATE
150        // trigger fires consistently, which avoids encoding the
151        // tokenization logic in two places.
152        let n: usize = tx.execute(
153            "INSERT INTO chunks_fts(rowid, content)
154             SELECT rowid, tokenize_cjk(content) FROM record_chunks",
155            [],
156        )?;
157        tx.execute(
158            "DELETE FROM meta WHERE key = 'chunks_fts_rebuild_pending'",
159            [],
160        )?;
161        tx.commit()?;
162        tracing::info!(reindexed_rows = n, "0003_cjk_fts: chunks_fts rebuilt");
163        Ok(())
164    }
165
166    fn run_migrations(&self) -> Result<()> {
167        let mut conn = self.conn.lock();
168        // Tiny home-grown runner: keep applied migration ids in a meta table.
169        conn.execute_batch(
170            "CREATE TABLE IF NOT EXISTS _migrations (
171                id    TEXT PRIMARY KEY,
172                applied_at INTEGER NOT NULL
173            );",
174        )?;
175
176        for (id, sql) in MIGRATIONS {
177            let already: i64 = conn.query_row(
178                "SELECT COUNT(1) FROM _migrations WHERE id = ?1",
179                [id],
180                |r| r.get(0),
181            )?;
182            if already == 0 {
183                let tx = conn.transaction()?;
184                tx.execute_batch(sql)?;
185                tx.execute(
186                    "INSERT INTO _migrations(id, applied_at) VALUES (?1, strftime('%s','now'))",
187                    [id],
188                )?;
189                tx.commit()?;
190                tracing::info!(migration = id, "applied migration");
191            }
192        }
193        Ok(())
194    }
195
196    /// Borrow the inner connection. Intended for tests and ad-hoc reads;
197    /// production code should call the typed methods in `api`. The
198    /// returned guard holds the mutex — drop it before any `.await`.
199    pub fn conn(&self) -> parking_lot::MutexGuard<'_, Connection> {
200        self.conn.lock()
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Insert the parent `records` row `r1` that chunk/artifact rows reference.
    fn insert_record(conn: &Connection, content: &str) {
        conn.execute(
            "INSERT INTO records(id, adapter, instance, content, scope, kind, \
             created_at, native_id, captured_at, raw_hash) \
             VALUES('r1','claude-code',NULL,?1,'user','fact',0,'n1',0,'h')",
            [content],
        )
        .unwrap();
    }

    /// Insert chunk `r1:0` (seq 0, hash `h0`) under record `r1`.
    fn insert_chunk(conn: &Connection, content: &str, token_estimate: i64) {
        conn.execute(
            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
             VALUES('r1:0','r1',0,?1,'h0',?2)",
            rusqlite::params![content, token_estimate],
        )
        .unwrap();
    }

    /// Enqueue a pending embedding job for chunk `r1:0` under `model_id`.
    fn insert_job(conn: &Connection, model_id: &str, enqueued_at: i64) -> rusqlite::Result<usize> {
        conn.execute(
            "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
             VALUES('r1:0','h0',?1,'pending',?2)",
            rusqlite::params![model_id, enqueued_at],
        )
    }

    /// Run a single-value `SELECT COUNT(...)`-style query and return the integer.
    fn count(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |r| r.get(0)).unwrap()
    }

    /// True when `err` is SQLite's constraint-violation error (UNIQUE, FK, ...).
    fn is_constraint_violation(err: &rusqlite::Error) -> bool {
        matches!(
            err,
            rusqlite::Error::SqliteFailure(e, _)
                if e.code == rusqlite::ErrorCode::ConstraintViolation
        )
    }

    #[test]
    fn open_in_memory_runs_migrations() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        assert_eq!(count(&conn, "SELECT COUNT(1) FROM records"), 0);

        let version: String = conn
            .query_row(
                "SELECT value FROM meta WHERE key = 'schema_version'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(version, "3");
    }

    #[test]
    fn phase1_tables_exist() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        for table in [
            "sources",
            "raw_artifacts",
            "record_chunks",
            "chunks_fts",
            "chunk_embeddings",
            "embedding_jobs",
            "import_errors",
        ] {
            let n: i64 = conn
                .query_row(
                    "SELECT COUNT(1) FROM sqlite_master WHERE name = ?1",
                    [table],
                    |r| r.get(0),
                )
                .unwrap_or_else(|_| panic!("query failed for {table}"));
            assert_eq!(n, 1, "expected table/view {table} to exist");
        }
    }

    #[test]
    fn record_level_fts_was_dropped() {
        let store = Store::open_in_memory().unwrap();
        let n = count(
            &store.conn(),
            "SELECT COUNT(1) FROM sqlite_master WHERE name = 'records_fts'",
        );
        assert_eq!(n, 0, "records_fts should not exist after 0002");
    }

    #[test]
    fn chunks_fts_is_maintained_by_triggers() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        let hello_hits = "SELECT COUNT(1) FROM chunks_fts WHERE chunks_fts MATCH 'hello'";

        // Parent record first so the FK on record_chunks is satisfied.
        insert_record(&conn, "parent");

        // Insert a chunk → AFTER INSERT trigger should populate FTS.
        insert_chunk(&conn, "hello world", 2);
        assert_eq!(
            count(&conn, hello_hits),
            1,
            "FTS should index inserted chunk content"
        );

        // Delete the chunk → AFTER DELETE trigger should clean FTS.
        conn.execute("DELETE FROM record_chunks WHERE id = 'r1:0'", [])
            .unwrap();
        assert_eq!(
            count(&conn, hello_hits),
            0,
            "FTS should drop entry on chunk delete"
        );
    }

    #[test]
    fn embedding_jobs_unique_per_chunk_and_model() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        insert_record(&conn, "p");
        insert_chunk(&conn, "x", 1);

        insert_job(&conn, "local:e5:1", 0).expect("first job for (chunk, model) should insert");

        // Same (chunk_id, model_id) must be rejected by the UNIQUE constraint
        // specifically — not by some unrelated failure such as a schema typo.
        let err = insert_job(&conn, "local:e5:1", 1)
            .expect_err("duplicate (chunk_id, model_id) should be rejected");
        assert!(
            is_constraint_violation(&err),
            "expected a UNIQUE constraint violation, got {err:?}"
        );

        // Different model_id → fresh job is allowed.
        insert_job(&conn, "local:bge-m3:1", 2)
            .expect("same chunk under a different model should insert");
    }

    #[test]
    fn cascade_delete_record_clears_chunks_and_artifacts() {
        let store = Store::open_in_memory().unwrap();
        let conn = store.conn();
        insert_record(&conn, "p");
        conn.execute(
            "INSERT INTO raw_artifacts(record_id, payload_json, captured_at) \
             VALUES('r1','{}',0)",
            [],
        )
        .unwrap();
        insert_chunk(&conn, "x", 1);

        conn.execute("DELETE FROM records WHERE id = 'r1'", [])
            .unwrap();

        assert_eq!(
            count(&conn, "SELECT COUNT(1) FROM record_chunks"),
            0,
            "chunks should cascade-delete with parent record"
        );
        assert_eq!(
            count(&conn, "SELECT COUNT(1) FROM raw_artifacts"),
            0,
            "artifacts should cascade-delete with parent record"
        );
    }
}