lantern 0.3.0

Local-first, provenance-aware semantic search for agent activity
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
//! Local SQLite-backed store.
//!
//! The schema is intentionally narrow: a `sources` table records where a
//! piece of text came from, and a `chunks` table records the deterministic
//! slices that were indexed. Both carry provenance fields (uri, sha256,
//! byte offsets, timestamps) so a future retrieval path can explain where
//! any result originated.

use crate::embed::DEFAULT_EMBED_MODEL;
use anyhow::{Context, Result};
use rusqlite::Connection;
use rusqlite::ffi::sqlite3_auto_extension;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Once;

/// File name of the SQLite database inside a store's root directory.
pub const DB_FILENAME: &str = "lantern.db";

/// Guards the one-time, process-wide registration of the sqlite-vec
/// auto-extension performed by `ensure_sqlite_vec_loaded`.
static VEC_EXT_REGISTER: Once = Once::new();

// Register the statically-linked sqlite-vec extension as a SQLite
// auto-extension so every subsequent connection inherits the `vec0`
// module and `vec_*` functions. The `sqlite-vec` crate declares
// `sqlite3_vec_init` as a zero-arg `fn()`, but the underlying C symbol
// has the full extension-init signature — transmuting between the two
// is what the crate's own tests do.
//
// Calling this repeatedly is fine: the registration body is gated on
// `VEC_EXT_REGISTER`, so it runs at most once per process.
fn ensure_sqlite_vec_loaded() {
    // The extension-init signature the transmuted pointer is given before it
    // is handed to `sqlite3_auto_extension`.
    type VecInit = unsafe extern "C" fn(
        *mut rusqlite::ffi::sqlite3,
        *mut *mut std::os::raw::c_char,
        *const rusqlite::ffi::sqlite3_api_routines,
    ) -> std::os::raw::c_int;
    VEC_EXT_REGISTER.call_once(|| unsafe {
        // SAFETY: soundness rests on `sqlite3_vec_init` really being a C
        // function with the `VecInit` signature, as described in the note
        // above this function; the Rust-side `fn()` declaration is only a
        // placeholder. If the crate ever changes that symbol, this transmute
        // must be revisited.
        let init: VecInit =
            std::mem::transmute::<*const (), VecInit>(sqlite_vec::sqlite3_vec_init as *const ());
        // NOTE(review): the status code returned by `sqlite3_auto_extension`
        // is discarded, so a failed registration would only surface later
        // (e.g. when `vec0` cannot be found) — confirm that is acceptable.
        sqlite3_auto_extension(Some(init));
    });
}

/// Newest schema version this build knows how to produce. `Store::migrate`
/// applies every step above the database's current `user_version` and then
/// stores this value in the `user_version` pragma.
const SCHEMA_VERSION: i64 = 18;

/// vec0 mirror of `embeddings` for the default model only
/// (`nomic-embed-text` / 768-dim). `embeddings` remains the source of truth;
/// this table exists so a later slice can switch search to the ANN path
/// without another migration.
pub const VEC_MIRROR_TABLE: &str = "chunks_vec_nomic_768";

/// Handle to an on-disk store: a root directory plus the open SQLite
/// connection to the `DB_FILENAME` database inside it.
pub struct Store {
    /// Directory that contains the database file.
    root: PathBuf,
    /// Open connection; configured and migrated by `Store::open`.
    conn: Connection,
}

impl Store {
    /// Create the store directory if needed, then open and migrate it.
    pub fn initialize(root: &Path) -> Result<Self> {
        fs::create_dir_all(root)
            .with_context(|| format!("creating store dir {}", root.display()))?;
        Self::open(root)
    }

    /// Open an existing store directory. Fails if the directory is missing.
    pub fn open(root: &Path) -> Result<Self> {
        if !root.exists() {
            anyhow::bail!("store directory does not exist: {}", root.display());
        }
        // Must be registered *before* the connection is opened so the
        // `vec0` module and `vec_*` functions are visible on it.
        ensure_sqlite_vec_loaded();
        let db_path = root.join(DB_FILENAME);
        let conn = Connection::open(&db_path)
            .with_context(|| format!("opening sqlite at {}", db_path.display()))?;
        conn.execute_batch(
            "PRAGMA journal_mode = WAL;
             PRAGMA foreign_keys = ON;
             PRAGMA synchronous = NORMAL;",
        )?;
        let store = Self {
            root: root.to_path_buf(),
            conn,
        };
        store.migrate()?;
        Ok(store)
    }

