use crate::embed::DEFAULT_EMBED_MODEL;
use anyhow::{Context, Result};
use rusqlite::Connection;
use rusqlite::ffi::sqlite3_auto_extension;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Once;
/// File name of the SQLite database, joined onto the store's root directory.
pub const DB_FILENAME: &str = "lantern.db";
/// Ensures the sqlite-vec registration in `ensure_sqlite_vec_loaded` runs at most once.
static VEC_EXT_REGISTER: Once = Once::new();
/// Registers the sqlite-vec entry point with SQLite's auto-extension hook.
///
/// The registration is wrapped in `VEC_EXT_REGISTER.call_once`, so calling this
/// function repeatedly is harmless. `Store::open` calls it before opening its
/// connection.
fn ensure_sqlite_vec_loaded() {
// Entry-point signature the transmute below produces and hands to
// `sqlite3_auto_extension`.
type VecInit = unsafe extern "C" fn(
*mut rusqlite::ffi::sqlite3,
*mut *mut std::os::raw::c_char,
*const rusqlite::ffi::sqlite3_api_routines,
) -> std::os::raw::c_int;
// SAFETY: NOTE(review): this assumes `sqlite_vec::sqlite3_vec_init` really has
// the ABI described by `VecInit` -- confirm against the sqlite-vec crate when
// upgrading it. The status code returned by `sqlite3_auto_extension` is ignored.
VEC_EXT_REGISTER.call_once(|| unsafe {
let init: VecInit =
std::mem::transmute::<*const (), VecInit>(sqlite_vec::sqlite3_vec_init as *const ());
sqlite3_auto_extension(Some(init));
});
}
/// Schema version a store is migrated up to on open; it is persisted in
/// SQLite's `user_version` pragma.
const SCHEMA_VERSION: i64 = 7;
/// Name of the `vec0` virtual table created by schema v4 (768-dimensional float
/// embeddings, cosine distance) and backfilled from `embeddings` by schema v5.
pub const VEC_MIRROR_TABLE: &str = "chunks_vec_nomic_768";
/// Handle to an on-disk store: a root directory plus the open SQLite
/// connection to the `DB_FILENAME` database inside it.
pub struct Store {
// Directory that contains the database file.
root: PathBuf,
// Connection opened (and migrated) by `Store::open`.
conn: Connection,
}
impl Store {
    /// Creates `root` (and any missing parent directories) and opens the store in it.
    ///
    /// Calling this again on an already-initialized directory simply reopens the
    /// existing database.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be created or if [`Store::open`] fails.
    pub fn initialize(root: &Path) -> Result<Self> {
        fs::create_dir_all(root)
            .with_context(|| format!("creating store dir {}", root.display()))?;
        Self::open(root)
    }

    /// Opens the store in an existing `root` directory, creating the database
    /// file if needed and migrating it to the current schema version.
    ///
    /// # Errors
    ///
    /// Fails if `root` does not exist, if SQLite cannot open the database, if the
    /// connection pragmas cannot be applied, or if a schema migration fails.
    pub fn open(root: &Path) -> Result<Self> {
        if !root.exists() {
            anyhow::bail!("store directory does not exist: {}", root.display());
        }
        // Must run before the connection is opened so the `vec0` module is
        // available to the migrations below.
        ensure_sqlite_vec_loaded();
        let db_path = root.join(DB_FILENAME);
        let conn = Connection::open(&db_path)
            .with_context(|| format!("opening sqlite at {}", db_path.display()))?;
        // `foreign_keys = ON` is what makes the schema's ON DELETE CASCADE
        // clauses take effect on this connection.
        conn.execute_batch(
            "PRAGMA journal_mode = WAL;
             PRAGMA foreign_keys = ON;
             PRAGMA synchronous = NORMAL;",
        )
        .context("configuring sqlite pragmas")?;
        let store = Self {
            root: root.to_path_buf(),
            conn,
        };
        store.migrate()?;
        Ok(store)
    }

    /// Directory this store lives in.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Full path of the SQLite database file inside [`Store::root`].
    pub fn db_path(&self) -> PathBuf {
        self.root.join(DB_FILENAME)
    }

    /// Reads the schema version recorded in the database's `user_version` pragma.
    pub fn schema_version(&self) -> rusqlite::Result<i64> {
        self.conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
    }

    /// Shared access to the underlying SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Exclusive access to the underlying SQLite connection.
    pub fn conn_mut(&mut self) -> &mut Connection {
        &mut self.conn
    }

