// lantern 0.2.3 — local-first, provenance-aware semantic search for agent
// activity. (Crate metadata residue; module documentation follows below.)
//! Local SQLite-backed store.
//!
//! The schema is intentionally narrow: a `sources` table records where a
//! piece of text came from, and a `chunks` table records the deterministic
//! slices that were indexed. Both carry provenance fields (uri, sha256,
//! byte offsets, timestamps) so a future retrieval path can explain where
//! any result originated.

use crate::embed::DEFAULT_EMBED_MODEL;
use anyhow::{Context, Result};
use rusqlite::Connection;
use rusqlite::ffi::sqlite3_auto_extension;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Once;

/// File name of the SQLite database inside the store root directory.
pub const DB_FILENAME: &str = "lantern.db";

/// Guards one-time, process-wide registration of the sqlite-vec auto-extension.
static VEC_EXT_REGISTER: Once = Once::new();

// Register the statically-linked sqlite-vec extension as a SQLite
// auto-extension so every subsequent connection inherits the `vec0`
// module and `vec_*` functions. The `sqlite-vec` crate declares
// `sqlite3_vec_init` as a zero-arg `fn()`, but the underlying C symbol
// has the full extension-init signature — transmuting between the two
// is what the crate's own tests do.
fn ensure_sqlite_vec_loaded() {
    // The real C signature of an SQLite extension entry point
    // (db handle, error-message out-param, API routines table).
    type VecInit = unsafe extern "C" fn(
        *mut rusqlite::ffi::sqlite3,
        *mut *mut std::os::raw::c_char,
        *const rusqlite::ffi::sqlite3_api_routines,
    ) -> std::os::raw::c_int;
    VEC_EXT_REGISTER.call_once(|| unsafe {
        // SAFETY: per the note above, the exported C symbol behind
        // `sqlite3_vec_init` actually has the `VecInit` signature — the
        // crate's zero-arg `fn()` type is a simplification — so casting the
        // function pointer through `*const ()` does not change its ABI.
        // `call_once` ensures registration happens exactly once per process.
        let init: VecInit =
            std::mem::transmute::<*const (), VecInit>(sqlite_vec::sqlite3_vec_init as *const ());
        sqlite3_auto_extension(Some(init));
    });
}

/// Highest migration step applied by `Store::migrate`; mirrored into
/// `PRAGMA user_version` once migration completes.
const SCHEMA_VERSION: i64 = 9;

/// vec0 mirror of `embeddings` for the default model only
/// (`nomic-embed-text` / 768-dim). `embeddings` remains the source of truth;
/// this table exists so a later slice can switch search to the ANN path
/// without another migration.
pub const VEC_MIRROR_TABLE: &str = "chunks_vec_nomic_768";

/// Handle to an on-disk store: the root directory plus an open SQLite
/// connection to the `lantern.db` file inside it.
pub struct Store {
    // Directory containing the database file; kept for `root()` / `db_path()`.
    root: PathBuf,
    // Connection opened with WAL, foreign keys, and the sqlite-vec extension.
    conn: Connection,
}

impl Store {
    /// Create the store directory if needed, then open and migrate it.
    pub fn initialize(root: &Path) -> Result<Self> {
        fs::create_dir_all(root)
            .with_context(|| format!("creating store dir {}", root.display()))?;
        Self::open(root)
    }

    /// Open an existing store directory. Fails if the directory is missing.
    pub fn open(root: &Path) -> Result<Self> {
        if !root.exists() {
            anyhow::bail!("store directory does not exist: {}", root.display());
        }
        // Must be registered *before* the connection is opened so the
        // `vec0` module and `vec_*` functions are visible on it.
        ensure_sqlite_vec_loaded();
        let db_path = root.join(DB_FILENAME);
        let conn = Connection::open(&db_path)
            .with_context(|| format!("opening sqlite at {}", db_path.display()))?;
        // WAL allows concurrent readers; foreign keys enforce the
        // chunks -> sources cascade; NORMAL sync is the usual WAL
        // durability/speed tradeoff.
        conn.execute_batch(
            "PRAGMA journal_mode = WAL;
             PRAGMA foreign_keys = ON;
             PRAGMA synchronous = NORMAL;",
        )?;
        let store = Self {
            root: root.to_path_buf(),
            conn,
        };
        store.migrate()?;
        Ok(store)
    }

    /// Root directory that contains the database file.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Full path to the SQLite database file inside the store root.
    pub fn db_path(&self) -> PathBuf {
        self.root.join(DB_FILENAME)
    }

    /// Current `PRAGMA user_version`, i.e. the highest migration applied.
    pub fn schema_version(&self) -> rusqlite::Result<i64> {
        self.conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
    }

    /// Shared access to the underlying connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Exclusive access to the underlying connection (e.g. for transactions).
    pub fn conn_mut(&mut self) -> &mut Connection {
        &mut self.conn
    }

