Skip to main content

sqlite_knowledge_graph/version/
store.rs

1//! Version CRUD operations.
2
3use rusqlite::{ffi, params, OptionalExtension};
4
5use super::{bit_from_slot, Version, MAX_VERSIONS};
6use crate::error::{Error, Result};
7
8/// Create a new version. Returns the version ID.
9///
10/// Allocates the lowest free `bit_slot` in `[0, 63]`.  Returns
11/// [`Error::VersionLimitExceeded`] when all 64 slots are occupied by live
12/// versions (deleting a version frees its slot for reuse).
13pub fn create_version(
14    conn: &rusqlite::Connection,
15    name: &str,
16    branch: &str,
17    parent_id: Option<i64>,
18    description: Option<&str>,
19) -> Result<i64> {
20    let slot = allocate_slot(conn)?;
21    conn.execute(
22        "INSERT INTO kg_versions (name, branch, parent_id, description, bit_slot) \
23         VALUES (?1, ?2, ?3, ?4, ?5)",
24        params![name, branch, parent_id, description, slot],
25    )
26    .map_err(|e| match e {
27        // Map the name UNIQUE violation to a typed error. Match the structured
28        // extended code rather than the English message text (which varies by
29        // SQLite/rusqlite version and locale); the `table.column` identifier is
30        // SQLite-generated and distinguishes it from the bit_slot UNIQUE column.
31        rusqlite::Error::SqliteFailure(err, Some(msg))
32            if err.extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE
33                && msg.contains("kg_versions.name") =>
34        {
35            Error::DuplicateVersionName(name.to_string())
36        }
37        other => Error::from(other),
38    })?;
39    Ok(conn.last_insert_rowid())
40}
41
42/// Find the lowest unused `bit_slot` in `[0, 63]`.
43fn allocate_slot(conn: &rusqlite::Connection) -> Result<i64> {
44    let mut stmt = conn.prepare("SELECT bit_slot FROM kg_versions")?;
45    let used: std::collections::HashSet<i64> = stmt
46        .query_map([], |r| r.get(0))?
47        .filter_map(|r| r.ok())
48        .collect();
49    (0..MAX_VERSIONS)
50        .find(|slot| !used.contains(slot))
51        .ok_or(Error::VersionLimitExceeded)
52}
53
54/// Delete a version by ID, clearing its bit from every entity and relation so
55/// the freed slot can be safely reused by a future version.  Runs in a single
56/// transaction: either the version row and all its bits go, or nothing does.
57pub fn delete_version(conn: &rusqlite::Connection, version_id: i64) -> Result<()> {
58    let bit = version_bit_for(conn, version_id)?; // also validates existence
59
60    let tx = conn.unchecked_transaction()?;
61    clear_bit(&tx, "kg_entities", bit)?;
62    clear_bit(&tx, "kg_relations", bit)?;
63    tx.execute("DELETE FROM kg_versions WHERE id = ?1", [version_id])?;
64    tx.commit()?;
65    Ok(())
66}
67
68/// Clear `bit` from the validity column of every row in `table`, collapsing a
69/// resulting 0 back to NULL (the unversioned sentinel).
70fn clear_bit(conn: &rusqlite::Connection, table: &str, bit: i64) -> Result<()> {
71    // `table` is a hard-coded literal at every call site, never user input.
72    conn.execute(
73        &format!(
74            "UPDATE {table} SET validity = CASE \
75             WHEN (validity & ~?1) = 0 THEN NULL ELSE validity & ~?1 END \
76             WHERE validity IS NOT NULL AND (validity & ?1) != 0"
77        ),
78        [bit],
79    )?;
80    Ok(())
81}
82
83/// Resolve a version id to its validity bitmask (`1 << bit_slot`).
84///
85/// Returns [`Error::VersionNotFound`] if the version does not exist, or
86/// [`Error::CorruptBitSlot`] if the stored `bit_slot` is outside `[0, 63]`.
87pub fn version_bit_for(conn: &rusqlite::Connection, version_id: i64) -> Result<i64> {
88    let slot: i64 = conn
89        .query_row(
90            "SELECT bit_slot FROM kg_versions WHERE id = ?1",
91            [version_id],
92            |r| r.get(0),
93        )
94        .optional()?
95        .ok_or(Error::VersionNotFound(version_id))?;
96    // A corrupted/manual DB row could carry an out-of-range slot; surface it as
97    // an error instead of panicking on the shift inside `bit_from_slot`.
98    bit_from_slot(slot).ok_or(Error::CorruptBitSlot { version_id, slot })
99}
100
101/// Return every version whose `bit_slot` is set in `bits`, newest first.
102pub fn versions_for_bits(conn: &rusqlite::Connection, bits: i64) -> Result<Vec<Version>> {
103    let mut stmt = conn.prepare(
104        "SELECT id, name, branch, parent_id, description, created_at, is_merged \
105         FROM kg_versions WHERE (?1 & (1 << bit_slot)) != 0 ORDER BY created_at DESC",
106    )?;
107    let rows = stmt.query_map([bits], row_to_version)?;
108    let mut versions = Vec::new();
109    for row in rows {
110        versions.push(row?);
111    }
112    Ok(versions)
113}
114
115/// List versions, optionally filtered by branch.
116pub fn list_versions(conn: &rusqlite::Connection, branch: Option<&str>) -> Result<Vec<Version>> {
117    let query = if branch.is_some() {
118        "SELECT id, name, branch, parent_id, description, created_at, is_merged \
119         FROM kg_versions WHERE branch = ?1 ORDER BY created_at DESC"
120    } else {
121        "SELECT id, name, branch, parent_id, description, created_at, is_merged \
122         FROM kg_versions ORDER BY created_at DESC"
123    };
124
125    let mut stmt = conn.prepare(query)?;
126
127    let rows = if let Some(b) = branch {
128        stmt.query_map(params![b], row_to_version)?
129    } else {
130        stmt.query_map([], row_to_version)?
131    };
132
133    let mut versions = Vec::new();
134    for row in rows {
135        versions.push(row?);
136    }
137    Ok(versions)
138}
139
140/// Get a version by ID. Returns [`Error::VersionNotFound`] if no such version exists.
141pub fn get_version(conn: &rusqlite::Connection, version_id: i64) -> Result<Version> {
142    conn.query_row(
143        "SELECT id, name, branch, parent_id, description, created_at, is_merged \
144         FROM kg_versions WHERE id = ?1",
145        [version_id],
146        row_to_version,
147    )
148    .map_err(|e| match e {
149        rusqlite::Error::QueryReturnedNoRows => Error::VersionNotFound(version_id),
150        other => Error::from(other),
151    })
152}
153
154/// Check that a version exists. Returns error if not found.
155pub fn ensure_version_exists(conn: &rusqlite::Connection, version_id: i64) -> Result<()> {
156    let exists: bool = conn
157        .query_row(
158            "SELECT COUNT(*) > 0 FROM kg_versions WHERE id = ?1",
159            [version_id],
160            |r| r.get(0),
161        )
162        .map_err(Error::from)?;
163    if !exists {
164        return Err(Error::VersionNotFound(version_id));
165    }
166    Ok(())
167}
168
169/// Check that an entity exists. Returns error if not found.
170pub fn ensure_entity_exists(conn: &rusqlite::Connection, entity_id: i64) -> Result<()> {
171    let exists: bool = conn
172        .query_row(
173            "SELECT COUNT(*) > 0 FROM kg_entities WHERE id = ?1",
174            [entity_id],
175            |r| r.get(0),
176        )
177        .map_err(Error::from)?;
178    if !exists {
179        return Err(Error::EntityNotFound(entity_id));
180    }
181    Ok(())
182}
183
184/// Check that a relation exists. Returns error if not found.
185pub fn ensure_relation_exists(conn: &rusqlite::Connection, relation_id: i64) -> Result<()> {
186    let exists: bool = conn.query_row(
187        "SELECT COUNT(*) > 0 FROM kg_relations WHERE id = ?1",
188        [relation_id],
189        |r| r.get(0),
190    )?;
191    if !exists {
192        return Err(Error::RelationNotFound(relation_id));
193    }
194    Ok(())
195}
196
197fn row_to_version(row: &rusqlite::Row) -> rusqlite::Result<Version> {
198    Ok(Version {
199        id: row.get(0)?,
200        name: row.get(1)?,
201        branch: row.get(2)?,
202        parent_id: row.get(3)?,
203        description: row.get(4)?,
204        created_at: row.get(5)?,
205        is_merged: row.get::<_, i64>(6)? != 0,
206    })
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use rusqlite::Connection;
213
214    fn setup() -> Connection {
215        let conn = Connection::open_in_memory().unwrap();
216        crate::schema::create_schema(&conn).unwrap();
217        conn
218    }
219
220    #[test]
221    fn test_create_version() {
222        let conn = setup();
223        let id = create_version(&conn, "v1", "main", None, Some("first")).unwrap();
224        assert!(id > 0);
225
226        let v = get_version(&conn, id).unwrap();
227        assert_eq!(v.name, "v1");
228        assert_eq!(v.branch, "main");
229        assert_eq!(v.description.as_deref(), Some("first"));
230        assert!(!v.is_merged);
231    }
232
233    #[test]
234    fn test_duplicate_name_rejected() {
235        let conn = setup();
236        create_version(&conn, "v1", "main", None, None).unwrap();
237        let err = create_version(&conn, "v1", "main", None, None).unwrap_err();
238        assert!(matches!(err, Error::DuplicateVersionName(_)));
239    }
240
241    #[test]
242    fn test_delete_version() {
243        let conn = setup();
244        let id = create_version(&conn, "v1", "main", None, None).unwrap();
245        delete_version(&conn, id).unwrap();
246        assert!(get_version(&conn, id).is_err());
247    }
248
249    #[test]
250    fn test_delete_nonexistent() {
251        let conn = setup();
252        let err = delete_version(&conn, 999).unwrap_err();
253        assert!(matches!(err, Error::VersionNotFound(999)));
254    }
255
256    #[test]
257    fn test_list_all_versions() {
258        let conn = setup();
259        create_version(&conn, "v1", "main", None, None).unwrap();
260        create_version(&conn, "v2", "main", None, None).unwrap();
261        create_version(&conn, "v1-feat", "feature", None, None).unwrap();
262
263        let all = list_versions(&conn, None).unwrap();
264        assert_eq!(all.len(), 3);
265    }
266
267    #[test]
268    fn test_list_by_branch() {
269        let conn = setup();
270        create_version(&conn, "v1", "main", None, None).unwrap();
271        create_version(&conn, "v2", "main", None, None).unwrap();
272        create_version(&conn, "v1-feat", "feature", None, None).unwrap();
273
274        let main = list_versions(&conn, Some("main")).unwrap();
275        assert_eq!(main.len(), 2);
276        assert!(main.iter().all(|v| v.branch == "main"));
277    }
278
279    fn add_entity(conn: &Connection) -> i64 {
280        conn.execute(
281            "INSERT INTO kg_entities (entity_type, name) VALUES ('t', 'X')",
282            [],
283        )
284        .unwrap();
285        conn.last_insert_rowid()
286    }
287
288    fn validity(conn: &Connection, eid: i64) -> Option<i64> {
289        conn.query_row(
290            "SELECT validity FROM kg_entities WHERE id = ?1",
291            [eid],
292            |r| r.get(0),
293        )
294        .unwrap()
295    }
296
297    #[test]
298    fn test_first_version_uses_slot_zero() {
299        let conn = setup();
300        let id = create_version(&conn, "v1", "main", None, None).unwrap();
301        assert_eq!(version_bit_for(&conn, id).unwrap(), 1); // 1 << 0
302    }
303
304    #[test]
305    fn test_delete_clears_bits_and_collapses_to_null() {
306        let conn = setup();
307        let eid = add_entity(&conn);
308        let v1 = create_version(&conn, "v1", "main", None, None).unwrap();
309        crate::version::snapshot::version_add_entity(&conn, v1, eid).unwrap();
310        assert_eq!(validity(&conn, eid), Some(1));
311
312        delete_version(&conn, v1).unwrap();
313        // The only version is gone, so the entity returns to unversioned (NULL).
314        assert_eq!(validity(&conn, eid), None);
315    }
316
317    #[test]
318    fn test_slot_reclaimed_without_leaking_stale_bits() {
319        let conn = setup();
320        let eid = add_entity(&conn);
321        let v1 = create_version(&conn, "v1", "main", None, None).unwrap();
322        crate::version::snapshot::version_add_entity(&conn, v1, eid).unwrap();
323
324        // Deleting v1 frees slot 0; the next version must reuse it cleanly.
325        delete_version(&conn, v1).unwrap();
326        let v2 = create_version(&conn, "v2", "main", None, None).unwrap();
327        assert_eq!(version_bit_for(&conn, v2).unwrap(), 1); // slot 0 reused
328
329        // The entity must NOT have leaked into v2 via the recycled bit.
330        let in_v2 = crate::version::query::version_entities(&conn, v2, None, None).unwrap();
331        assert!(in_v2.is_empty());
332    }
333
334    #[test]
335    fn test_version_limit_exceeded() {
336        let conn = setup();
337        for i in 0..64 {
338            create_version(&conn, &format!("v{i}"), "main", None, None).unwrap();
339        }
340        let err = create_version(&conn, "v64", "main", None, None).unwrap_err();
341        assert!(matches!(err, Error::VersionLimitExceeded));
342    }
343
344    #[test]
345    fn test_version_bit_for_unknown_errors() {
346        let conn = setup();
347        let err = version_bit_for(&conn, 999).unwrap_err();
348        assert!(matches!(err, Error::VersionNotFound(999)));
349    }
350
351    #[test]
352    fn test_version_bit_for_corrupt_slot_errors() {
353        let conn = setup();
354        let id = create_version(&conn, "v1", "main", None, None).unwrap();
355
356        // Simulate a corrupted/manual row by bypassing the CHECK constraint so the
357        // out-of-range slot reaches the read path instead of being rejected at write.
358        conn.execute_batch("PRAGMA ignore_check_constraints = ON")
359            .unwrap();
360        conn.execute("UPDATE kg_versions SET bit_slot = 64 WHERE id = ?1", [id])
361            .unwrap();
362
363        let err = version_bit_for(&conn, id).unwrap_err();
364        assert!(
365            matches!(err, Error::CorruptBitSlot { version_id, slot } if version_id == id && slot == 64)
366        );
367    }
368}