crtx-store 0.1.0

SQLite persistence layer: migrations, repositories, and transactions.

Module documentation:
//! SQLite migration application and tracking.

use rusqlite::{params, OptionalExtension};

use crate::{Pool, StoreResult};

/// Bundled migrations as `(name, SQL)` pairs, in application order.
///
/// `001_init` ships inline via `crate::INITIAL_MIGRATION_SQL`; the rest are
/// embedded at compile time from `../migrations/`. `apply_pending` runs these
/// top to bottom, skipping any name already recorded in `_migrations`.
const MIGRATIONS: &[(&str, &str)] = &[
    ("001_init", crate::INITIAL_MIGRATION_SQL),
    (
        "002_authority_timeline",
        include_str!("../migrations/002_authority_timeline.sql"),
    ),
    (
        "003_schema_v2_expand",
        include_str!("../migrations/003_schema_v2_expand.sql"),
    ),
    (
        "004_principle_promotion_policy_record",
        include_str!("../migrations/004_principle_promotion_policy_record.sql"),
    ),
    (
        "005_outcome_relation_scope",
        include_str!("../migrations/005_outcome_relation_scope.sql"),
    ),
    (
        "006_fts5_memories",
        include_str!("../migrations/006_fts5_memories.sql"),
    ),
    (
        "007_embeddings",
        include_str!("../migrations/007_embeddings.sql"),
    ),
    (
        "008_decay_jobs",
        include_str!("../migrations/008_decay_jobs.sql"),
    ),
    (
        "009_decay_supersessions",
        include_str!("../migrations/009_decay_supersessions.sql"),
    ),
    (
        "010_pending_mcp_commit",
        include_str!("../migrations/010_pending_mcp_commit.sql"),
    ),
];

/// Migration names exposed via [`known_migration_names`].
///
/// Must stay in sync with the names in [`MIGRATIONS`]; the
/// `known_migration_names_match_migration_bundle` test enforces this.
const KNOWN_MIGRATION_NAMES: &[&str] = &[
    "001_init",
    "002_authority_timeline",
    "003_schema_v2_expand",
    "004_principle_promotion_policy_record",
    "005_outcome_relation_scope",
    "006_fts5_memories",
    "007_embeddings",
    "008_decay_jobs",
    "009_decay_supersessions",
    "010_pending_mcp_commit",
];

/// Names of migrations known to this binary, in application order.
///
/// Mirrors the name half of [`MIGRATIONS`]; the
/// `known_migration_names_match_migration_bundle` test keeps the two in sync.
#[must_use]
pub fn known_migration_names() -> &'static [&'static str] {
    KNOWN_MIGRATION_NAMES
}

/// Applies migrations that do not already exist in `_migrations`.
///
/// Returns the number of migrations applied during this call.
///
/// # Idempotency note for `003_schema_v2_expand`
///
/// Migration `003_schema_v2_expand` adds nullable v2 columns via `ALTER TABLE`,
/// and SQLite has no `ADD COLUMN IF NOT EXISTS` syntax. Stores created with an
/// older binary may already carry some or all of these columns (added by
/// `cortex_store::migrate_v2::apply_expand_backfill_skeleton`). To avoid a
/// "duplicate column name" error, this function special-cases
/// `003_schema_v2_expand` and gates each `ALTER TABLE ADD COLUMN` on a
/// `PRAGMA table_info` probe.
pub fn apply_pending(pool: &Pool) -> StoreResult<usize> {
    // Bootstrap: enforce foreign keys, switch to WAL, and create the tracking
    // table before consulting it.
    pool.execute_batch(
        "PRAGMA foreign_keys = ON;
         PRAGMA journal_mode = WAL;
         CREATE TABLE IF NOT EXISTS _migrations (
             name TEXT PRIMARY KEY,
             applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
         );",
    )?;

    let mut applied_count = 0;
    for (name, sql) in MIGRATIONS {
        let already_recorded = pool
            .query_row(
                "SELECT name FROM _migrations WHERE name = ?1;",
                params![name],
                |row| row.get::<_, String>(0),
            )
            .optional()?
            .is_some();

        if already_recorded {
            continue;
        }

        // 003 needs per-column guards (see the doc comment above); every other
        // migration runs its bundled SQL verbatim.
        match *name {
            "003_schema_v2_expand" => apply_003_schema_v2_expand_guarded(pool)?,
            _ => pool.execute_batch(sql)?,
        }
        pool.execute("INSERT INTO _migrations (name) VALUES (?1);", params![name])?;
        applied_count += 1;
    }

    Ok(applied_count)
}

