solo-storage 0.5.0

Solo: SQLite + SQLCipher persistence layer
Documentation
// SPDX-License-Identifier: Apache-2.0

//! SQL schema migrations. Runs once at startup against the SQLCipher database
//! after `PRAGMA key` has been bound.
//!
//! Migrations are append-only — once a version has shipped to a user, never
//! change its SQL. Bug fixes go in subsequent migrations.
//!
//! The runner advances `schema_migrations` row-by-row inside a single
//! `BEGIN IMMEDIATE` transaction per migration, so a crash mid-migration
//! either applies the whole thing or none of it.

use rusqlite::{Connection, TransactionBehavior, params};
use solo_core::{Error, Result};

/// One migration step. The `up` SQL may contain multiple statements (it's
/// passed to `execute_batch`).
///
/// Instances live only in the static `MIGRATIONS` table, which is why every
/// string field is `&'static str`.
#[derive(Debug)]
struct Migration {
    /// Schema version this step produces. The runner skips the step when this
    /// is `<=` the highest version already recorded, and stores it as the
    /// `version` column (primary key) of `schema_migrations` once applied.
    version: u32,
    /// Human-readable summary; written to `schema_migrations.description` and
    /// included in the "applied migration" log event.
    description: &'static str,
    /// SQL text executed as one batch inside the migration's transaction.
    up: &'static str,
}

/// All migrations, in order. Append new entries; never modify existing ones.
///
/// `run_migrations` walks this slice front to back, skips entries whose
/// `version` is not greater than the recorded one, and reports the version of
/// the last entry it applied — so entries must stay in ascending `version`
/// order for that return value to be the true maximum.
const MIGRATIONS: &[Migration] = &[
    // v1: SQL is embedded at compile time from migrations/0001_initial.sql.
    Migration {
        version: 1,
        description: "initial schema (v0): episodes + triples + steward outputs + pending_index + FTS",
        up: include_str!("migrations/0001_initial.sql"),
    },
    // v2: SQL is embedded at compile time from migrations/0002_triples_cluster_id.sql.
    Migration {
        version: 2,
        description: "triples.cluster_id FK + index for absorb→regen cascade",
        up: include_str!("migrations/0002_triples_cluster_id.sql"),
    },
];

/// Run every migration that hasn't been applied yet.
///
/// Idempotent — calling on an up-to-date database is a no-op + ~1ms read of
/// `schema_migrations`. Returns the highest version applied (after the run).
pub fn run_migrations(conn: &mut Connection) -> Result<u32> {
    // schema_migrations is created out-of-band so the first migration doesn't
    // have to bootstrap its own tracking row. CREATE IF NOT EXISTS makes this
    // safe to call before checking existing state.
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS schema_migrations (
             version     INTEGER PRIMARY KEY,
             description TEXT    NOT NULL,
             applied_at  INTEGER NOT NULL
         );",
    )
    .map_err(|e| Error::storage(format!("create schema_migrations: {e}")))?;

    let current = current_version(conn)?;
    let mut highest = current;

    for m in MIGRATIONS {
        if m.version <= current {
            continue;
        }
        apply_one(conn, m)?;
        highest = m.version;
        tracing::info!(
            version = m.version,
            description = m.description,
            "applied migration"
        );
    }

    Ok(highest)
}

/// Highest applied version, or 0 if nothing has been applied yet.
pub fn current_version(conn: &Connection) -> Result<u32> {
    let v: Option<u32> = conn
        .query_row(
            "SELECT MAX(version) FROM schema_migrations",
            [],
            |row| row.get::<_, Option<u32>>(0),
        )
        .map_err(|e| Error::storage(format!("query current version: {e}")))?;
    Ok(v.unwrap_or(0))
}

