Skip to main content

sqlite_knowledge_graph/
schema.rs

1//! Database schema creation and management.
2//!
3//! # Versioned migration system
4//!
5//! The schema is versioned via the `kg_schema_version` table (single-row).
6//! Call [`ensure_schema`] on every database open to apply any pending migrations
7//! automatically.
8//!
9//! | Version | Changes |
10//! |---------|---------|
11//! | 1       | Initial schema: entities, relations, vectors, hyperedges, turboquant cache |
12//! | 2       | Add `vectors_checksum` column to `kg_turboquant_cache` |
13
14use rusqlite::Connection;
15
16use crate::error::{Error, Result};
17
/// Latest known schema version.
///
/// Bump this whenever a new migration is added, and add a matching arm to
/// `apply_migration`: `ensure_schema` applies every version up to and
/// including this value, and a version without an arm is rejected with an
/// "Unknown schema migration version" error.
const CURRENT_SCHEMA_VERSION: i32 = 2;
20
21// ─────────────────────────────────────────────────────────────────────────────
22// Public API
23// ─────────────────────────────────────────────────────────────────────────────
24
25/// Ensure the database schema is up to date, running any pending migrations.
26///
27/// Safe to call on:
28/// - A brand-new empty database (applies all migrations from scratch).
29/// - An existing database without version tracking (detects v1 tables, starts
30///   from v1 so existing data is preserved).
31/// - An already fully-migrated database (fast no-op path).
32pub fn ensure_schema(conn: &Connection) -> Result<()> {
33    // Bootstrap: create the version table.  This is idempotent.
34    conn.execute_batch("CREATE TABLE IF NOT EXISTS kg_schema_version (version INTEGER NOT NULL);")?;
35
36    // Read the stored version, if any.
37    let stored: Option<i32> = conn
38        .query_row("SELECT version FROM kg_schema_version", [], |r| r.get(0))
39        .ok();
40
41    let current_version = match stored {
42        Some(v) => v,
43        None => {
44            // No version row yet.  Distinguish a legacy DB (core tables already
45            // exist) from a fresh one so we never re-create existing tables.
46            if schema_exists(conn)? {
47                1 // Legacy database: all v1 tables are present, start from there.
48            } else {
49                0 // Brand-new database: nothing applied yet.
50            }
51        }
52    };
53
54    if current_version >= CURRENT_SCHEMA_VERSION {
55        return Ok(()); // Already up to date — fast path.
56    }
57
58    // Apply all pending migrations inside a single transaction.
59    let tx = conn.unchecked_transaction()?;
60    for v in (current_version + 1)..=CURRENT_SCHEMA_VERSION {
61        apply_migration(&tx, v)?;
62    }
63
64    // Persist the new version (replace any existing row).
65    tx.execute("DELETE FROM kg_schema_version", [])?;
66    tx.execute(
67        "INSERT INTO kg_schema_version (version) VALUES (?1)",
68        [CURRENT_SCHEMA_VERSION],
69    )?;
70
71    tx.commit()?;
72    Ok(())
73}
74
/// Create the knowledge graph schema in the database.
///
/// Alias for [`ensure_schema`] kept for backward compatibility: `conn` is
/// forwarded unchanged and the result of [`ensure_schema`] is returned as-is.
#[inline]
pub fn create_schema(conn: &Connection) -> Result<()> {
    // Pure delegation; all migration logic lives in `ensure_schema`.
    ensure_schema(conn)
}
82
83/// Return the current schema version stored in the database.
84///
85/// Returns `None` if `kg_schema_version` has not been populated yet (e.g. a
86/// legacy DB that has never been opened with this library version).
87pub fn schema_version(conn: &Connection) -> Result<Option<i32>> {
88    let v = conn
89        .query_row("SELECT version FROM kg_schema_version", [], |r| r.get(0))
90        .ok();
91    Ok(v)
92}
93
94/// Check if the core schema tables exist (used for legacy-DB detection).
95pub fn schema_exists(conn: &Connection) -> Result<bool> {
96    let mut stmt = conn
97        .prepare("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='kg_entities'")?;
98    let count: i64 = stmt.query_row([], |row| row.get(0))?;
99    Ok(count > 0)
100}
101
102// ─────────────────────────────────────────────────────────────────────────────
103// Migration runner
104// ─────────────────────────────────────────────────────────────────────────────
105
106fn apply_migration(conn: &Connection, version: i32) -> Result<()> {
107    match version {
108        1 => migration_v1(conn),
109        2 => migration_v2(conn),
110        _ => Err(Error::Other(format!(
111            "Unknown schema migration version: {}",
112            version
113        ))),
114    }
115}
116
117// ─────────────────────────────────────────────────────────────────────────────
118// Migrations
119// ─────────────────────────────────────────────────────────────────────────────
120
121/// Migration v1 — initial schema (all core tables and indexes).
122fn migration_v1(conn: &Connection) -> Result<()> {
123    conn.execute_batch(
124        r#"
125        CREATE TABLE IF NOT EXISTS kg_entities (
126            id INTEGER PRIMARY KEY AUTOINCREMENT,
127            entity_type TEXT NOT NULL,
128            name TEXT NOT NULL,
129            properties TEXT,
130            created_at INTEGER DEFAULT (strftime('%s', 'now')),
131            updated_at INTEGER DEFAULT (strftime('%s', 'now'))
132        );
133
134        CREATE TABLE IF NOT EXISTS kg_relations (
135            id INTEGER PRIMARY KEY AUTOINCREMENT,
136            source_id INTEGER NOT NULL,
137            target_id INTEGER NOT NULL,
138            rel_type TEXT NOT NULL,
139            weight REAL DEFAULT 1.0,
140            properties TEXT,
141            created_at INTEGER DEFAULT (strftime('%s', 'now')),
142            FOREIGN KEY (source_id) REFERENCES kg_entities(id) ON DELETE CASCADE,
143            FOREIGN KEY (target_id) REFERENCES kg_entities(id) ON DELETE CASCADE
144        );
145
146        CREATE TABLE IF NOT EXISTS kg_vectors (
147            entity_id INTEGER NOT NULL PRIMARY KEY,
148            vector BLOB NOT NULL,
149            dimension INTEGER NOT NULL,
150            created_at INTEGER DEFAULT (strftime('%s', 'now')),
151            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
152        );
153
154        CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type);
155        CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name);
156        CREATE INDEX IF NOT EXISTS idx_relations_source ON kg_relations(source_id);
157        CREATE INDEX IF NOT EXISTS idx_relations_target ON kg_relations(target_id);
158        CREATE INDEX IF NOT EXISTS idx_relations_type ON kg_relations(rel_type);
159
160        CREATE TABLE IF NOT EXISTS kg_hyperedges (
161            id INTEGER PRIMARY KEY AUTOINCREMENT,
162            hyperedge_type TEXT NOT NULL,
163            entity_ids TEXT NOT NULL,
164            weight REAL DEFAULT 1.0,
165            arity INTEGER NOT NULL,
166            properties TEXT,
167            created_at INTEGER DEFAULT (strftime('%s', 'now')),
168            updated_at INTEGER DEFAULT (strftime('%s', 'now'))
169        );
170
171        CREATE TABLE IF NOT EXISTS kg_hyperedge_entities (
172            hyperedge_id INTEGER NOT NULL,
173            entity_id INTEGER NOT NULL,
174            position INTEGER NOT NULL,
175            PRIMARY KEY (hyperedge_id, entity_id),
176            FOREIGN KEY (hyperedge_id) REFERENCES kg_hyperedges(id) ON DELETE CASCADE,
177            FOREIGN KEY (entity_id) REFERENCES kg_entities(id) ON DELETE CASCADE
178        );
179
180        CREATE INDEX IF NOT EXISTS idx_hyperedges_type ON kg_hyperedges(hyperedge_type);
181        CREATE INDEX IF NOT EXISTS idx_hyperedges_arity ON kg_hyperedges(arity);
182        CREATE INDEX IF NOT EXISTS idx_he_entities_entity ON kg_hyperedge_entities(entity_id);
183        CREATE INDEX IF NOT EXISTS idx_he_entities_hyperedge ON kg_hyperedge_entities(hyperedge_id);
184
185        CREATE TABLE IF NOT EXISTS kg_turboquant_cache (
186            id INTEGER PRIMARY KEY CHECK (id = 1),
187            index_blob BLOB NOT NULL,
188            vector_count INTEGER NOT NULL
189        );
190        "#,
191    )?;
192    Ok(())
193}
194
195/// Migration v2 — add `vectors_checksum` to `kg_turboquant_cache`.
196///
197/// The checksum (`COALESCE(SUM(entity_id), 0)` over `kg_vectors`) is a
198/// lightweight fingerprint that detects cache staleness even when the
199/// *count* of vectors stays the same — for example when one vector is
200/// deleted and a different one is inserted.
201fn migration_v2(conn: &Connection) -> Result<()> {
202    conn.execute_batch(
203        "ALTER TABLE kg_turboquant_cache \
204         ADD COLUMN vectors_checksum INTEGER NOT NULL DEFAULT 0;",
205    )?;
206    Ok(())
207}
208
209// ─────────────────────────────────────────────────────────────────────────────
210// Tests
211// ─────────────────────────────────────────────────────────────────────────────
212
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Open an empty in-memory database with no schema applied.
    fn empty_db() -> Connection {
        Connection::open_in_memory().unwrap()
    }

    /// Open an in-memory database and migrate it with `ensure_schema`.
    fn migrated_db() -> Connection {
        let conn = empty_db();
        ensure_schema(&conn).unwrap();
        conn
    }

    /// Assert that the recorded schema version is the latest one.
    fn assert_at_current_version(conn: &Connection) {
        assert_eq!(schema_version(conn).unwrap(), Some(CURRENT_SCHEMA_VERSION));
    }

    #[test]
    fn test_fresh_db_reaches_current_version() {
        let conn = migrated_db();
        assert_at_current_version(&conn);
    }

    #[test]
    fn test_idempotent_second_call() {
        let conn = migrated_db();
        // A repeated call must succeed and leave the version untouched.
        ensure_schema(&conn).unwrap();
        assert_at_current_version(&conn);
    }

    #[test]
    fn test_legacy_db_migrates_from_v1() {
        let conn = empty_db();

        // Build a v1-shaped database by hand: tables only, no version row.
        migration_v1(&conn).unwrap();
        assert!(schema_exists(&conn).unwrap());
        assert_eq!(schema_version(&conn).unwrap(), None);

        // `ensure_schema` should recognise the legacy layout and run v2 only.
        ensure_schema(&conn).unwrap();
        assert_at_current_version(&conn);

        // Inserting into `vectors_checksum` proves the column was added.
        let insert_result = conn.execute(
            "INSERT INTO kg_turboquant_cache (id, index_blob, vector_count, vectors_checksum) \
             VALUES (1, X'', 0, 0)",
            [],
        );
        insert_result.unwrap();
    }

    #[test]
    fn test_all_tables_created() {
        const EXPECTED_TABLES: [&str; 7] = [
            "kg_entities",
            "kg_relations",
            "kg_vectors",
            "kg_hyperedges",
            "kg_hyperedge_entities",
            "kg_turboquant_cache",
            "kg_schema_version",
        ];

        let conn = migrated_db();

        for table in EXPECTED_TABLES {
            let count: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
                    [table],
                    |r| r.get(0),
                )
                .unwrap();
            assert_eq!(count, 1, "table {table} should exist");
        }
    }

    #[test]
    fn test_create_schema_alias() {
        // The alias must leave the database in the same state as `ensure_schema`.
        let conn = empty_db();
        create_schema(&conn).unwrap();
        assert_at_current_version(&conn);
    }
}