/// Apply `003_schema_v2_expand` with per-column idempotency guards.
///
/// `ALTER TABLE … ADD COLUMN` fails with "duplicate column name" when the
/// column already exists, and SQLite has no `ADD COLUMN IF NOT EXISTS`
/// syntax. Every column addition is therefore gated on a `PRAGMA table_info`
/// probe, so stores where `apply_expand_backfill_skeleton` ran before this
/// migration was bundled into `apply_pending` are not broken.
///
/// The two trailing `CREATE TABLE` statements use `CREATE TABLE IF NOT EXISTS`
/// and are safe to run unconditionally.
fn apply_003_schema_v2_expand_guarded(pool: &Pool) -> StoreResult<()> {
    // Every ALTER TABLE from the bundled migration as (table, column, DDL),
    // in the same order the migration applies them.
    const GUARDED_COLUMNS: &[(&str, &str, &str)] = &[
        (
            "events",
            "source_attestation_json",
            "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
             CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
        ),
        (
            "episodes",
            "summary_spans_json",
            "ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
             CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
        ),
        (
            "memories",
            "summary_spans_json",
            "ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
             CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
        ),
        (
            "memories",
            "cross_session_use_count",
            "ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
             CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
        ),
        (
            "memories",
            "first_used_at",
            "ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
        ),
        (
            "memories",
            "last_cross_session_use_at",
            "ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
        ),
        (
            "memories",
            "last_validation_at",
            "ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
        ),
        (
            "memories",
            "validation_epoch",
            "ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
             CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
        ),
        (
            "memories",
            "blessed_until",
            "ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
        ),
        (
            "context_packs",
            "consumer_advisory_json",
            "ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
             CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
        ),
    ];

    for (table, column, ddl) in GUARDED_COLUMNS {
        add_column_if_missing(pool, table, column, ddl)?;
    }

    // These tables use CREATE TABLE IF NOT EXISTS, so running them on stores
    // where apply_expand_backfill_skeleton already created them is harmless.
    pool.execute_batch(
        "CREATE TABLE IF NOT EXISTS memory_session_uses (
            memory_id TEXT NOT NULL REFERENCES memories(id),
            session_id TEXT NOT NULL,
            first_used_at TEXT NOT NULL,
            last_used_at TEXT NOT NULL,
            use_count INTEGER NOT NULL CHECK (use_count >= 0),
            PRIMARY KEY (memory_id, session_id)
        );
        CREATE TABLE IF NOT EXISTS outcome_memory_relations (
            outcome_ref TEXT NOT NULL,
            memory_id TEXT NOT NULL REFERENCES memories(id),
            relation TEXT NOT NULL,
            recorded_at TEXT NOT NULL,
            source_event_id TEXT NULL REFERENCES events(id),
            PRIMARY KEY (outcome_ref, memory_id, relation)
        );",
    )?;

    Ok(())
}

/// Reports whether `table` already has a column named `column`.
///
/// Probes with `PRAGMA table_info`, which yields one row per column; the
/// column name is at result index 1.
fn column_exists(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
    let pragma_sql = format!("PRAGMA table_info({table});");
    let mut statement = pool.prepare(&pragma_sql)?;
    let names = statement.query_map([], |row| row.get::<_, String>(1))?;
    for name in names {
        match name {
            Ok(found) if found == column => return Ok(true),
            Ok(_) => {}
            Err(error) => return Err(error.into()),
        }
    }
    Ok(false)
}

/// Executes `ddl` only when `table` lacks `column`; otherwise a no-op.
fn add_column_if_missing(pool: &Pool, table: &str, column: &str, ddl: &str) -> StoreResult<()> {
    if !column_exists(pool, table, column)? {
        pool.execute_batch(ddl)?;
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    // Drift guard: a migration added to MIGRATIONS but forgotten in
    // KNOWN_MIGRATION_NAMES (or vice versa) fails this assertion.
    #[test]
    fn known_migration_names_match_migration_bundle() {
        let bundled_names = MIGRATIONS.iter().map(|(name, _)| *name).collect::<Vec<_>>();

        assert_eq!(known_migration_names(), bundled_names.as_slice());
    }

    /// Reproduces the duplicate-column bug: if a store already has
    /// `source_attestation_json` on `events` (added by
    /// `apply_expand_backfill_skeleton` before `003_schema_v2_expand` was
    /// bundled), `apply_pending` must not error with "duplicate column name".
    #[test]
    fn apply_pending_is_idempotent_when_003_columns_pre_exist() {
        use rusqlite::Connection;

        let pool = Connection::open_in_memory().expect("open in-memory sqlite");

        // Bootstrap the schema manually as 001_init does.
        pool.execute_batch(crate::INITIAL_MIGRATION_SQL)
            .expect("init schema");

        // Simulate apply_expand_backfill_skeleton: add the columns that
        // 003_schema_v2_expand would normally add, without recording the migration.
        // Recording 001_init keeps apply_pending from re-running it below.
        pool.execute_batch(
            "CREATE TABLE IF NOT EXISTS _migrations (
                name TEXT PRIMARY KEY,
                applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
             );
             INSERT OR IGNORE INTO _migrations (name) VALUES ('001_init');",
        )
        .expect("bootstrap _migrations");

        // Pre-add the column that triggers the duplicate-column error.
        pool.execute_batch(
            "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
             CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
        )
        .expect("pre-add source_attestation_json");

        // apply_pending must not error even though source_attestation_json exists.
        apply_pending(&pool).expect("apply_pending must be idempotent when columns pre-exist");

        // All migrations should now be recorded.
        let names: Vec<String> = pool
            .prepare("SELECT name FROM _migrations ORDER BY name;")
            .unwrap()
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert!(
            names.contains(&"003_schema_v2_expand".to_string()),
            "003_schema_v2_expand must be recorded after guarded apply"
        );
    }

    /// Running apply_pending a second time on a fully-migrated store must be a
    /// no-op (returns 0 applied).
    #[test]
    fn apply_pending_second_run_is_noop() {
        use rusqlite::Connection;

        let pool = Connection::open_in_memory().expect("open in-memory sqlite");
        let first = apply_pending(&pool).expect("first apply_pending");
        assert!(first > 0, "first apply should apply migrations");
        let second = apply_pending(&pool).expect("second apply_pending");
        assert_eq!(second, 0, "second apply_pending must be a no-op");
    }
}
}