fn apply_one(conn: &mut Connection, m: &Migration) -> Result<()> {
    let tx = conn
        .transaction_with_behavior(TransactionBehavior::Immediate)
        .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for migration {}: {e}", m.version)))?;
    tx.execute_batch(m.up)
        .map_err(|e| Error::storage(format!("apply migration {}: {e}", m.version)))?;
    let now_ms: i64 = chrono::Utc::now().timestamp_millis();
    tx.execute(
        "INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
        params![m.version, m.description, now_ms],
    )
    .map_err(|e| Error::storage(format!("insert schema_migrations row {}: {e}", m.version)))?;
    tx.commit()
        .map_err(|e| Error::storage(format!("commit migration {}: {e}", m.version)))?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory database with no schema applied yet.
    fn open_in_memory() -> Connection {
        Connection::open_in_memory().expect("open in-memory DB")
    }

    /// In-memory database with every known migration already applied.
    fn migrated_db() -> Connection {
        let mut db = open_in_memory();
        run_migrations(&mut db).unwrap();
        db
    }

    /// Run a parameterless `SELECT COUNT(*) ...` style query and return the
    /// single integer it produces.
    fn count(db: &Connection, sql: &str) -> u32 {
        db.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Same as [`count`], for queries with exactly one text placeholder.
    fn count_for(db: &Connection, sql: &str, key: &str) -> u32 {
        db.query_row(sql, params![key], |row| row.get(0)).unwrap()
    }

    /// Column names reported by a `PRAGMA table_info(...)` statement.
    ///
    /// PRAGMA table_info gives (cid, name, type, notnull, dflt_value, pk);
    /// the declared type is read as text as well, but only names are returned.
    fn column_names(db: &Connection, pragma_sql: &str) -> Vec<String> {
        let mut stmt = db.prepare(pragma_sql).unwrap();
        let rows = stmt
            .query_map([], |row| {
                let name: String = row.get(1)?;
                let _declared_type: String = row.get(2)?;
                Ok(name)
            })
            .unwrap();
        let mut names = Vec::new();
        for row in rows {
            names.push(row.unwrap());
        }
        names
    }

    #[test]
    fn empty_db_runs_all_migrations() {
        let mut db = open_in_memory();
        let reported = run_migrations(&mut db).unwrap();
        assert_eq!(reported, 2);
        assert_eq!(current_version(&db).unwrap(), 2);
    }

    #[test]
    fn migration_0002_adds_triples_cluster_id_column() {
        let db = migrated_db();

        let names = column_names(&db, "PRAGMA table_info('triples')");
        assert!(
            names.iter().any(|name| name == "cluster_id"),
            "triples missing cluster_id after 0002; got {names:?}"
        );

        // The supporting index must exist as well.
        let index_count = count(
            &db,
            "SELECT COUNT(*) FROM sqlite_master \
             WHERE type='index' AND name='idx_triples_cluster'",
        );
        assert_eq!(index_count, 1, "idx_triples_cluster missing after 0002");
    }

    #[test]
    fn migration_0002_cluster_delete_cascades_to_triples() {
        const TRIPLE_BY_ID: &str = "SELECT COUNT(*) FROM triples WHERE triple_id = ?";

        let db = migrated_db();
        db.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();

        // Seed one cluster and one triple that points at it.
        let cluster_id = "00000000-0000-0000-0000-000000000077";
        let triple_id = "00000000-0000-0000-0000-000000000099";
        db.execute(
            "INSERT INTO clusters (cluster_id, coherence, created_at_ms) VALUES (?, ?, ?)",
            params![cluster_id, 0.9, now_ms],
        )
        .unwrap();
        db.execute(
            "INSERT INTO triples (
                triple_id, subject_id, predicate, object_id, object_kind,
                valid_from_ms, valid_to_ms, confidence, provenance_json,
                created_at_ms, updated_at_ms, cluster_id
             ) VALUES (?, 'subj', 'pred', 'obj', 'literal', ?, NULL, 0.9, '{}', ?, ?, ?)",
            params![triple_id, now_ms, now_ms, now_ms, cluster_id],
        )
        .unwrap();

        // Sanity check: the triple is there before the cluster goes away.
        assert_eq!(count_for(&db, TRIPLE_BY_ID, triple_id), 1);

        // Removing the cluster must take the triple with it.
        db.execute("DELETE FROM clusters WHERE cluster_id = ?", params![cluster_id])
            .unwrap();
        assert_eq!(
            count_for(&db, TRIPLE_BY_ID, triple_id),
            0,
            "CASCADE on clusters should drop the triple"
        );
    }

    #[test]
    fn second_run_is_a_noop() {
        let mut db = open_in_memory();
        let first = run_migrations(&mut db).unwrap();
        let second = run_migrations(&mut db).unwrap();
        assert_eq!(first, second);

        let rows_for_v1 = count(
            &db,
            "SELECT COUNT(*) FROM schema_migrations WHERE version = 1",
        );
        assert_eq!(rows_for_v1, 1, "schema_migrations row must not be inserted twice");
    }

    #[test]
    fn all_canonical_tables_present() {
        const EXPECTED_TABLES: [&str; 10] = [
            "schema_migrations",
            "embedders",
            "episodes",
            "embeddings",
            "pending_index",
            "triples",
            "clusters",
            "cluster_episodes",
            "semantic_abstractions",
            "contradictions",
        ];

        let db = migrated_db();
        for table in EXPECTED_TABLES {
            let found = count_for(
                &db,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?",
                table,
            );
            assert_eq!(found, 1, "missing canonical table: {table}");
        }
    }

    #[test]
    fn fts_virtual_table_present() {
        let db = migrated_db();
        let found = count(
            &db,
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='episodes_fts'",
        );
        assert_eq!(found, 1, "episodes_fts virtual table missing");
    }

    #[test]
    fn pending_index_schema_matches_adr() {
        let db = migrated_db();
        // The pending_index schema is canonical per ADR-0003 §pending_index:
        // memory_id PK, embedding BLOB, embedding_dim INTEGER, enqueued_at INTEGER.
        // Only column presence is checked here.
        let names = column_names(&db, "PRAGMA table_info('pending_index')");
        for required in ["memory_id", "embedding", "embedding_dim", "enqueued_at"] {
            assert!(
                names.iter().any(|name| name == required),
                "pending_index missing column {required}"
            );
        }
    }

    #[test]
    fn fts_trigger_keeps_episodes_content_indexed() {
        let db = migrated_db();

        // One minimal episode whose content contains the search term.
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let memory_id = "00000000-0000-0000-0000-000000000001";
        db.execute(
            "INSERT INTO episodes (
                memory_id, ts_ms, source_type, content,
                encoding_context_json, confidence, strength, salience,
                tier, created_at_ms, updated_at_ms
             ) VALUES (?, ?, 'user_message', 'the rain in spain falls mainly on the plain',
                       '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
            params![memory_id, now_ms, now_ms, now_ms],
        )
        .unwrap();

        // The full-text index should now return exactly that row for 'spain'.
        let hits = count(
            &db,
            "SELECT COUNT(*) FROM episodes_fts WHERE episodes_fts MATCH 'spain'",
        );
        assert_eq!(hits, 1);
    }

    #[test]
    fn cascade_delete_removes_pending_index_row() {
        let db = migrated_db();
        db.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let memory_id = "00000000-0000-0000-0000-000000000042";

        // Parent episode plus a pending-index row that references it.
        db.execute(
            "INSERT INTO episodes (
                memory_id, ts_ms, source_type, content,
                encoding_context_json, confidence, strength, salience,
                tier, created_at_ms, updated_at_ms
             ) VALUES (?, ?, 'user_message', 'hello', '{}', 1.0, 0.5, 0.5, 'hot', ?, ?)",
            params![memory_id, now_ms, now_ms, now_ms],
        )
        .unwrap();
        db.execute(
            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
             VALUES (?, x'00', 1, ?)",
            params![memory_id, now_ms],
        )
        .unwrap();

        db.execute("DELETE FROM episodes WHERE memory_id = ?", params![memory_id])
            .unwrap();

        let leftover = count_for(
            &db,
            "SELECT COUNT(*) FROM pending_index WHERE memory_id = ?",
            memory_id,
        );
        assert_eq!(leftover, 0, "CASCADE should have removed the pending row");
    }
}