    pub fn root(&self) -> &Path {
        &self.root
    }

    pub fn db_path(&self) -> PathBuf {
        self.root.join(DB_FILENAME)
    }

    pub fn schema_version(&self) -> rusqlite::Result<i64> {
        self.conn
            .pragma_query_value(None, "user_version", |row| row.get(0))
    }

    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    pub fn conn_mut(&mut self) -> &mut Connection {
        &mut self.conn
    }

    fn migrate(&self) -> Result<()> {
        let current: i64 = self
            .conn
            .pragma_query_value(None, "user_version", |row| row.get(0))?;
        if current < 1 {
            self.conn.execute_batch(SCHEMA_V1)?;
        }
        if current < 2 {
            self.conn.execute_batch(SCHEMA_V2)?;
        }
        if current < 3 {
            self.conn.execute_batch(SCHEMA_V3)?;
        }
        if current < 4 {
            self.conn.execute_batch(&schema_v4())?;
        }
        if current < 5 {
            self.conn.execute_batch(&schema_v5())?;
        }
        if current < 6 {
            self.conn.execute_batch(&schema_v6())?;
        }
        if current < 7 {
            self.conn.execute_batch(&schema_v7())?;
        }
        if current < 8 {
            self.conn.execute_batch(&schema_v8())?;
        }
        if current < 9 {
            self.conn.execute_batch(&schema_v9())?;
        }
        if current < 10 {
            self.conn.execute_batch(SCHEMA_V10)?;
        }
        if current < 11 {
            self.conn.execute_batch(&schema_v11())?;
        }
        if current < 12 {
            self.conn.execute_batch(&schema_v12())?;
        }
        if current < 13 {
            self.conn.execute_batch(&schema_v13())?;
        }
        if current < 14 {
            self.conn.execute_batch(&schema_v14())?;
        }
        if current < 15 {
            self.conn.execute_batch(&schema_v15())?;
        }
        if current < 16 {
            self.conn.execute_batch(&schema_v16())?;
        }
        if current < 17 {
            self.conn.execute_batch(&schema_v17())?;
        }
        if current < 18 {
            self.conn.execute_batch(SCHEMA_V18)?;
        }
        if current < SCHEMA_VERSION {
            self.conn
                .pragma_update(None, "user_version", SCHEMA_VERSION)?;
        }
        Ok(())
    }
}

// v1: base tables. `sources` has one row per ingested input (uri, optional
// path, kind, byte size, content hash, timestamps); `chunks` has one row per
// indexed slice of a source, carrying its ordinal, byte range, text and hash.
// `(source_id, ordinal)` is unique, and chunks are deleted together with
// their source via `ON DELETE CASCADE`.
const SCHEMA_V1: &str = r#"
CREATE TABLE IF NOT EXISTS sources (
    id              TEXT PRIMARY KEY,
    uri             TEXT NOT NULL,
    path            TEXT,
    kind            TEXT NOT NULL,
    bytes           INTEGER NOT NULL,
    content_sha256  TEXT NOT NULL,
    mtime_unix      INTEGER,
    ingested_at     INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_sources_uri ON sources(uri);
CREATE INDEX IF NOT EXISTS idx_sources_content_sha ON sources(content_sha256);

CREATE TABLE IF NOT EXISTS chunks (
    id          TEXT PRIMARY KEY,
    source_id   TEXT NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
    ordinal     INTEGER NOT NULL,
    byte_start  INTEGER NOT NULL,
    byte_end    INTEGER NOT NULL,
    char_count  INTEGER NOT NULL,
    text        TEXT NOT NULL,
    sha256      TEXT NOT NULL,
    created_at  INTEGER NOT NULL,
    UNIQUE(source_id, ordinal)
);

CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source_id);
"#;

