sqlite-knowledge-graph 0.13.0

A Rust library for building and querying knowledge graphs using SQLite as the backend, with graph algorithms, vector search, and RAG support
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
//! Database schema creation and management.
//!
//! # Versioned migration system
//!
//! The schema is versioned via the `kg_schema_version` table (single-row).
//! Call [`ensure_schema`] on every database open to apply any pending migrations
//! automatically.
//!
//! | Version | Changes |
//! |---------|---------|
//! | 1       | Initial schema: entities, relations, vectors, hyperedges, turboquant cache |
//! | 2       | Add `vectors_checksum` column to `kg_turboquant_cache` |
//! | 3       | SmartVector: temporal awareness, confidence decay, dependencies |
//! | 4       | QuaQue versioning: `kg_versions` table, `validity` columns on entities/relations |

use rusqlite::Connection;

use crate::error::{Error, Result};

/// Latest known schema version.  Bump this whenever a new migration is added.
///
/// [`ensure_schema`] applies every migration numbered up to and including this
/// value and then stores it in `kg_schema_version`.  `apply_migration` returns
/// an error for any version it has no match arm for, so a bump here needs a
/// matching `migration_vN` arm there.
const CURRENT_SCHEMA_VERSION: i32 = 4;

// ─────────────────────────────────────────────────────────────────────────────
// Public API
// ─────────────────────────────────────────────────────────────────────────────

/// Ensure the database schema is up to date, running any pending migrations.
///
/// Safe to call on:
/// - A brand-new empty database (applies all migrations from scratch).
/// - An existing database without version tracking (detects v1 tables, starts
///   from v1 so existing data is preserved).
/// - An already fully-migrated database (fast no-op path).
pub fn ensure_schema(conn: &Connection) -> Result<()> {
    // Bootstrap: create the version table.  This is idempotent.
    conn.execute_batch("CREATE TABLE IF NOT EXISTS kg_schema_version (version INTEGER NOT NULL);")?;

    // Read the stored version, if any.
    let stored: Option<i32> = conn
        .query_row("SELECT version FROM kg_schema_version", [], |r| r.get(0))
        .ok();

    let current_version = match stored {
        Some(v) => v,
        None => {
            // No version row yet.  Distinguish a legacy DB (core tables already
            // exist) from a fresh one so we never re-create existing tables.
            if schema_exists(conn)? {
                1 // Legacy database: all v1 tables are present, start from there.
            } else {
                0 // Brand-new database: nothing applied yet.
            }
        }
    };

    if current_version >= CURRENT_SCHEMA_VERSION {
        return Ok(()); // Already up to date — fast path.
    }

    // Apply all pending migrations inside a single transaction.
    let tx = conn.unchecked_transaction()?;
    for v in (current_version + 1)..=CURRENT_SCHEMA_VERSION {
        apply_migration(&tx, v)?;
    }

    // Persist the new version (replace any existing row).
    tx.execute("DELETE FROM kg_schema_version", [])?;
    tx.execute(
        "INSERT INTO kg_schema_version (version) VALUES (?1)",
        [CURRENT_SCHEMA_VERSION],
    )?;

    tx.commit()?;
    Ok(())
}

/// Create the knowledge graph schema in the database.
///
/// Alias for [`ensure_schema`] kept for backward compatibility.
///
/// # Errors
///
/// Returns whatever error [`ensure_schema`] returns; this function adds no
/// handling of its own.
#[inline]
pub fn create_schema(conn: &Connection) -> Result<()> {
    // Pure delegation: no extra work is done before or after the call.
    ensure_schema(conn)
}

/// Return the current schema version stored in the database.
///
/// Returns `None` if `kg_schema_version` has not been populated yet (e.g. a
/// legacy DB that has never been opened with this library version).
pub fn schema_version(conn: &Connection) -> Result<Option<i32>> {
    let v = conn
        .query_row("SELECT version FROM kg_schema_version", [], |r| r.get(0))
        .ok();
    Ok(v)
}

