#![forbid(unsafe_code)]
#![warn(missing_docs)]
pub mod api;
pub mod cjk;
pub use api::{
ChunkHit, ChunkLookup, LineageChain, PendingEmbeddingJob, RecordSummary, SearchFilter,
SourceRow, SourceWithCounts, StoreStats, MAX_LIST_LIMIT,
};
use std::path::Path;
use rusqlite::functions::FunctionFlags;
use rusqlite::Connection;
use thiserror::Error;
/// Schema migrations embedded at compile time, as `(id, sql)` pairs.
///
/// `Store::run_migrations` walks this slice front to back and applies every
/// entry whose id is not yet recorded in the `_migrations` table, so the order
/// here is the order in which migrations are applied to a fresh database.
const MIGRATIONS: &[(&str, &str)] = &[
("0001_init", include_str!("migrations/0001_init.sql")),
("0002_phase1", include_str!("migrations/0002_phase1.sql")),
("0003_cjk_fts", include_str!("migrations/0003_cjk_fts.sql")),
(
"0004_provenance_derived_from",
include_str!("migrations/0004_provenance_derived_from.sql"),
),
];
/// Registers the one-argument SQL scalar function `tokenize_cjk` on `conn`.
///
/// The function passes its argument through [`crate::cjk::tokenize_indexing`].
/// If the argument cannot be read as a `String`, the empty string is
/// tokenised instead of raising an SQL error.
///
/// # Errors
///
/// Returns whatever error `rusqlite` reports when registration fails.
fn register_cjk_function(conn: &Connection) -> rusqlite::Result<()> {
    // Declared deterministic so SQLite may treat equal inputs as yielding
    // equal outputs.
    let flags = FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC;
    conn.create_scalar_function("tokenize_cjk", 1, flags, |ctx| {
        let input = ctx.get::<String>(0).unwrap_or_default();
        Ok(crate::cjk::tokenize_indexing(&input))
    })
}
/// Errors produced by [`Store`] operations.
#[derive(Debug, Error)]
pub enum StoreError {
/// An error reported by `rusqlite`; `?` converts it automatically via `From`.
#[error("sqlite: {0}")]
Sqlite(#[from] rusqlite::Error),
/// The database schema is newer than this binary supports.
#[error("database schema is newer than this binary supports (found {found})")]
SchemaTooNew {
/// Value shown as `found` in the error message (presumably the schema
/// version read from the database; confirm against the code that
/// constructs this variant).
found: u32,
},
/// The store was found in a corrupt state; the payload describes the problem.
#[error("store corruption: {0}")]
Corruption(String),
}
/// Convenience alias for results whose error type is [`StoreError`].
pub type Result<T> = std::result::Result<T, StoreError>;
/// Handle to the SQLite-backed store.
///
/// Owns a single `rusqlite` connection behind a `parking_lot::Mutex`; use
/// [`Store::conn`] to lock it.
pub struct Store {
// Crate-visible so sibling modules can lock the connection directly.
pub(crate) conn: parking_lot::Mutex<Connection>,
}
impl Store {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let conn = Connection::open(path)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
register_cjk_function(&conn)?;
let store = Self {
conn: parking_lot::Mutex::new(conn),
};
store.run_migrations()?;
store.reindex_fts_if_pending()?;
Ok(store)
}
pub fn open_in_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
register_cjk_function(&conn)?;
let store = Self {
conn: parking_lot::Mutex::new(conn),
};
store.run_migrations()?;
store.reindex_fts_if_pending()?;
Ok(store)
}
fn reindex_fts_if_pending(&self) -> Result<()> {
let pending: Option<String> = {
let conn = self.conn.lock();
conn.query_row(
"SELECT value FROM meta WHERE key = 'chunks_fts_rebuild_pending'",
[],
|r| r.get(0),
)
.ok()
};
if pending.as_deref() != Some("1") {
return Ok(());
}
tracing::info!("0003_cjk_fts: re-tokenising existing record_chunks into chunks_fts");
let mut conn = self.conn.lock();
let tx = conn.transaction()?;
tx.execute(
"INSERT INTO chunks_fts(chunks_fts) VALUES('delete-all')",
[],
)?;
let n: usize = tx.execute(
"INSERT INTO chunks_fts(rowid, content)
SELECT rowid, tokenize_cjk(content) FROM record_chunks",
[],
)?;
tx.execute(
"DELETE FROM meta WHERE key = 'chunks_fts_rebuild_pending'",
[],
)?;
tx.commit()?;
tracing::info!(reindexed_rows = n, "0003_cjk_fts: chunks_fts rebuilt");
Ok(())
}
fn run_migrations(&self) -> Result<()> {
let mut conn = self.conn.lock();
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS _migrations (
id TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
);",
)?;
for (id, sql) in MIGRATIONS {
let already: i64 = conn.query_row(
"SELECT COUNT(1) FROM _migrations WHERE id = ?1",
[id],
|r| r.get(0),
)?;
if already == 0 {
let tx = conn.transaction()?;
tx.execute_batch(sql)?;
tx.execute(
"INSERT INTO _migrations(id, applied_at) VALUES (?1, strftime('%s','now'))",
[id],
)?;
tx.commit()?;
tracing::info!(migration = id, "applied migration");
}
}
Ok(())
}
pub fn conn(&self) -> parking_lot::MutexGuard<'_, Connection> {
self.conn.lock()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs a parameterless query expected to yield exactly one integer and
    /// returns it, panicking if the query fails.
    fn scalar_i64(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// A fresh in-memory store has an empty `records` table and reports the
    /// expected `schema_version`.
    #[test]
    fn open_in_memory_runs_migrations() {
        let store = Store::open_in_memory().unwrap();

        let record_count = scalar_i64(&store.conn(), "SELECT COUNT(1) FROM records");
        assert_eq!(record_count, 0);

        let schema_version: String = store
            .conn()
            .query_row(
                "SELECT value FROM meta WHERE key = 'schema_version'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(schema_version, "3");
    }

    /// Every phase-1 table (or view) is present in `sqlite_master`.
    #[test]
    fn phase1_tables_exist() {
        const EXPECTED: [&str; 7] = [
            "sources",
            "raw_artifacts",
            "record_chunks",
            "chunks_fts",
            "chunk_embeddings",
            "embedding_jobs",
            "import_errors",
        ];

        let store = Store::open_in_memory().unwrap();
        for table in EXPECTED {
            let matches: i64 = store
                .conn()
                .query_row(
                    "SELECT COUNT(1) FROM sqlite_master WHERE name = ?1",
                    [table],
                    |row| row.get(0),
                )
                .unwrap_or_else(|_| panic!("query failed for {table}"));
            assert_eq!(matches, 1, "expected table/view {table} to exist");
        }
    }

    /// The record-level FTS table is gone once all migrations have run.
    #[test]
    fn record_level_fts_was_dropped() {
        let store = Store::open_in_memory().unwrap();
        let leftovers = scalar_i64(
            &store.conn(),
            "SELECT COUNT(1) FROM sqlite_master WHERE name = 'records_fts'",
        );
        assert_eq!(leftovers, 0, "records_fts should not exist after 0002");
    }

    /// Inserting and deleting a chunk is mirrored in `chunks_fts`.
    #[test]
    fn chunks_fts_is_maintained_by_triggers() {
        const MATCH_HELLO: &str =
            "SELECT COUNT(1) FROM chunks_fts WHERE chunks_fts MATCH 'hello'";

        let store = Store::open_in_memory().unwrap();
        // Hold one guard for the whole test; the mutex is not re-entrant.
        let db = store.conn();

        db.execute(
            "INSERT INTO records(id, adapter, instance, content, scope, kind, \
             created_at, native_id, captured_at, raw_hash) \
             VALUES('r1','claude-code',NULL,'parent','user','fact',0,'n1',0,'h')",
            [],
        )
        .unwrap();
        db.execute(
            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
             VALUES('r1:0','r1',0,'hello world','h0',2)",
            [],
        )
        .unwrap();

        let after_insert = scalar_i64(&db, MATCH_HELLO);
        assert_eq!(after_insert, 1, "FTS should index inserted chunk content");

        db.execute("DELETE FROM record_chunks WHERE id = 'r1:0'", [])
            .unwrap();

        let after_delete = scalar_i64(&db, MATCH_HELLO);
        assert_eq!(after_delete, 0, "FTS should drop entry on chunk delete");
    }

    /// A second job for the same chunk and model is rejected, while a job for
    /// the same chunk under a different model is accepted.
    #[test]
    fn embedding_jobs_unique_per_chunk_and_model() {
        let store = Store::open_in_memory().unwrap();
        let db = store.conn();

        db.execute(
            "INSERT INTO records(id, adapter, instance, content, scope, kind, \
             created_at, native_id, captured_at, raw_hash) \
             VALUES('r1','claude-code',NULL,'p','user','fact',0,'n1',0,'h')",
            [],
        )
        .unwrap();
        db.execute(
            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
             VALUES('r1:0','r1',0,'x','h0',1)",
            [],
        )
        .unwrap();

        // First job for (chunk, model) goes in.
        assert!(db
            .execute(
                "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
                 VALUES('r1:0','h0','local:e5:1','pending',0)",
                [],
            )
            .is_ok());

        // Same chunk and model again must fail.
        assert!(db
            .execute(
                "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
                 VALUES('r1:0','h0','local:e5:1','pending',1)",
                [],
            )
            .is_err());

        // Same chunk, different model is fine.
        assert!(db
            .execute(
                "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
                 VALUES('r1:0','h0','local:bge-m3:1','pending',2)",
                [],
            )
            .is_ok());
    }

    /// Deleting a record removes its chunks and raw artifacts as well.
    #[test]
    fn cascade_delete_record_clears_chunks_and_artifacts() {
        let store = Store::open_in_memory().unwrap();
        let db = store.conn();

        db.execute(
            "INSERT INTO records(id, adapter, instance, content, scope, kind, \
             created_at, native_id, captured_at, raw_hash) \
             VALUES('r1','claude-code',NULL,'p','user','fact',0,'n1',0,'h')",
            [],
        )
        .unwrap();
        db.execute(
            "INSERT INTO raw_artifacts(record_id, payload_json, captured_at) \
             VALUES('r1','{}',0)",
            [],
        )
        .unwrap();
        db.execute(
            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
             VALUES('r1:0','r1',0,'x','h0',1)",
            [],
        )
        .unwrap();

        db.execute("DELETE FROM records WHERE id = 'r1'", [])
            .unwrap();

        let remaining_chunks = scalar_i64(&db, "SELECT COUNT(1) FROM record_chunks");
        assert_eq!(
            remaining_chunks, 0,
            "chunks should cascade-delete with parent record"
        );

        let remaining_artifacts = scalar_i64(&db, "SELECT COUNT(1) FROM raw_artifacts");
        assert_eq!(
            remaining_artifacts, 0,
            "artifacts should cascade-delete with parent record"
        );
    }
}