// v2: add an FTS5 virtual table shadowing chunk text plus triggers that keep
// it synchronized with the `chunks` table. A contentful FTS5 table (the
// default) stores its own copy of the text; duplication is small compared to
// the value of getting BM25 ranking and snippet() support for free.
//
// FTS rows are keyed by the `rowid` of the matching `chunks` row. The insert,
// delete and update-of-text triggers mirror each change, and the trailing
// INSERT backfills any chunk that has no FTS row yet.
//
// NOTE(review): `chunks` uses a TEXT primary key, so its `rowid` is implicit.
// SQLite documents that VACUUM may renumber rowids of tables without an
// explicit INTEGER PRIMARY KEY, which would silently detach these FTS rows —
// confirm the store is never VACUUMed, or plan a rebuild step.
const SCHEMA_V2: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
    text,
    tokenize = 'unicode61 remove_diacritics 2'
);

CREATE TRIGGER IF NOT EXISTS chunks_fts_ai AFTER INSERT ON chunks BEGIN
    INSERT INTO chunks_fts(rowid, text) VALUES (new.rowid, new.text);
END;

CREATE TRIGGER IF NOT EXISTS chunks_fts_ad AFTER DELETE ON chunks BEGIN
    DELETE FROM chunks_fts WHERE rowid = old.rowid;
END;

CREATE TRIGGER IF NOT EXISTS chunks_fts_au AFTER UPDATE OF text ON chunks BEGIN
    UPDATE chunks_fts SET text = new.text WHERE rowid = old.rowid;
END;

-- Backfill rows that predated the FTS index (no-op on a fresh store).
INSERT INTO chunks_fts(rowid, text)
SELECT rowid, text FROM chunks
WHERE rowid NOT IN (SELECT rowid FROM chunks_fts);
"#;

// v3: per-chunk dense embeddings, stored as a little-endian f32 BLOB alongside
// the model that produced them so a future retrieval path can decide whether
// a stored vector is still valid. Brute-force cosine search happens in Rust.
//
// `chunk_id` is the primary key, so each chunk holds at most one embedding at
// a time (one model per chunk); the row is removed with its chunk through
// `ON DELETE CASCADE`.
const SCHEMA_V3: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
    chunk_id    TEXT PRIMARY KEY REFERENCES chunks(id) ON DELETE CASCADE,
    model       TEXT NOT NULL,
    dim         INTEGER NOT NULL,
    embedding   BLOB NOT NULL,
    created_at  INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_embeddings_model ON embeddings(model);
"#;

// v4: create the vec0 mirror table for default-model embeddings. With
// `distance_metric=cosine` the mirror's `distance` column is
// `1 - cosine_similarity`, matching the ordering of the existing brute-force
// search so a later slice can route queries through it. The table is created
// ahead of any dual-write path so it is already present when one lands.
fn schema_v4() -> String {
    format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING vec0(
            embedding float[768] distance_metric=cosine
        );",
        table = VEC_MIRROR_TABLE,
    )
}

// v5: copy existing default-model embeddings into the vec0 mirror so an
// upgraded store works straight after migration. Rows are keyed by the
// chunk's rowid, and chunks already present in the mirror are skipped.
fn schema_v5() -> String {
    format!(
        "INSERT INTO {mirror}(rowid, embedding)
         SELECT c.rowid, e.embedding
         FROM embeddings e
         JOIN chunks c ON c.id = e.chunk_id
         WHERE e.model = '{model}'
           AND NOT EXISTS (
               SELECT 1 FROM {mirror} v WHERE v.rowid = c.rowid
           );",
        mirror = VEC_MIRROR_TABLE,
        model = DEFAULT_EMBED_MODEL,
    )
}

// v6: five nullable metadata columns on `chunks` (`role`, `session_id`,
// `turn_id`, `tool_name`, `timestamp_unix`). One `ALTER TABLE` statement is
// emitted per column, newline-separated.
fn schema_v6() -> String {
    const NEW_COLUMNS: [(&str, &str); 5] = [
        ("role", "TEXT"),
        ("session_id", "TEXT"),
        ("turn_id", "TEXT"),
        ("tool_name", "TEXT"),
        ("timestamp_unix", "INTEGER"),
    ];
    let statements: Vec<String> = NEW_COLUMNS
        .iter()
        .map(|(column, sql_type)| format!("ALTER TABLE chunks ADD COLUMN {column} {sql_type};"))
        .collect();
    statements.join("\n")
}