/// Report whether the core schema is present (used for legacy-DB detection).
///
/// Presence is decided solely by looking up a table named `kg_entities` in
/// `sqlite_master`; no other table is inspected.
///
/// # Errors
///
/// Returns an error if the catalog query fails.
pub fn schema_exists(conn: &Connection) -> Result<bool> {
    let matching_tables: i64 = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='kg_entities'",
        [],
        |row| row.get(0),
    )?;
    Ok(matching_tables > 0)
}

// ─────────────────────────────────────────────────────────────────────────────
// Migration runner
// ─────────────────────────────────────────────────────────────────────────────

fn apply_migration(conn: &Connection, version: i32) -> Result<()> {
    match version {
        1 => migration_v1(conn),
        2 => migration_v2(conn),
        3 => migration_v3(conn),
        4 => migration_v4(conn),
        _ => Err(Error::Other(format!(
            "Unknown schema migration version: {}",
            version
        ))),
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// Migrations
// ─────────────────────────────────────────────────────────────────────────────

/// Migration v1 — initial schema (all core tables and indexes).
///
/// Creates `kg_entities`, `kg_relations`, `kg_vectors`, `kg_hyperedges`,
/// `kg_hyperedge_entities` and `kg_turboquant_cache`, together with their
/// lookup indexes, in a single batch.  Every statement uses `IF NOT EXISTS`,
/// so objects that are already present are left as they are.
///
/// `kg_turboquant_cache` declares `CHECK (id = 1)` on its primary key, which
/// restricts the table to a single row.
///
/// # Errors
///
/// Returns an error if SQLite rejects any statement in the batch.
fn migration_v1(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE TABLE IF NOT EXISTS kg_entities (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            entity_type TEXT NOT NULL,
            name TEXT NOT NULL,
            properties TEXT,
            created_at INTEGER DEFAULT (strftime('%s', 'now')),
            updated_at INTEGER DEFAULT (strftime('%s', 'now'))
        );

        CREATE TABLE IF NOT EXISTS kg_relations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_id INTEGER NOT NULL,
            target_id INTEGER NOT NULL,
            rel_type TEXT NOT NULL,
            weight REAL DEFAULT 1.0,
            properties TEXT,
            created_at INTEGER DEFAULT (strftime('%s', 'now')),
            FOREIGN KEY (source_id) REFERENCES kg_entities(id) ON DELETE CASCADE,
            FOREIGN KEY (target_id) REFERENCES kg_entities(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS kg_vectors (
            entity_id INTEGER NOT NULL PRIMARY KEY,
            vector BLOB NOT NULL,
            dimension INTEGER NOT NULL,
            created_at INTEGER DEFAULT (strftime('%s', 'now')),
            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type);
        CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name);
        CREATE INDEX IF NOT EXISTS idx_relations_source ON kg_relations(source_id);
        CREATE INDEX IF NOT EXISTS idx_relations_target ON kg_relations(target_id);
        CREATE INDEX IF NOT EXISTS idx_relations_type ON kg_relations(rel_type);

        CREATE TABLE IF NOT EXISTS kg_hyperedges (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            hyperedge_type TEXT NOT NULL,
            entity_ids TEXT NOT NULL,
            weight REAL DEFAULT 1.0,
            arity INTEGER NOT NULL,
            properties TEXT,
            created_at INTEGER DEFAULT (strftime('%s', 'now')),
            updated_at INTEGER DEFAULT (strftime('%s', 'now'))
        );

        CREATE TABLE IF NOT EXISTS kg_hyperedge_entities (
            hyperedge_id INTEGER NOT NULL,
            entity_id INTEGER NOT NULL,
            position INTEGER NOT NULL,
            PRIMARY KEY (hyperedge_id, entity_id),
            FOREIGN KEY (hyperedge_id) REFERENCES kg_hyperedges(id) ON DELETE CASCADE,
            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_hyperedges_type ON kg_hyperedges(hyperedge_type);
        CREATE INDEX IF NOT EXISTS idx_hyperedges_arity ON kg_hyperedges(arity);
        CREATE INDEX IF NOT EXISTS idx_he_entities_entity ON kg_hyperedge_entities(entity_id);
        CREATE INDEX IF NOT EXISTS idx_he_entities_hyperedge ON kg_hyperedge_entities(hyperedge_id);

        CREATE TABLE IF NOT EXISTS kg_turboquant_cache (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            index_blob BLOB NOT NULL,
            vector_count INTEGER NOT NULL
        );
        "#,
    )?;
    Ok(())
}

/// Migration v2 — extend `kg_turboquant_cache` with a `vectors_checksum` column.
///
/// The checksum (`COALESCE(SUM(entity_id), 0)` over `kg_vectors`) acts as a
/// cheap fingerprint of the stored vectors.  It lets the cache be recognised
/// as stale even when the vector *count* is unchanged — for instance after one
/// vector is removed and a different one is added.
///
/// The new column is `INTEGER NOT NULL` with a default of `0`, so existing
/// cache rows receive `0`.
fn migration_v2(conn: &Connection) -> Result<()> {
    const ADD_CHECKSUM_COLUMN: &str = "ALTER TABLE kg_turboquant_cache \
         ADD COLUMN vectors_checksum INTEGER NOT NULL DEFAULT 0;";

    conn.execute_batch(ADD_CHECKSUM_COLUMN)?;
    Ok(())
}

/// Migration v3 — SmartVector: temporal awareness, confidence decay, dependencies.
///
/// Adds new columns to `kg_entities` and `kg_relations`, and creates two new
/// tables (`kg_dependencies`, `kg_confidence_log`) for dependency tracking and
/// confidence change history.  All changes are non-destructive: existing rows
/// receive column defaults and are otherwise untouched.
///
/// Columns added to `kg_entities`: `confidence`, `access_count`,
/// `last_accessed`, `valid_from`, `valid_until`, `base_confidence`,
/// `decay_rate`.  Columns added to `kg_relations`: `confidence`, `valid_from`,
/// `valid_until`.
///
/// The `CREATE ... IF NOT EXISTS` statements are guarded, but the
/// `ALTER TABLE ... ADD COLUMN` statements are not.
/// NOTE(review): this presumably relies on [`ensure_schema`] running each
/// migration at most once per database — confirm before invoking it directly.
///
/// # Errors
///
/// Returns an error if SQLite rejects any statement in the batch.
fn migration_v3(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        ALTER TABLE kg_entities ADD COLUMN confidence      REAL    DEFAULT 1.0;
        ALTER TABLE kg_entities ADD COLUMN access_count   INTEGER DEFAULT 0;
        ALTER TABLE kg_entities ADD COLUMN last_accessed  INTEGER;
        ALTER TABLE kg_entities ADD COLUMN valid_from     INTEGER;
        ALTER TABLE kg_entities ADD COLUMN valid_until    INTEGER;
        ALTER TABLE kg_entities ADD COLUMN base_confidence REAL   DEFAULT 1.0;
        ALTER TABLE kg_entities ADD COLUMN decay_rate     REAL    DEFAULT 0.05;

        ALTER TABLE kg_relations ADD COLUMN confidence  REAL    DEFAULT 1.0;
        ALTER TABLE kg_relations ADD COLUMN valid_from  INTEGER;
        ALTER TABLE kg_relations ADD COLUMN valid_until INTEGER;

        CREATE TABLE IF NOT EXISTS kg_dependencies (
            id        INTEGER PRIMARY KEY AUTOINCREMENT,
            source_id INTEGER NOT NULL,
            target_id INTEGER NOT NULL,
            dep_type  TEXT    NOT NULL,
            created_at INTEGER DEFAULT (strftime('%s', 'now')),
            FOREIGN KEY (source_id) REFERENCES kg_entities(id) ON DELETE CASCADE,
            FOREIGN KEY (target_id) REFERENCES kg_entities(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_deps_source ON kg_dependencies(source_id);
        CREATE INDEX IF NOT EXISTS idx_deps_target ON kg_dependencies(target_id);

        CREATE TABLE IF NOT EXISTS kg_confidence_log (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            entity_id  INTEGER NOT NULL,
            old_value  REAL    NOT NULL,
            new_value  REAL    NOT NULL,
            reason     TEXT    NOT NULL,
            created_at INTEGER DEFAULT (strftime('%s', 'now')),
            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_conf_log_entity ON kg_confidence_log(entity_id);
        CREATE INDEX IF NOT EXISTS idx_conf_log_entity_reason ON kg_confidence_log(entity_id, reason);
    "#,
    )?;
    Ok(())
}

/// Migration v4 — QuaQue versioning: versions table and validity columns.
///
/// Adds a `kg_versions` table for version metadata and a `validity` bitstring
/// column to `kg_entities` and `kg_relations`.  NULL validity means "unversioned,
/// visible in all queries".  Non-NULL bitstring tracks which versions the row
/// belongs to.
///
/// Each version owns a `bit_slot` in `[0, 63]` (the bit position it occupies in
/// the validity bitstring), assigned at creation and freed on deletion.  The
/// `UNIQUE` constraint guarantees no two live versions share a slot, and the
/// 0–63 range is what makes the 64-version concurrency limit a hard, reclaimable
/// boundary rather than an unbounded function of the auto-increment id.
///
/// `parent_id` references `kg_versions(id)` with `ON DELETE SET NULL`, and
/// `name` is `UNIQUE`.  The `validity` columns are declared `INTEGER DEFAULT
/// NULL`, so existing rows start out with NULL validity.
///
/// The two `ALTER TABLE ... ADD COLUMN` statements carry no existence guard.
/// NOTE(review): this presumably relies on [`ensure_schema`] running each
/// migration at most once per database — confirm before invoking it directly.
///
/// # Errors
///
/// Returns an error if SQLite rejects any statement in the batch.
fn migration_v4(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
        CREATE TABLE IF NOT EXISTS kg_versions (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            name        TEXT NOT NULL UNIQUE,
            branch      TEXT NOT NULL DEFAULT 'main',
            parent_id   INTEGER REFERENCES kg_versions(id) ON DELETE SET NULL,
            description TEXT,
            created_at  INTEGER DEFAULT (strftime('%s', 'now')),
            is_merged   INTEGER NOT NULL DEFAULT 0,
            bit_slot    INTEGER NOT NULL UNIQUE CHECK (bit_slot BETWEEN 0 AND 63)
        );

        CREATE INDEX IF NOT EXISTS idx_versions_branch ON kg_versions(branch);
        CREATE INDEX IF NOT EXISTS idx_versions_parent ON kg_versions(parent_id);

        ALTER TABLE kg_entities ADD COLUMN validity INTEGER DEFAULT NULL;
        ALTER TABLE kg_relations ADD COLUMN validity INTEGER DEFAULT NULL;
    "#,
    )?;
    Ok(())
}

// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// A brand-new in-memory database ends up at `CURRENT_SCHEMA_VERSION`.
    #[test]
    fn test_fresh_db_reaches_current_version() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();
        let v = schema_version(&conn).unwrap();
        assert_eq!(v, Some(CURRENT_SCHEMA_VERSION));
    }

    /// Calling `ensure_schema` twice succeeds and leaves the version unchanged.
    #[test]
    fn test_idempotent_second_call() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();
        // Should not error and version stays the same.
        ensure_schema(&conn).unwrap();
        let v = schema_version(&conn).unwrap();
        assert_eq!(v, Some(CURRENT_SCHEMA_VERSION));
    }

    /// A database holding only the v1 tables (no version row) is upgraded to
    /// the current version.
    #[test]
    fn test_legacy_db_migrates_from_v1() {
        let conn = Connection::open_in_memory().unwrap();

        // Simulate a v1 database: apply only migration_v1 manually (no version row).
        migration_v1(&conn).unwrap();
        assert!(schema_exists(&conn).unwrap());
        assert_eq!(schema_version(&conn).unwrap(), None); // no version yet

        // Now run ensure_schema: should detect legacy v1 and apply every
        // pending migration (v2 through CURRENT_SCHEMA_VERSION), skipping v1.
        ensure_schema(&conn).unwrap();
        assert_eq!(schema_version(&conn).unwrap(), Some(CURRENT_SCHEMA_VERSION));

        // The vectors_checksum column must now exist.
        conn.execute(
            "INSERT INTO kg_turboquant_cache (id, index_blob, vector_count, vectors_checksum) \
             VALUES (1, X'', 0, 0)",
            [],
        )
        .unwrap();
    }

    /// Every table introduced by migrations v1–v4, plus the version table, is
    /// listed in `sqlite_master` after `ensure_schema`.
    #[test]
    fn test_all_tables_created() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        let tables = [
            "kg_entities",
            "kg_relations",
            "kg_vectors",
            "kg_hyperedges",
            "kg_hyperedge_entities",
            "kg_turboquant_cache",
            "kg_schema_version",
            "kg_versions",
            "kg_dependencies",
            "kg_confidence_log",
        ];

        for table in &tables {
            let count: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
                    [table],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(count, 1, "table {table} should exist");
        }
    }

    /// The v3 columns on `kg_entities` accept writes and read back the
    /// inserted values.
    #[test]
    fn test_v3_entity_columns_exist() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        // Verify new v3 columns are writable
        conn.execute(
            "INSERT INTO kg_entities \
             (entity_type, name, confidence, access_count, base_confidence, decay_rate) \
             VALUES ('test', 'T', 0.9, 5, 0.9, 0.05)",
            [],
        )
        .unwrap();

        let (conf, acc): (f64, i64) = conn
            .query_row(
                "SELECT confidence, access_count FROM kg_entities WHERE name = 'T'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        // Floating-point comparison with a small tolerance.
        assert!((conf - 0.9).abs() < 1e-9);
        assert_eq!(acc, 5);
    }

    /// `kg_confidence_log` (added in v3) accepts a row that references an
    /// existing entity.
    #[test]
    fn test_v3_new_tables_writable() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        conn.execute(
            "INSERT INTO kg_entities (entity_type, name) VALUES ('a', 'X')",
            [],
        )
        .unwrap();
        // Row id of the entity just inserted; used as the log's entity_id.
        let id: i64 = conn.last_insert_rowid();

        conn.execute(
            "INSERT INTO kg_confidence_log (entity_id, old_value, new_value, reason) \
             VALUES (?1, 1.0, 0.8, 'test')",
            [id],
        )
        .unwrap();

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM kg_confidence_log", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1);
    }

    /// The backward-compatible `create_schema` alias also reaches the current
    /// version.
    #[test]
    fn test_create_schema_alias() {
        // create_schema must behave identically to ensure_schema.
        let conn = Connection::open_in_memory().unwrap();
        create_schema(&conn).unwrap();
        assert_eq!(schema_version(&conn).unwrap(), Some(CURRENT_SCHEMA_VERSION));
    }

    /// The v4 `validity` column exists on both `kg_entities` and
    /// `kg_relations` and is NULL for rows inserted without it.
    #[test]
    fn test_v4_validity_columns_exist() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        // Verify validity column on kg_entities (NULL by default)
        conn.execute(
            "INSERT INTO kg_entities (entity_type, name) VALUES ('test', 'V')",
            [],
        )
        .unwrap();
        let validity: Option<i64> = conn
            .query_row(
                "SELECT validity FROM kg_entities WHERE name = 'V'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(validity, None);

        // Verify validity column on kg_relations
        // `eid` is the row id of entity 'V' inserted above.
        let eid: i64 = conn.last_insert_rowid();
        conn.execute(
            "INSERT INTO kg_entities (entity_type, name) VALUES ('test', 'V2')",
            [],
        )
        .unwrap();
        let eid2: i64 = conn.last_insert_rowid();
        conn.execute(
            "INSERT INTO kg_relations (source_id, target_id, rel_type) VALUES (?1, ?2, 'rel')",
            rusqlite::params![eid, eid2],
        )
        .unwrap();
        let rel_validity: Option<i64> = conn
            .query_row(
                "SELECT validity FROM kg_relations WHERE source_id = ?1",
                [eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(rel_validity, None);
    }

    /// `kg_versions` (added in v4) accepts a row with an explicit `bit_slot`
    /// and returns the stored name, branch and description.
    #[test]
    fn test_v4_versions_table_writable() {
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema(&conn).unwrap();

        conn.execute(
            "INSERT INTO kg_versions (name, branch, description, bit_slot) \
             VALUES ('v1', 'main', 'first', 0)",
            [],
        )
        .unwrap();

        let (name, branch, desc): (String, String, Option<String>) = conn
            .query_row(
                "SELECT name, branch, description FROM kg_versions WHERE name = 'v1'",
                [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(name, "v1");
        assert_eq!(branch, "main");
        assert_eq!(desc.as_deref(), Some("first"));
    }
}