use crate::embed::DEFAULT_EMBED_MODEL;
use anyhow::{Context, Result};
use rusqlite::Connection;
use rusqlite::ffi::sqlite3_auto_extension;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Once;
/// File name of the SQLite database that lives inside a store directory.
pub const DB_FILENAME: &str = "lantern.db";

/// Guards the one-time, process-wide registration of the sqlite-vec extension.
static VEC_EXT_REGISTER: Once = Once::new();

/// Registers sqlite-vec with `sqlite3_auto_extension` exactly once per process.
///
/// SQLite runs auto-extensions for every connection created after the
/// registration, so this must be called before `Connection::open`.
fn ensure_sqlite_vec_loaded() {
    // The C signature SQLite requires for an extension entry point.
    type ExtensionEntryPoint = unsafe extern "C" fn(
        *mut rusqlite::ffi::sqlite3,
        *mut *mut std::os::raw::c_char,
        *const rusqlite::ffi::sqlite3_api_routines,
    ) -> std::os::raw::c_int;

    VEC_EXT_REGISTER.call_once(|| {
        // Naming the extern fn is safe; only calling or reinterpreting it is not.
        let raw_entry = sqlite_vec::sqlite3_vec_init as *const ();
        // SAFETY: the Rust binding declares `sqlite3_vec_init` without parameters,
        // but the underlying C symbol is sqlite-vec's standard extension entry
        // point with exactly the `ExtensionEntryPoint` signature. A function
        // pointer and `*const ()` have the same size on supported targets.
        let entry = unsafe { std::mem::transmute::<*const (), ExtensionEntryPoint>(raw_entry) };
        // SAFETY: `entry` is a valid extension entry point with a 'static
        // lifetime. The returned status code is ignored (as before). If
        // registration failed, later use of `vec0` / `vec_*` on new
        // connections would error instead.
        unsafe {
            sqlite3_auto_extension(Some(entry));
        }
    });
}
/// Latest schema version. After a successful migration it is stored in SQLite's
/// `user_version` pragma; it must equal the highest migration step.
const SCHEMA_VERSION: i64 = 18;
/// Name of the sqlite-vec `vec0` virtual table (768-dim float vectors, cosine
/// distance) that mirrors stored embeddings, keyed by `chunks.rowid`.
/// Existing databases depend on this exact name — renaming it orphans the
/// mirror table in already-migrated stores.
pub const VEC_MIRROR_TABLE: &str = "chunks_vec_nomic_768";
/// Handle to an on-disk store: the store's root directory plus an open SQLite
/// connection to the database file inside it.
pub struct Store {
// Directory the store was opened from; the database file lives directly in it.
root: PathBuf,
// Connection configured by `Store::open` (WAL, foreign keys, sqlite-vec).
conn: Connection,
}
impl Store {
pub fn initialize(root: &Path) -> Result<Self> {
fs::create_dir_all(root)
.with_context(|| format!("creating store dir {}", root.display()))?;
Self::open(root)
}
pub fn open(root: &Path) -> Result<Self> {
if !root.exists() {
anyhow::bail!("store directory does not exist: {}", root.display());
}
ensure_sqlite_vec_loaded();
let db_path = root.join(DB_FILENAME);
let conn = Connection::open(&db_path)
.with_context(|| format!("opening sqlite at {}", db_path.display()))?;
conn.execute_batch(
"PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
PRAGMA synchronous = NORMAL;",
)?;
let store = Self {
root: root.to_path_buf(),
conn,
};
store.migrate()?;
Ok(store)
}
pub fn root(&self) -> &Path {
&self.root
}
pub fn db_path(&self) -> PathBuf {
self.root.join(DB_FILENAME)
}
pub fn schema_version(&self) -> rusqlite::Result<i64> {
self.conn
.pragma_query_value(None, "user_version", |row| row.get(0))
}
pub fn conn(&self) -> &Connection {
&self.conn
}
pub fn conn_mut(&mut self) -> &mut Connection {
&mut self.conn
}
fn migrate(&self) -> Result<()> {
let current: i64 = self
.conn
.pragma_query_value(None, "user_version", |row| row.get(0))?;
if current < 1 {
self.conn.execute_batch(SCHEMA_V1)?;
}
if current < 2 {
self.conn.execute_batch(SCHEMA_V2)?;
}
if current < 3 {
self.conn.execute_batch(SCHEMA_V3)?;
}
if current < 4 {
self.conn.execute_batch(&schema_v4())?;
}
if current < 5 {
self.conn.execute_batch(&schema_v5())?;
}
if current < 6 {
self.conn.execute_batch(&schema_v6())?;
}
if current < 7 {
self.conn.execute_batch(&schema_v7())?;
}
if current < 8 {
self.conn.execute_batch(&schema_v8())?;
}
if current < 9 {
self.conn.execute_batch(&schema_v9())?;
}
if current < 10 {
self.conn.execute_batch(SCHEMA_V10)?;
}
if current < 11 {
self.conn.execute_batch(&schema_v11())?;
}
if current < 12 {
self.conn.execute_batch(&schema_v12())?;
}
if current < 13 {
self.conn.execute_batch(&schema_v13())?;
}
if current < 14 {
self.conn.execute_batch(&schema_v14())?;
}
if current < 15 {
self.conn.execute_batch(&schema_v15())?;
}
if current < 16 {
self.conn.execute_batch(&schema_v16())?;
}
if current < 17 {
self.conn.execute_batch(&schema_v17())?;
}
if current < 18 {
self.conn.execute_batch(SCHEMA_V18)?;
}
if current < SCHEMA_VERSION {
self.conn
.pragma_update(None, "user_version", SCHEMA_VERSION)?;
}
Ok(())
}
}
/// Migration v1: base tables.
///
/// `sources` holds one row per ingested source (uri, optional path, kind,
/// size, content hash, optional mtime, ingestion time), indexed by uri and
/// content hash. `chunks` holds the text slices of a source (byte range,
/// char count, text, hash). Each chunk is unique per `(source_id, ordinal)`
/// and is deleted together with its source via `ON DELETE CASCADE`.
const SCHEMA_V1: &str = r#"
CREATE TABLE IF NOT EXISTS sources (
id TEXT PRIMARY KEY,
uri TEXT NOT NULL,
path TEXT,
kind TEXT NOT NULL,
bytes INTEGER NOT NULL,
content_sha256 TEXT NOT NULL,
mtime_unix INTEGER,
ingested_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sources_uri ON sources(uri);
CREATE INDEX IF NOT EXISTS idx_sources_content_sha ON sources(content_sha256);
CREATE TABLE IF NOT EXISTS chunks (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
byte_start INTEGER NOT NULL,
byte_end INTEGER NOT NULL,
char_count INTEGER NOT NULL,
text TEXT NOT NULL,
sha256 TEXT NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(source_id, ordinal)
);
CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source_id);
"#;
/// Migration v2: FTS5 full-text index over `chunks.text`.
///
/// `chunks_fts` is kept in sync by insert/delete/update triggers, keyed by
/// `chunks.rowid`. Chunks that existed before this migration are backfilled.
///
/// NOTE(review): `chunks` has a TEXT primary key, so its rowid is implicit.
/// SQLite documents that VACUUM may renumber implicit rowids. That would
/// desynchronize this index (and the vec mirror, which uses the same key) —
/// confirm the store is never VACUUMed, or key these by a stable id.
const SCHEMA_V2: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
text,
tokenize = 'unicode61 remove_diacritics 2'
);
CREATE TRIGGER IF NOT EXISTS chunks_fts_ai AFTER INSERT ON chunks BEGIN
INSERT INTO chunks_fts(rowid, text) VALUES (new.rowid, new.text);
END;
CREATE TRIGGER IF NOT EXISTS chunks_fts_ad AFTER DELETE ON chunks BEGIN
DELETE FROM chunks_fts WHERE rowid = old.rowid;
END;
CREATE TRIGGER IF NOT EXISTS chunks_fts_au AFTER UPDATE OF text ON chunks BEGIN
UPDATE chunks_fts SET text = new.text WHERE rowid = old.rowid;
END;
-- Backfill rows that predated the FTS index (no-op on a fresh store).
INSERT INTO chunks_fts(rowid, text)
SELECT rowid, text FROM chunks
WHERE rowid NOT IN (SELECT rowid FROM chunks_fts);
"#;
/// Migration v3: per-chunk embeddings.
///
/// There is at most one row per chunk (`chunk_id` is the primary key). Each
/// row records the model name, vector dimension and raw vector bytes, and is
/// removed with its chunk via `ON DELETE CASCADE`. Rows are indexed by model.
const SCHEMA_V3: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
chunk_id TEXT PRIMARY KEY REFERENCES chunks(id) ON DELETE CASCADE,
model TEXT NOT NULL,
dim INTEGER NOT NULL,
embedding BLOB NOT NULL,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_embeddings_model ON embeddings(model);
"#;
/// Migration v4: creates the sqlite-vec mirror table (`VEC_MIRROR_TABLE`)
/// holding 768-dimensional float vectors compared by cosine distance.
fn schema_v4() -> String {
    format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING vec0(\n\
         embedding float[768] distance_metric=cosine\n\
         );",
        table = VEC_MIRROR_TABLE,
    )
}
/// Migration v5: backfills the vec mirror from `embeddings` rows stored under
/// `DEFAULT_EMBED_MODEL`, keyed by `chunks.rowid`. Rowids that are already
/// mirrored are skipped.
///
/// NOTE(review): assumes every `DEFAULT_EMBED_MODEL` embedding is 768-dim to
/// match the `vec0` column — confirm. The model name is spliced into a SQL
/// string literal, which presumes it contains no single quote.
fn schema_v5() -> String {
    format!(
        "INSERT INTO {table}(rowid, embedding)\n\
         SELECT c.rowid, e.embedding\n\
         FROM embeddings e\n\
         JOIN chunks c ON c.id = e.chunk_id\n\
         WHERE e.model = '{model}'\n\
         AND NOT EXISTS (\n\
         SELECT 1 FROM {table} v WHERE v.rowid = c.rowid\n\
         );",
        table = VEC_MIRROR_TABLE,
        model = DEFAULT_EMBED_MODEL,
    )
}
/// Migration v6: adds conversational metadata columns (role, session, turn,
/// tool name, timestamp) to `chunks`.
fn schema_v6() -> String {
    concat!(
        "ALTER TABLE chunks ADD COLUMN role TEXT;\n",
        "ALTER TABLE chunks ADD COLUMN session_id TEXT;\n",
        "ALTER TABLE chunks ADD COLUMN turn_id TEXT;\n",
        "ALTER TABLE chunks ADD COLUMN tool_name TEXT;\n",
        "ALTER TABLE chunks ADD COLUMN timestamp_unix INTEGER;",
    )
    .to_owned()
}
/// Migration v7: adds access tracking (`access_count`, defaulting to 0, and
/// `last_accessed_at`) to `chunks`.
fn schema_v7() -> String {
    concat!(
        "ALTER TABLE chunks ADD COLUMN access_count INTEGER NOT NULL DEFAULT 0;\n",
        "ALTER TABLE chunks ADD COLUMN last_accessed_at INTEGER;",
    )
    .to_owned()
}
/// Migration v8: adds a `feedback_score` column (defaults to 0) to `chunks`.
fn schema_v8() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN feedback_score INTEGER NOT NULL DEFAULT 0;")
}
/// Migration v9: adds a nullable `access_decay_at` column to `chunks`.
fn schema_v9() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN access_decay_at INTEGER;")
}
/// Migration v10: extracted entities and their chunk links.
///
/// `entities` is deduplicated by `(kind, value)` and indexed by kind.
/// `chunk_entities` is a many-to-many join table; a link is removed when
/// either its chunk or its entity is deleted. It is indexed by entity for
/// reverse lookups.
const SCHEMA_V10: &str = r#"
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
value TEXT NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(kind, value)
);
CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(kind);
CREATE TABLE IF NOT EXISTS chunk_entities (
chunk_id TEXT NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
PRIMARY KEY (chunk_id, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_chunk_entities_entity ON chunk_entities(entity_id);
"#;
/// Migration v11: adds a `query_success_count` counter (defaults to 0) to `chunks`.
fn schema_v11() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN query_success_count INTEGER NOT NULL DEFAULT 0;")
}
/// Migration v12: adds a nullable `tool_call_id` column to `chunks`.
fn schema_v12() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN tool_call_id TEXT;")
}
/// Migration v13: adds a nullable `parent_turn_id` column to `chunks`.
fn schema_v13() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN parent_turn_id TEXT;")
}
/// Migration v14: adds a nullable `project` column to `chunks`.
fn schema_v14() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN project TEXT;")
}
/// Migration v15: adds a nullable `user` column to `chunks`.
fn schema_v15() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN user TEXT;")
}
/// Migration v16: adds a nullable `topic` column to `chunks`.
fn schema_v16() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN topic TEXT;")
}
/// Migration v17: adds a nullable `thread` column to `chunks`.
fn schema_v17() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN thread TEXT;")
}
/// Migration v18: standalone `memory_records` table (no foreign keys).
///
/// Column defaults:
/// - priority 50
/// - urgency 0
/// - confidence 1.0
/// - status 'active'
/// - source_refs_json '[]'
///
/// Presumably `source_refs_json` holds a JSON array of references — verify
/// against the writers. Records are indexed by kind, scope and status. A
/// composite index supports ordering by priority, then urgency, then
/// recency (all descending).
const SCHEMA_V18: &str = r#"
CREATE TABLE IF NOT EXISTS memory_records (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
scope TEXT NOT NULL,
content TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 50,
urgency INTEGER NOT NULL DEFAULT 0,
confidence REAL NOT NULL DEFAULT 1.0,
status TEXT NOT NULL DEFAULT 'active',
source_refs_json TEXT NOT NULL DEFAULT '[]',
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_memory_records_kind ON memory_records(kind);
CREATE INDEX IF NOT EXISTS idx_memory_records_scope ON memory_records(scope);
CREATE INDEX IF NOT EXISTS idx_memory_records_status ON memory_records(status);
CREATE INDEX IF NOT EXISTS idx_memory_records_priority ON memory_records(priority DESC, urgency DESC, updated_at DESC);
"#;
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Initializes a store in a `store` subdirectory of `dir`. The caller keeps
    /// `dir` alive (and declared before the store) so the files outlive the store.
    fn store_in(dir: &TempDir) -> Store {
        Store::initialize(&dir.path().join("store")).unwrap()
    }

    /// Runs a single-row, single-column `sqlite_master` lookup and returns the name.
    fn master_name(store: &Store, sql: &str) -> String {
        store.conn().query_row(sql, [], |row| row.get(0)).unwrap()
    }

    #[test]
    fn initialize_is_idempotent() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("store");
        Store::initialize(&root).unwrap();
        let reopened = Store::initialize(&root).unwrap();
        assert_eq!(reopened.schema_version().unwrap(), SCHEMA_VERSION);
    }

    #[test]
    fn open_requires_existing_directory() {
        let dir = tempdir().unwrap();
        let absent = dir.path().join("nope");
        assert!(Store::open(&absent).is_err());
    }

    #[test]
    fn sqlite_vec_extension_is_available() {
        let dir = tempdir().unwrap();
        let store = store_in(&dir);
        let vec_version: String = store
            .conn()
            .query_row("SELECT vec_version()", [], |row| row.get(0))
            .unwrap();
        assert!(!vec_version.is_empty());
    }

    #[test]
    fn fts_table_exists_after_init() {
        let dir = tempdir().unwrap();
        let store = store_in(&dir);
        let found = master_name(
            &store,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='chunks_fts'",
        );
        assert_eq!(found, "chunks_fts");
    }

    #[test]
    fn vec_mirror_table_exists_after_init() {
        let dir = tempdir().unwrap();
        let store = store_in(&dir);
        // The literal is deliberate: it pins the on-disk table name, so an
        // accidental rename of VEC_MIRROR_TABLE fails this test.
        let found = master_name(
            &store,
            "SELECT name FROM sqlite_master WHERE name = 'chunks_vec_nomic_768'",
        );
        assert_eq!(found, "chunks_vec_nomic_768");
    }
}