Skip to main content

sqlite_knowledge_graph/
schema.rs

1//! Database schema creation and management.
2//!
3//! # Versioned migration system
4//!
5//! The schema is versioned via the `kg_schema_version` table (single-row).
6//! Call [`ensure_schema`] on every database open to apply any pending migrations
7//! automatically.
8//!
9//! | Version | Changes |
10//! |---------|---------|
11//! | 1       | Initial schema: entities, relations, vectors, hyperedges, turboquant cache |
12//! | 2       | Add `vectors_checksum` column to `kg_turboquant_cache` |
13//! | 3       | SmartVector: temporal awareness, confidence decay, dependencies |
14//! | 4       | QuaQue versioning: `kg_versions` table, `validity` columns on entities/relations |
15
16use rusqlite::Connection;
17
18use crate::error::{Error, Result};
19
/// Latest known schema version.  Bump this whenever a new migration is added.
///
/// [`ensure_schema`] applies every migration up to and including this value
/// and then stores it in `kg_schema_version`, so each version from 1 through
/// this number needs a matching arm in `apply_migration`.
const CURRENT_SCHEMA_VERSION: i32 = 4;
22
23// ─────────────────────────────────────────────────────────────────────────────
24// Public API
25// ─────────────────────────────────────────────────────────────────────────────
26
27/// Ensure the database schema is up to date, running any pending migrations.
28///
29/// Safe to call on:
30/// - A brand-new empty database (applies all migrations from scratch).
31/// - An existing database without version tracking (detects v1 tables, starts
32///   from v1 so existing data is preserved).
33/// - An already fully-migrated database (fast no-op path).
34pub fn ensure_schema(conn: &Connection) -> Result<()> {
35    // Bootstrap: create the version table.  This is idempotent.
36    conn.execute_batch("CREATE TABLE IF NOT EXISTS kg_schema_version (version INTEGER NOT NULL);")?;
37
38    // Read the stored version, if any.
39    let stored: Option<i32> = conn
40        .query_row("SELECT version FROM kg_schema_version", [], |r| r.get(0))
41        .ok();
42
43    let current_version = match stored {
44        Some(v) => v,
45        None => {
46            // No version row yet.  Distinguish a legacy DB (core tables already
47            // exist) from a fresh one so we never re-create existing tables.
48            if schema_exists(conn)? {
49                1 // Legacy database: all v1 tables are present, start from there.
50            } else {
51                0 // Brand-new database: nothing applied yet.
52            }
53        }
54    };
55
56    if current_version >= CURRENT_SCHEMA_VERSION {
57        return Ok(()); // Already up to date — fast path.
58    }
59
60    // Apply all pending migrations inside a single transaction.
61    let tx = conn.unchecked_transaction()?;
62    for v in (current_version + 1)..=CURRENT_SCHEMA_VERSION {
63        apply_migration(&tx, v)?;
64    }
65
66    // Persist the new version (replace any existing row).
67    tx.execute("DELETE FROM kg_schema_version", [])?;
68    tx.execute(
69        "INSERT INTO kg_schema_version (version) VALUES (?1)",
70        [CURRENT_SCHEMA_VERSION],
71    )?;
72
73    tx.commit()?;
74    Ok(())
75}
76
/// Create the knowledge graph schema in the database.
///
/// Alias for [`ensure_schema`] kept for backward compatibility.
///
/// # Errors
///
/// Propagates whatever error [`ensure_schema`] returns.
#[inline]
pub fn create_schema(conn: &Connection) -> Result<()> {
    // Pure delegation: no additional work is done here.
    ensure_schema(conn)
}
84
85/// Return the current schema version stored in the database.
86///
87/// Returns `None` if `kg_schema_version` has not been populated yet (e.g. a
88/// legacy DB that has never been opened with this library version).
89pub fn schema_version(conn: &Connection) -> Result<Option<i32>> {
90    let v = conn
91        .query_row("SELECT version FROM kg_schema_version", [], |r| r.get(0))
92        .ok();
93    Ok(v)
94}
95
96/// Check if the core schema tables exist (used for legacy-DB detection).
97pub fn schema_exists(conn: &Connection) -> Result<bool> {
98    let mut stmt = conn
99        .prepare("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='kg_entities'")?;
100    let count: i64 = stmt.query_row([], |row| row.get(0))?;
101    Ok(count > 0)
102}
103
104// ─────────────────────────────────────────────────────────────────────────────
105// Migration runner
106// ─────────────────────────────────────────────────────────────────────────────
107
108fn apply_migration(conn: &Connection, version: i32) -> Result<()> {
109    match version {
110        1 => migration_v1(conn),
111        2 => migration_v2(conn),
112        3 => migration_v3(conn),
113        4 => migration_v4(conn),
114        _ => Err(Error::Other(format!(
115            "Unknown schema migration version: {}",
116            version
117        ))),
118    }
119}
120
121// ─────────────────────────────────────────────────────────────────────────────
122// Migrations
123// ─────────────────────────────────────────────────────────────────────────────
124
125/// Migration v1 — initial schema (all core tables and indexes).
126fn migration_v1(conn: &Connection) -> Result<()> {
127    conn.execute_batch(
128        r#"
129        CREATE TABLE IF NOT EXISTS kg_entities (
130            id INTEGER PRIMARY KEY AUTOINCREMENT,
131            entity_type TEXT NOT NULL,
132            name TEXT NOT NULL,
133            properties TEXT,
134            created_at INTEGER DEFAULT (strftime('%s', 'now')),
135            updated_at INTEGER DEFAULT (strftime('%s', 'now'))
136        );
137
138        CREATE TABLE IF NOT EXISTS kg_relations (
139            id INTEGER PRIMARY KEY AUTOINCREMENT,
140            source_id INTEGER NOT NULL,
141            target_id INTEGER NOT NULL,
142            rel_type TEXT NOT NULL,
143            weight REAL DEFAULT 1.0,
144            properties TEXT,
145            created_at INTEGER DEFAULT (strftime('%s', 'now')),
146            FOREIGN KEY (source_id) REFERENCES kg_entities(id) ON DELETE CASCADE,
147            FOREIGN KEY (target_id) REFERENCES kg_entities(id) ON DELETE CASCADE
148        );
149
150        CREATE TABLE IF NOT EXISTS kg_vectors (
151            entity_id INTEGER NOT NULL PRIMARY KEY,
152            vector BLOB NOT NULL,
153            dimension INTEGER NOT NULL,
154            created_at INTEGER DEFAULT (strftime('%s', 'now')),
155            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
156        );
157
158        CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type);
159        CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name);
160        CREATE INDEX IF NOT EXISTS idx_relations_source ON kg_relations(source_id);
161        CREATE INDEX IF NOT EXISTS idx_relations_target ON kg_relations(target_id);
162        CREATE INDEX IF NOT EXISTS idx_relations_type ON kg_relations(rel_type);
163
164        CREATE TABLE IF NOT EXISTS kg_hyperedges (
165            id INTEGER PRIMARY KEY AUTOINCREMENT,
166            hyperedge_type TEXT NOT NULL,
167            entity_ids TEXT NOT NULL,
168            weight REAL DEFAULT 1.0,
169            arity INTEGER NOT NULL,
170            properties TEXT,
171            created_at INTEGER DEFAULT (strftime('%s', 'now')),
172            updated_at INTEGER DEFAULT (strftime('%s', 'now'))
173        );
174
175        CREATE TABLE IF NOT EXISTS kg_hyperedge_entities (
176            hyperedge_id INTEGER NOT NULL,
177            entity_id INTEGER NOT NULL,
178            position INTEGER NOT NULL,
179            PRIMARY KEY (hyperedge_id, entity_id),
180            FOREIGN KEY (hyperedge_id) REFERENCES kg_hyperedges(id) ON DELETE CASCADE,
181            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
182        );
183
184        CREATE INDEX IF NOT EXISTS idx_hyperedges_type ON kg_hyperedges(hyperedge_type);
185        CREATE INDEX IF NOT EXISTS idx_hyperedges_arity ON kg_hyperedges(arity);
186        CREATE INDEX IF NOT EXISTS idx_he_entities_entity ON kg_hyperedge_entities(entity_id);
187        CREATE INDEX IF NOT EXISTS idx_he_entities_hyperedge ON kg_hyperedge_entities(hyperedge_id);
188
189        CREATE TABLE IF NOT EXISTS kg_turboquant_cache (
190            id INTEGER PRIMARY KEY CHECK (id = 1),
191            index_blob BLOB NOT NULL,
192            vector_count INTEGER NOT NULL
193        );
194        "#,
195    )?;
196    Ok(())
197}
198
199/// Migration v2 — add `vectors_checksum` to `kg_turboquant_cache`.
200///
201/// The checksum (`COALESCE(SUM(entity_id), 0)` over `kg_vectors`) is a
202/// lightweight fingerprint that detects cache staleness even when the
203/// *count* of vectors stays the same — for example when one vector is
204/// deleted and a different one is inserted.
205fn migration_v2(conn: &Connection) -> Result<()> {
206    conn.execute_batch(
207        "ALTER TABLE kg_turboquant_cache \
208         ADD COLUMN vectors_checksum INTEGER NOT NULL DEFAULT 0;",
209    )?;
210    Ok(())
211}
212
213/// Migration v3 — SmartVector: temporal awareness, confidence decay, dependencies.
214///
215/// Adds new columns to `kg_entities` and `kg_relations`, and creates two new
216/// tables (`kg_dependencies`, `kg_confidence_log`) for dependency tracking and
217/// confidence change history.  All changes are non-destructive: existing rows
218/// receive column defaults and are otherwise untouched.
219fn migration_v3(conn: &Connection) -> Result<()> {
220    conn.execute_batch(
221        r#"
222        ALTER TABLE kg_entities ADD COLUMN confidence      REAL    DEFAULT 1.0;
223        ALTER TABLE kg_entities ADD COLUMN access_count   INTEGER DEFAULT 0;
224        ALTER TABLE kg_entities ADD COLUMN last_accessed  INTEGER;
225        ALTER TABLE kg_entities ADD COLUMN valid_from     INTEGER;
226        ALTER TABLE kg_entities ADD COLUMN valid_until    INTEGER;
227        ALTER TABLE kg_entities ADD COLUMN base_confidence REAL   DEFAULT 1.0;
228        ALTER TABLE kg_entities ADD COLUMN decay_rate     REAL    DEFAULT 0.05;
229
230        ALTER TABLE kg_relations ADD COLUMN confidence  REAL    DEFAULT 1.0;
231        ALTER TABLE kg_relations ADD COLUMN valid_from  INTEGER;
232        ALTER TABLE kg_relations ADD COLUMN valid_until INTEGER;
233
234        CREATE TABLE IF NOT EXISTS kg_dependencies (
235            id        INTEGER PRIMARY KEY AUTOINCREMENT,
236            source_id INTEGER NOT NULL,
237            target_id INTEGER NOT NULL,
238            dep_type  TEXT    NOT NULL,
239            created_at INTEGER DEFAULT (strftime('%s', 'now')),
240            FOREIGN KEY (source_id) REFERENCES kg_entities(id) ON DELETE CASCADE,
241            FOREIGN KEY (target_id) REFERENCES kg_entities(id) ON DELETE CASCADE
242        );
243        CREATE INDEX IF NOT EXISTS idx_deps_source ON kg_dependencies(source_id);
244        CREATE INDEX IF NOT EXISTS idx_deps_target ON kg_dependencies(target_id);
245
246        CREATE TABLE IF NOT EXISTS kg_confidence_log (
247            id         INTEGER PRIMARY KEY AUTOINCREMENT,
248            entity_id  INTEGER NOT NULL,
249            old_value  REAL    NOT NULL,
250            new_value  REAL    NOT NULL,
251            reason     TEXT    NOT NULL,
252            created_at INTEGER DEFAULT (strftime('%s', 'now')),
253            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
254        );
255        CREATE INDEX IF NOT EXISTS idx_conf_log_entity ON kg_confidence_log(entity_id);
256        CREATE INDEX IF NOT EXISTS idx_conf_log_entity_reason ON kg_confidence_log(entity_id, reason);
257    "#,
258    )?;
259    Ok(())
260}
261
262/// Migration v4 — QuaQue versioning: versions table and validity columns.
263///
264/// Adds a `kg_versions` table for version metadata and a `validity` bitstring
265/// column to `kg_entities` and `kg_relations`.  NULL validity means "unversioned,
266/// visible in all queries".  Non-NULL bitstring tracks which versions the row
267/// belongs to.
268///
269/// Each version owns a `bit_slot` in `[0, 63]` (the bit position it occupies in
270/// the validity bitstring), assigned at creation and freed on deletion.  The
271/// `UNIQUE` constraint guarantees no two live versions share a slot, and the
272/// 0–63 range is what makes the 64-version concurrency limit a hard, reclaimable
273/// boundary rather than an unbounded function of the auto-increment id.
274fn migration_v4(conn: &Connection) -> Result<()> {
275    conn.execute_batch(
276        r#"
277        CREATE TABLE IF NOT EXISTS kg_versions (
278            id          INTEGER PRIMARY KEY AUTOINCREMENT,
279            name        TEXT NOT NULL UNIQUE,
280            branch      TEXT NOT NULL DEFAULT 'main',
281            parent_id   INTEGER REFERENCES kg_versions(id) ON DELETE SET NULL,
282            description TEXT,
283            created_at  INTEGER DEFAULT (strftime('%s', 'now')),
284            is_merged   INTEGER NOT NULL DEFAULT 0,
285            bit_slot    INTEGER NOT NULL UNIQUE CHECK (bit_slot BETWEEN 0 AND 63)
286        );
287
288        CREATE INDEX IF NOT EXISTS idx_versions_branch ON kg_versions(branch);
289        CREATE INDEX IF NOT EXISTS idx_versions_parent ON kg_versions(parent_id);
290
291        ALTER TABLE kg_entities ADD COLUMN validity INTEGER DEFAULT NULL;
292        ALTER TABLE kg_relations ADD COLUMN validity INTEGER DEFAULT NULL;
293    "#,
294    )?;
295    Ok(())
296}
297
298// ─────────────────────────────────────────────────────────────────────────────
299// Tests
300// ─────────────────────────────────────────────────────────────────────────────
301
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// A fresh in-memory database ends up at `CURRENT_SCHEMA_VERSION`.
    #[test]
    fn test_fresh_db_reaches_current_version() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();
        let v = schema_version(&conn).unwrap();
        assert_eq!(v, Some(CURRENT_SCHEMA_VERSION));
    }

    /// Calling `ensure_schema` twice is harmless (second call is the fast path).
    #[test]
    fn test_idempotent_second_call() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();
        // Should not error and version stays the same.
        ensure_schema(&conn).unwrap();
        let v = schema_version(&conn).unwrap();
        assert_eq!(v, Some(CURRENT_SCHEMA_VERSION));
    }

    /// A database with v1 tables but no version row is upgraded in place.
    #[test]
    fn test_legacy_db_migrates_from_v1() {
        let conn = Connection::open_in_memory().unwrap();

        // Simulate a v1 database: apply only migration_v1 manually (no version row).
        migration_v1(&conn).unwrap();
        assert!(schema_exists(&conn).unwrap());
        assert_eq!(schema_version(&conn).unwrap(), None); // no version yet

        // Now run ensure_schema: it should detect the legacy v1 layout and apply
        // every pending migration (v2 through CURRENT_SCHEMA_VERSION).
        ensure_schema(&conn).unwrap();
        assert_eq!(schema_version(&conn).unwrap(), Some(CURRENT_SCHEMA_VERSION));

        // The vectors_checksum column (added by v2) must now exist.
        conn.execute(
            "INSERT INTO kg_turboquant_cache (id, index_blob, vector_count, vectors_checksum) \
             VALUES (1, X'', 0, 0)",
            [],
        )
        .unwrap();
    }

    /// Every table introduced by migrations v1–v4 exists after `ensure_schema`.
    #[test]
    fn test_all_tables_created() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        let tables = [
            "kg_entities",
            "kg_relations",
            "kg_vectors",
            "kg_hyperedges",
            "kg_hyperedge_entities",
            "kg_turboquant_cache",
            "kg_schema_version",
            "kg_versions",
            "kg_dependencies",
            "kg_confidence_log",
        ];

        for table in &tables {
            let count: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
                    [table],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(count, 1, "table {table} should exist");
        }
    }

    /// The entity columns added by v3 can be written and read back.
    #[test]
    fn test_v3_entity_columns_exist() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        // Verify new v3 columns are writable
        conn.execute(
            "INSERT INTO kg_entities \
             (entity_type, name, confidence, access_count, base_confidence, decay_rate) \
             VALUES ('test', 'T', 0.9, 5, 0.9, 0.05)",
            [],
        )
        .unwrap();

        let (conf, acc): (f64, i64) = conn
            .query_row(
                "SELECT confidence, access_count FROM kg_entities WHERE name = 'T'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        // Compare the REAL column with a tolerance rather than exact equality.
        assert!((conf - 0.9).abs() < 1e-9);
        assert_eq!(acc, 5);
    }

    /// The `kg_confidence_log` table added by v3 accepts rows tied to an entity.
    #[test]
    fn test_v3_new_tables_writable() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        conn.execute(
            "INSERT INTO kg_entities (entity_type, name) VALUES ('a', 'X')",
            [],
        )
        .unwrap();
        let id: i64 = conn.last_insert_rowid();

        conn.execute(
            "INSERT INTO kg_confidence_log (entity_id, old_value, new_value, reason) \
             VALUES (?1, 1.0, 0.8, 'test')",
            [id],
        )
        .unwrap();

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM kg_confidence_log", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1);
    }

    /// The backward-compatible alias reaches the same schema version.
    #[test]
    fn test_create_schema_alias() {
        // create_schema must behave identically to ensure_schema.
        let conn = Connection::open_in_memory().unwrap();
        create_schema(&conn).unwrap();
        assert_eq!(schema_version(&conn).unwrap(), Some(CURRENT_SCHEMA_VERSION));
    }

    /// The `validity` columns added by v4 exist and default to NULL.
    #[test]
    fn test_v4_validity_columns_exist() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        // Verify validity column on kg_entities (NULL by default)
        conn.execute(
            "INSERT INTO kg_entities (entity_type, name) VALUES ('test', 'V')",
            [],
        )
        .unwrap();
        let validity: Option<i64> = conn
            .query_row(
                "SELECT validity FROM kg_entities WHERE name = 'V'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(validity, None);

        // Verify validity column on kg_relations
        // `eid` is the rowid of entity 'V': only a SELECT has run since its INSERT.
        let eid: i64 = conn.last_insert_rowid();
        conn.execute(
            "INSERT INTO kg_entities (entity_type, name) VALUES ('test', 'V2')",
            [],
        )
        .unwrap();
        let eid2: i64 = conn.last_insert_rowid();
        conn.execute(
            "INSERT INTO kg_relations (source_id, target_id, rel_type) VALUES (?1, ?2, 'rel')",
            rusqlite::params![eid, eid2],
        )
        .unwrap();
        let rel_validity: Option<i64> = conn
            .query_row(
                "SELECT validity FROM kg_relations WHERE source_id = ?1",
                [eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(rel_validity, None);
    }

    /// The `kg_versions` table added by v4 accepts a row and returns it intact.
    #[test]
    fn test_v4_versions_table_writable() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        conn.execute(
            "INSERT INTO kg_versions (name, branch, description, bit_slot) \
             VALUES ('v1', 'main', 'first', 0)",
            [],
        )
        .unwrap();

        let (name, branch, desc): (String, String, Option<String>) = conn
            .query_row(
                "SELECT name, branch, description FROM kg_versions WHERE name = 'v1'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(name, "v1");
        assert_eq!(branch, "main");
        assert_eq!(desc.as_deref(), Some("first"));
    }
}