// v7: access bookkeeping on `chunks` — a non-null `access_count` starting at
// 0 and a nullable `last_accessed_at`.
fn schema_v7() -> String {
    concat!(
        "ALTER TABLE chunks ADD COLUMN access_count INTEGER NOT NULL DEFAULT 0;",
        "\n",
        "ALTER TABLE chunks ADD COLUMN last_accessed_at INTEGER;",
    )
    .to_owned()
}

// v8: per-chunk signed net feedback score. Explicit thumbs-up (the chunk
// helped answer a query) pushes it positive; thumbs-down (misleading or
// irrelevant) pushes it negative. The default `0` is neutral and must leave
// confidence scoring untouched, so existing stores see no ranking drift once
// migrated.
fn schema_v8() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN feedback_score INTEGER NOT NULL DEFAULT 0;")
}

// v9: a dedicated checkpoint column for background decay of access metadata.
// The search path stamps it whenever a chunk is touched, which lets the
// compacting pass decay stale access counts incrementally while leaving alone
// the user-visible freshness timestamp that confidence scoring reads.
fn schema_v9() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN access_decay_at INTEGER;")
}

// v10: provenance-preserving entity layer. `entities` holds one row per distinct
// (kind, value) pair extracted from chunk text — for now just URLs, but the
// shape is general so future kinds (emails, repository slugs, domains, file
// paths, @-mentions, #hashtags) plug in without another migration.
// `chunk_entities` joins each entity back to the chunks it appeared in, so a
// query like "which chunks mention this URL?" is an index lookup.
//
// Link rows in `chunk_entities` cascade away when either their chunk or their
// entity is deleted (and so, transitively, when a source's chunks go). The
// `entities` table itself declares no foreign keys, so an entity row outlives
// the chunks that mentioned it until it is deleted explicitly.
const SCHEMA_V10: &str = r#"
CREATE TABLE IF NOT EXISTS entities (
    id          TEXT PRIMARY KEY,
    kind        TEXT NOT NULL,
    value       TEXT NOT NULL,
    created_at  INTEGER NOT NULL,
    UNIQUE(kind, value)
);

CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(kind);

CREATE TABLE IF NOT EXISTS chunk_entities (
    chunk_id   TEXT NOT NULL REFERENCES chunks(id) ON DELETE CASCADE,
    entity_id  TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
    PRIMARY KEY (chunk_id, entity_id)
);

CREATE INDEX IF NOT EXISTS idx_chunk_entities_entity ON chunk_entities(entity_id);
"#;

// v11: per-chunk counter of successful queries; it only ever counts upward.
// It is bumped when a chunk is recorded as having helped answer a query. The
// default `0` is neutral and must leave confidence scoring untouched, so
// existing stores see no ranking drift once migrated.
fn schema_v11() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN query_success_count INTEGER NOT NULL DEFAULT 0;")
}

// v12: tool-call linkage for agent-aware transcript ingestion. A tool call ID
// is frequently different from the turn/message ID, so it gets its own column:
// exports and search results can then keep the direct tool-result provenance
// without folding it into the turn identifier.
fn schema_v12() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN tool_call_id TEXT;")
}

// v13: parent-turn linkage for agent-aware transcript ingestion. Lets a chunk
// reference the turn or message it was a reply to, so conversation lineage is
// kept without overloading the chunk's own turn identifier.
fn schema_v13() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN parent_turn_id TEXT;")
}

// v14: higher-level grouping metadata for linking memory across sessions.
// `project` names the upstream project / repository a JSONL transcript came
// from (taken on ingest from the `project`, `project_id`, `projectId`, `repo`,
// or `repository` aliases). The column is nullable: content without a project
// tag is unaffected, and existing stores keep working unchanged after the
// migration.
fn schema_v14() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN project TEXT;")
}

// v15: companion to v14. `user` names the upstream user / participant a JSONL
// transcript came from (taken on ingest from the `user`, `user_id`, `userId`,
// `author`, or `participant` aliases). The column is nullable: content without
// a user tag is unaffected, and existing stores keep working unchanged after
// the migration.
fn schema_v15() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN user TEXT;")
}

// v16: companion to v14/v15. `topic` names the upstream subject / topic /
// category a JSONL transcript came from (taken on ingest from the `topic`,
// `topic_id`, `topicId`, `subject`, or `category` aliases). The column is
// nullable: content without a topic tag is unaffected, and existing stores
// keep working unchanged after the migration.
fn schema_v16() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN topic TEXT;")
}