    /// Brings the database up to `SCHEMA_VERSION`, one step at a time.
    ///
    /// Every step is committed together with its own `user_version` bump, so an
    /// interrupted or failed upgrade resumes at the first step that did not
    /// commit. This matters for the `ALTER TABLE ... ADD COLUMN` steps (v6, v7):
    /// they are not re-runnable, and replaying them after a partial upgrade
    /// fails with "duplicate column name".
    fn migrate(&self) -> Result<()> {
        let current = self
            .schema_version()
            .context("reading schema version")?;
        if current < 1 {
            self.apply_migration(1, SCHEMA_V1)?;
        }
        if current < 2 {
            self.apply_migration(2, SCHEMA_V2)?;
        }
        if current < 3 {
            self.apply_migration(3, SCHEMA_V3)?;
        }
        if current < 4 {
            self.apply_migration(4, &schema_v4())?;
        }
        if current < 5 {
            self.apply_migration(5, &schema_v5())?;
        }
        if current < 6 {
            self.apply_migration(6, &schema_v6())?;
        }
        if current < 7 {
            self.apply_migration(7, &schema_v7())?;
        }
        // Safety net: if SCHEMA_VERSION is ever bumped without a matching step
        // above, still record it, as the previous implementation did.
        if self.schema_version()? < SCHEMA_VERSION {
            self.conn
                .pragma_update(None, "user_version", SCHEMA_VERSION)?;
        }
        Ok(())
    }