    /// Apply all pending schema migrations in order.
    ///
    /// `user_version` is advanced after *each* step rather than once at the
    /// end, so an interruption mid-migration cannot cause already-applied,
    /// non-idempotent DDL (the `ALTER TABLE ... ADD COLUMN` batches of
    /// v6–v9) to be re-run — which would fail with a "duplicate column"
    /// error — on the next open. The final step (v9) leaves `user_version`
    /// equal to `SCHEMA_VERSION`.
    fn migrate(&self) -> Result<()> {
        let current: i64 = self
            .conn
            .pragma_query_value(None, "user_version", |row| row.get(0))?;
        self.apply_migration(current, 1, SCHEMA_V1)?;
        self.apply_migration(current, 2, SCHEMA_V2)?;
        self.apply_migration(current, 3, SCHEMA_V3)?;
        self.apply_migration(current, 4, &schema_v4())?;
        self.apply_migration(current, 5, &schema_v5())?;
        self.apply_migration(current, 6, &schema_v6())?;
        self.apply_migration(current, 7, &schema_v7())?;
        self.apply_migration(current, 8, &schema_v8())?;
        self.apply_migration(current, 9, &schema_v9())?;
        Ok(())
    }

    /// Run one migration step if the store predates `version`, then record
    /// the new schema version immediately so progress survives a crash.
    fn apply_migration(&self, current: i64, version: i64, sql: &str) -> Result<()> {
        if current < version {
            self.conn
                .execute_batch(sql)
                .with_context(|| format!("applying schema migration v{version}"))?;
            self.conn.pragma_update(None, "user_version", version)?;
        }
        Ok(())
    }
}

// v1: baseline provenance schema. `sources` records where a piece of text
// came from (uri, content hash, mtime, ingestion time); `chunks` records the
// deterministic byte slices indexed from each source. Deleting a source
// cascades to its chunks via the foreign key.
const SCHEMA_V1: &str = r#"
CREATE TABLE IF NOT EXISTS sources (
    id              TEXT PRIMARY KEY,
    uri             TEXT NOT NULL,
    path            TEXT,
    kind            TEXT NOT NULL,
    bytes           INTEGER NOT NULL,
    content_sha256  TEXT NOT NULL,
    mtime_unix      INTEGER,
    ingested_at     INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_sources_uri ON sources(uri);
CREATE INDEX IF NOT EXISTS idx_sources_content_sha ON sources(content_sha256);

CREATE TABLE IF NOT EXISTS chunks (
    id          TEXT PRIMARY KEY,
    source_id   TEXT NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
    ordinal     INTEGER NOT NULL,
    byte_start  INTEGER NOT NULL,
    byte_end    INTEGER NOT NULL,
    char_count  INTEGER NOT NULL,
    text        TEXT NOT NULL,
    sha256      TEXT NOT NULL,
    created_at  INTEGER NOT NULL,
    UNIQUE(source_id, ordinal)
);

CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source_id);
"#;

// v2: add an FTS5 virtual table shadowing chunk text plus triggers that keep
// it synchronized with the `chunks` table. A contentful FTS5 table (the
// default) stores its own copy of the text; duplication is small compared to
// the value of getting BM25 ranking and snippet() support for free.
//
// The triggers key FTS rows off `chunks.rowid` (the implicit rowid, since
// `chunks.id` is a TEXT primary key). NOTE(review): implicit rowids can be
// renumbered by VACUUM — confirm VACUUM is never run against this store, or
// that the FTS index is rebuilt afterwards.
const SCHEMA_V2: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
    text,
    tokenize = 'unicode61 remove_diacritics 2'
);

CREATE TRIGGER IF NOT EXISTS chunks_fts_ai AFTER INSERT ON chunks BEGIN
    INSERT INTO chunks_fts(rowid, text) VALUES (new.rowid, new.text);
END;

CREATE TRIGGER IF NOT EXISTS chunks_fts_ad AFTER DELETE ON chunks BEGIN
    DELETE FROM chunks_fts WHERE rowid = old.rowid;
END;

CREATE TRIGGER IF NOT EXISTS chunks_fts_au AFTER UPDATE OF text ON chunks BEGIN
    UPDATE chunks_fts SET text = new.text WHERE rowid = old.rowid;
END;

-- Backfill rows that predated the FTS index (no-op on a fresh store).
INSERT INTO chunks_fts(rowid, text)
SELECT rowid, text FROM chunks
WHERE rowid NOT IN (SELECT rowid FROM chunks_fts);
"#;