// v17: companion to v14/v15/v16. `thread` names the upstream thread or channel
// grouping a JSONL transcript came from (taken on ingest from the `thread`,
// `thread_id`, or `threadId` aliases). The column is nullable: content without
// a thread tag is unaffected, and existing stores keep working unchanged after
// the migration.
fn schema_v17() -> String {
    String::from("ALTER TABLE chunks ADD COLUMN thread TEXT;")
}

// v18: first-class writable memory records. These are explicit durable state
// objects above raw evidence chunks: facts, preferences, goals, constraints,
// observations, hypotheses, and task state with priority/lifecycle fields.
//
// Column defaults: `priority` 50, `urgency` 0, `confidence` 1.0, `status`
// 'active', and `source_refs_json` an empty JSON array. `kind`, `scope` and
// `status` are free-form TEXT here — the schema has no CHECK constraints, so
// any restriction on their values has to be enforced by the writing code.
// The composite index covers ordering by priority, then urgency, then most
// recent update, all descending.
const SCHEMA_V18: &str = r#"
CREATE TABLE IF NOT EXISTS memory_records (
    id               TEXT PRIMARY KEY,
    kind             TEXT NOT NULL,
    scope            TEXT NOT NULL,
    content          TEXT NOT NULL,
    priority         INTEGER NOT NULL DEFAULT 50,
    urgency          INTEGER NOT NULL DEFAULT 0,
    confidence       REAL NOT NULL DEFAULT 1.0,
    status           TEXT NOT NULL DEFAULT 'active',
    source_refs_json TEXT NOT NULL DEFAULT '[]',
    created_at       INTEGER NOT NULL,
    updated_at       INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_memory_records_kind ON memory_records(kind);
CREATE INDEX IF NOT EXISTS idx_memory_records_scope ON memory_records(scope);
CREATE INDEX IF NOT EXISTS idx_memory_records_status ON memory_records(status);
CREATE INDEX IF NOT EXISTS idx_memory_records_priority ON memory_records(priority DESC, urgency DESC, updated_at DESC);
"#;

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Initialize a brand-new store under a temporary directory. The
    /// `TempDir` guard is handed back as well so the directory stays alive
    /// for as long as the caller holds it.
    fn fresh_store() -> (TempDir, Store) {
        let guard = tempdir().unwrap();
        let store = Store::initialize(&guard.path().join("store")).unwrap();
        (guard, store)
    }

    /// Run a parameterless query and return the first column of its single
    /// row as a `String`, panicking if the query fails or yields no row.
    fn single_string(store: &Store, sql: &str) -> String {
        store.conn().query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Initializing the same root twice succeeds and ends at the current
    /// schema version.
    #[test]
    fn initialize_is_idempotent() {
        let guard = tempdir().unwrap();
        let root = guard.path().join("store");
        drop(Store::initialize(&root).unwrap());
        let reopened = Store::initialize(&root).unwrap();
        let recorded: i64 = reopened
            .conn()
            .pragma_query_value(None, "user_version", |row| row.get(0))
            .unwrap();
        assert_eq!(recorded, SCHEMA_VERSION);
    }

    /// `open` refuses a root directory that has not been created.
    #[test]
    fn open_requires_existing_directory() {
        let guard = tempdir().unwrap();
        let absent = guard.path().join("nope");
        let outcome = Store::open(&absent);
        assert!(outcome.is_err());
    }

    /// The sqlite-vec functions are callable on a store connection.
    #[test]
    fn sqlite_vec_extension_is_available() {
        let (_guard, store) = fresh_store();
        let reported = single_string(&store, "SELECT vec_version()");
        assert!(!reported.is_empty());
    }

    /// The FTS5 table from the v2 migration is present.
    #[test]
    fn fts_table_exists_after_init() {
        let (_guard, store) = fresh_store();
        let found = single_string(
            &store,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='chunks_fts'",
        );
        assert_eq!(found, "chunks_fts");
    }

    /// The vec0 mirror table from the v4 migration is present.
    #[test]
    fn vec_mirror_table_exists_after_init() {
        let (_guard, store) = fresh_store();
        let found = single_string(
            &store,
            "SELECT name FROM sqlite_master WHERE name = 'chunks_vec_nomic_768'",
        );
        assert_eq!(found, "chunks_vec_nomic_768");
    }
}