    /// Runs `sql` and records `version` in `user_version` inside one transaction.
    ///
    /// If either part fails, the transaction is rolled back when `tx` is dropped,
    /// leaving both the schema and the recorded version untouched.
    fn apply_migration(&self, version: i64, sql: &str) -> Result<()> {
        // `unchecked_transaction` is used because only `&self` is available here;
        // `Connection::transaction` would require `&mut Connection`.
        let tx = self
            .conn
            .unchecked_transaction()
            .with_context(|| format!("starting schema migration v{version}"))?;
        tx.execute_batch(sql)
            .with_context(|| format!("applying schema migration v{version}"))?;
        tx.pragma_update(None, "user_version", version)
            .with_context(|| format!("recording schema version {version}"))?;
        tx.commit()
            .with_context(|| format!("committing schema migration v{version}"))
    }
}
/// Schema v1: the base tables.
///
/// Creates `sources` (one row per ingested source, indexed by `uri` and by
/// `content_sha256`) and `chunks` (text spans of a source, unique per
/// `(source_id, ordinal)` and deleted together with their source via
/// `ON DELETE CASCADE`). Every statement uses `IF NOT EXISTS`.
const SCHEMA_V1: &str = r#"
CREATE TABLE IF NOT EXISTS sources (
id TEXT PRIMARY KEY,
uri TEXT NOT NULL,
path TEXT,
kind TEXT NOT NULL,
bytes INTEGER NOT NULL,
content_sha256 TEXT NOT NULL,
mtime_unix INTEGER,
ingested_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sources_uri ON sources(uri);
CREATE INDEX IF NOT EXISTS idx_sources_content_sha ON sources(content_sha256);
CREATE TABLE IF NOT EXISTS chunks (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
ordinal INTEGER NOT NULL,
byte_start INTEGER NOT NULL,
byte_end INTEGER NOT NULL,
char_count INTEGER NOT NULL,
text TEXT NOT NULL,
sha256 TEXT NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(source_id, ordinal)
);
CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source_id);
"#;
/// Schema v2: full-text search over chunk text.
///
/// Creates the FTS5 virtual table `chunks_fts`, whose rowids mirror
/// `chunks.rowid`. It adds insert, delete and update-of-`text` triggers on
/// `chunks` that keep the index in step, then backfills any chunk rows not yet
/// present in the index.
const SCHEMA_V2: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
text,
tokenize = 'unicode61 remove_diacritics 2'
);
CREATE TRIGGER IF NOT EXISTS chunks_fts_ai AFTER INSERT ON chunks BEGIN
INSERT INTO chunks_fts(rowid, text) VALUES (new.rowid, new.text);
END;
CREATE TRIGGER IF NOT EXISTS chunks_fts_ad AFTER DELETE ON chunks BEGIN
DELETE FROM chunks_fts WHERE rowid = old.rowid;
END;
CREATE TRIGGER IF NOT EXISTS chunks_fts_au AFTER UPDATE OF text ON chunks BEGIN
UPDATE chunks_fts SET text = new.text WHERE rowid = old.rowid;
END;
-- Backfill rows that predated the FTS index (no-op on a fresh store).
INSERT INTO chunks_fts(rowid, text)
SELECT rowid, text FROM chunks
WHERE rowid NOT IN (SELECT rowid FROM chunks_fts);
"#;
/// Schema v3: per-chunk embeddings.
///
/// Creates `embeddings`, keyed by `chunk_id` (so at most one row per chunk) and
/// removed with its chunk via `ON DELETE CASCADE`. Each row stores the model
/// name, the vector dimension and the vector itself as a BLOB. Also creates an
/// index on `model`.
const SCHEMA_V3: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
chunk_id TEXT PRIMARY KEY REFERENCES chunks(id) ON DELETE CASCADE,
model TEXT NOT NULL,
dim INTEGER NOT NULL,
embedding BLOB NOT NULL,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_embeddings_model ON embeddings(model);
"#;
/// Schema v4: builds the DDL for the `vec0` virtual table named by
/// `VEC_MIRROR_TABLE`, holding one 768-float `embedding` column compared by
/// cosine distance.
fn schema_v4() -> String {
    let table = VEC_MIRROR_TABLE;
    format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING vec0(\n\
         embedding float[768] distance_metric=cosine\n\
         );"
    )
}
/// Schema v5: builds the backfill statement that copies existing embeddings
/// into the `VEC_MIRROR_TABLE` table.
///
/// Only rows whose `model` equals `DEFAULT_EMBED_MODEL` are copied, keyed by the
/// owning chunk's rowid; rowids already present in the mirror are skipped.
fn schema_v5() -> String {
    let mirror = VEC_MIRROR_TABLE;
    let model = DEFAULT_EMBED_MODEL;
    format!(
        "INSERT INTO {mirror}(rowid, embedding)\n\
         SELECT c.rowid, e.embedding\n\
         FROM embeddings e\n\
         JOIN chunks c ON c.id = e.chunk_id\n\
         WHERE e.model = '{model}'\n\
         AND NOT EXISTS (\n\
         SELECT 1 FROM {mirror} v WHERE v.rowid = c.rowid\n\
         );"
    )
}
/// Schema v6: builds the statements that add five nullable columns to `chunks`
/// (`role`, `session_id`, `turn_id`, `tool_name`, `timestamp_unix`), one
/// `ALTER TABLE` per line.
fn schema_v6() -> String {
    // (column name, declared SQL type), in the order the columns are added.
    const NEW_COLUMNS: [(&str, &str); 5] = [
        ("role", "TEXT"),
        ("session_id", "TEXT"),
        ("turn_id", "TEXT"),
        ("tool_name", "TEXT"),
        ("timestamp_unix", "INTEGER"),
    ];
    NEW_COLUMNS
        .iter()
        .map(|(name, sql_type)| format!("ALTER TABLE chunks ADD COLUMN {name} {sql_type};"))
        .collect::<Vec<_>>()
        .join("\n")
}
/// Schema v7: builds the statements that add two columns to `chunks`:
/// `access_count` (`INTEGER NOT NULL DEFAULT 0`) and the nullable
/// `last_accessed_at`.
fn schema_v7() -> String {
    let mut sql =
        String::from("ALTER TABLE chunks ADD COLUMN access_count INTEGER NOT NULL DEFAULT 0;");
    sql.push('\n');
    sql.push_str("ALTER TABLE chunks ADD COLUMN last_accessed_at INTEGER;");
    sql
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Initializes a store in a new temporary directory. The `TempDir` guard is
    /// returned too so the directory outlives the store.
    fn fresh_store() -> (TempDir, Store) {
        let dir = tempdir().unwrap();
        let store = Store::initialize(&dir.path().join("store")).unwrap();
        (dir, store)
    }

    /// Runs `sql` and returns the first column of its single row as a string.
    fn single_string(store: &Store, sql: &str) -> String {
        store
            .conn()
            .query_row(sql, [], |row| row.get(0))
            .unwrap()
    }

    /// Initializing the same directory twice succeeds and leaves the schema at
    /// the current version.
    #[test]
    fn initialize_is_idempotent() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("store");
        Store::initialize(&root).unwrap();
        let reopened = Store::initialize(&root).unwrap();
        assert_eq!(reopened.schema_version().unwrap(), SCHEMA_VERSION);
    }

    /// `open` refuses a directory that was never created.
    #[test]
    fn open_requires_existing_directory() {
        let dir = tempdir().unwrap();
        let result = Store::open(&dir.path().join("nope"));
        assert!(result.is_err());
    }

    /// The sqlite-vec SQL functions are callable on a store connection.
    #[test]
    fn sqlite_vec_extension_is_available() {
        let (_dir, store) = fresh_store();
        let version = single_string(&store, "SELECT vec_version()");
        assert!(!version.is_empty());
    }

    /// The FTS5 table from schema v2 is present after initialization.
    #[test]
    fn fts_table_exists_after_init() {
        let (_dir, store) = fresh_store();
        let name = single_string(
            &store,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='chunks_fts'",
        );
        assert_eq!(name, "chunks_fts");
    }

    /// The vector mirror table from schema v4 is present after initialization.
    #[test]
    fn vec_mirror_table_exists_after_init() {
        let (_dir, store) = fresh_store();
        let name = single_string(
            &store,
            "SELECT name FROM sqlite_master WHERE name = 'chunks_vec_nomic_768'",
        );
        assert_eq!(name, "chunks_vec_nomic_768");
    }
}