// v3: per-chunk dense embeddings, stored as a little-endian f32 BLOB alongside
// the model that produced them so a future retrieval path can decide whether
// a stored vector is still valid. Brute-force cosine search happens in Rust.
// Rows here are canonical; the vec0 mirror created in v4/v5 derives from them.
const SCHEMA_V3: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
    chunk_id    TEXT PRIMARY KEY REFERENCES chunks(id) ON DELETE CASCADE,
    model       TEXT NOT NULL,
    dim         INTEGER NOT NULL,
    embedding   BLOB NOT NULL,
    created_at  INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_embeddings_model ON embeddings(model);
"#;

// v4: create the vec0 mirror of the default-model embeddings.
// `distance_metric=cosine` makes the mirror's `distance` column equal to
// `1 - cosine_similarity`, matching the ordering of the existing brute-force
// path so a later slice can wire search through it. Created ahead of any
// dual-write path so the table already exists when writes begin.
fn schema_v4() -> String {
    format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING vec0(
            embedding float[768] distance_metric=cosine
        );",
        VEC_MIRROR_TABLE
    )
}

// v5: backfill the vec0 mirror from the canonical `embeddings` rows of the
// default model, so stores upgraded in place keep working immediately after
// migration. `NOT EXISTS` makes the backfill safe to re-run.
fn schema_v5() -> String {
    format!(
        "INSERT INTO {mirror}(rowid, embedding)
         SELECT c.rowid, e.embedding
         FROM embeddings e
         JOIN chunks c ON c.id = e.chunk_id
         WHERE e.model = '{model}'
           AND NOT EXISTS (
               SELECT 1 FROM {mirror} v WHERE v.rowid = c.rowid
           );",
        mirror = VEC_MIRROR_TABLE,
        model = DEFAULT_EMBED_MODEL,
    )
}

// v6: per-chunk agent-activity metadata (role, session, turn, tool name,
// event timestamp). All columns are nullable so existing rows migrate
// untouched.
fn schema_v6() -> String {
    "ALTER TABLE chunks ADD COLUMN role TEXT;\n\
     ALTER TABLE chunks ADD COLUMN session_id TEXT;\n\
     ALTER TABLE chunks ADD COLUMN turn_id TEXT;\n\
     ALTER TABLE chunks ADD COLUMN tool_name TEXT;\n\
     ALTER TABLE chunks ADD COLUMN timestamp_unix INTEGER;"
        .to_string()
}

// v7: per-chunk access metadata — a counter that starts at 0 and an optional
// last-access timestamp.
fn schema_v7() -> String {
    String::from(
        "ALTER TABLE chunks ADD COLUMN access_count INTEGER NOT NULL DEFAULT 0;\n\
         ALTER TABLE chunks ADD COLUMN last_accessed_at INTEGER;",
    )
}

// v8: signed net user-feedback score per chunk. Positive values are explicit
// thumbs-up (the chunk helped answer a query); negative values are
// thumbs-down (misleading or irrelevant). The default of `0` is neutral and
// must leave confidence scoring unchanged, so existing stores see no ranking
// drift after migrating.
fn schema_v8() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN feedback_score INTEGER NOT NULL DEFAULT 0;")
}

// v9: checkpoint column for background access-metadata decay. The search path
// updates it whenever a chunk is touched; the compacting pass can then decay
// old access counts incrementally without disturbing the user-visible
// freshness timestamp used for confidence scoring.
fn schema_v9() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN access_decay_at INTEGER;")
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn initialize_is_idempotent() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("store");
        let _ = Store::initialize(&root).unwrap();
        let store = Store::initialize(&root).unwrap();
        // Use the public accessor rather than re-issuing the pragma by hand.
        assert_eq!(store.schema_version().unwrap(), SCHEMA_VERSION);
    }

    #[test]
    fn open_requires_existing_directory() {
        let dir = tempdir().unwrap();
        let missing = dir.path().join("nope");
        assert!(Store::open(&missing).is_err());
    }

    #[test]
    fn sqlite_vec_extension_is_available() {
        let dir = tempdir().unwrap();
        let store = Store::initialize(&dir.path().join("store")).unwrap();
        let version: String = store
            .conn()
            .query_row("SELECT vec_version()", [], |row| row.get(0))
            .unwrap();
        assert!(!version.is_empty());
    }

    #[test]
    fn fts_table_exists_after_init() {
        let dir = tempdir().unwrap();
        let store = Store::initialize(&dir.path().join("store")).unwrap();
        let name: String = store
            .conn()
            .query_row(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='chunks_fts'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(name, "chunks_fts");
    }

    #[test]
    fn vec_mirror_table_exists_after_init() {
        let dir = tempdir().unwrap();
        let store = Store::initialize(&dir.path().join("store")).unwrap();
        // Query by the shared constant so a future mirror-table rename cannot
        // leave this test silently checking a stale hardcoded name.
        let name: String = store
            .conn()
            .query_row(
                "SELECT name FROM sqlite_master WHERE name = ?1",
                [VEC_MIRROR_TABLE],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(name, VEC_MIRROR_TABLE);
    }
}