brainos-storage 0.3.0

SQLite and HNSW vector storage layer for Brain OS
Documentation
use tracing::info;

use super::{SqliteError, SqlitePool};

impl SqlitePool {
    /// Run all schema migrations.
    pub(crate) fn migrate(&self) -> Result<(), SqliteError> {
        self.with_conn(|conn| {
            conn.execute_batch(
                "CREATE TABLE IF NOT EXISTS _migrations (
                    version INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    applied_at TEXT NOT NULL DEFAULT (datetime('now'))
                );",
            )?;

            let current_version: i64 = conn
                .query_row(
                    "SELECT COALESCE(MAX(version), 0) FROM _migrations",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            let migrations = Self::migrations();

            for (version, name, sql) in &migrations {
                if *version > current_version {
                    info!("Running migration {version}: {name}");
                    conn.execute_batch(sql).map_err(|e| {
                        SqliteError::Migration(format!("Migration {version} ({name}) failed: {e}"))
                    })?;

                    conn.execute(
                        "INSERT INTO _migrations (version, name) VALUES (?1, ?2)",
                        rusqlite::params![version, name],
                    )?;
                }
            }

            if current_version < migrations.last().map_or(0, |m| m.0) {
                info!(
                    "Migrations complete (v{current_version} → v{})",
                    migrations.last().expect("BUG: migrations list is empty").0
                );
            }

            Ok(())
        })
    }

    /// All schema migrations in order.
    fn migrations() -> Vec<(i64, &'static str, &'static str)> {
        vec![
            (
                1,
                "create_sessions",
                "
                CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY,
                    started_at TEXT NOT NULL DEFAULT (datetime('now')),
                    ended_at TEXT,
                    channel TEXT NOT NULL DEFAULT 'cli',
                    metadata TEXT
                );
            ",
            ),
            (
                2,
                "create_episodes",
                "
                CREATE TABLE IF NOT EXISTS episodes (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL REFERENCES sessions(id),
                    role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')),
                    content TEXT NOT NULL,
                    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
                    importance REAL NOT NULL DEFAULT 0.5,
                    decay_rate REAL NOT NULL DEFAULT 0.1,
                    reinforcement_count INTEGER NOT NULL DEFAULT 0,
                    last_accessed TEXT,
                    metadata TEXT
                );

                CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id);
                CREATE INDEX IF NOT EXISTS idx_episodes_timestamp ON episodes(timestamp DESC);
                CREATE INDEX IF NOT EXISTS idx_episodes_importance ON episodes(importance DESC);
            ",
            ),
            (
                3,
                "create_episodes_fts",
                "
                CREATE VIRTUAL TABLE IF NOT EXISTS episodes_fts USING fts5(
                    content,
                    content_rowid='rowid',
                    tokenize='porter unicode61'
                );
            ",
            ),
            (
                4,
                "create_semantic_facts",
                "
                CREATE TABLE IF NOT EXISTS semantic_facts (
                    id TEXT PRIMARY KEY,
                    category TEXT NOT NULL,
                    subject TEXT NOT NULL,
                    predicate TEXT NOT NULL,
                    object TEXT NOT NULL,
                    confidence REAL NOT NULL DEFAULT 1.0,
                    source_episode_id TEXT REFERENCES episodes(id) ON DELETE SET NULL,
                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
                    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
                    superseded_by TEXT REFERENCES semantic_facts(id) ON DELETE SET NULL
                );

                CREATE INDEX IF NOT EXISTS idx_facts_category ON semantic_facts(category);
                CREATE INDEX IF NOT EXISTS idx_facts_subject ON semantic_facts(subject);
            ",
            ),
            (
                5,
                "create_user_profile",
                "
                CREATE TABLE IF NOT EXISTS user_profile (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    source TEXT,
                    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
                );
            ",
            ),
            (
                6,
                "create_procedures",
                "
                CREATE TABLE IF NOT EXISTS procedures (
                    id              TEXT PRIMARY KEY,
                    trigger_pattern TEXT NOT NULL,
                    steps_json      TEXT NOT NULL DEFAULT '[]',
                    created_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    updated_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    use_count       INTEGER NOT NULL DEFAULT 0
                );
                CREATE INDEX IF NOT EXISTS idx_procedures_trigger
                    ON procedures(trigger_pattern);
            ",
            ),
            (
                7,
                "create_audit_log",
                "
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
                    action TEXT NOT NULL,
                    details TEXT,
                    prev_hash TEXT,
                    hash TEXT NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
            ",
            ),
            (
                10,
                "add_namespace_to_semantic_facts",
                "
                ALTER TABLE semantic_facts ADD COLUMN namespace TEXT NOT NULL DEFAULT 'personal';
                CREATE INDEX IF NOT EXISTS idx_facts_namespace ON semantic_facts(namespace);
            ",
            ),
            (
                11,
                "add_namespace_to_episodes",
                "
                ALTER TABLE episodes ADD COLUMN namespace TEXT NOT NULL DEFAULT 'personal';
                CREATE INDEX IF NOT EXISTS idx_episodes_namespace ON episodes(namespace);
            ",
            ),
            (
                12,
                "rebuild_procedures_table",
                "
                DROP TABLE IF EXISTS procedures;
                CREATE TABLE IF NOT EXISTS procedures (
                    id              TEXT PRIMARY KEY,
                    trigger_pattern TEXT NOT NULL,
                    steps_json      TEXT NOT NULL DEFAULT '[]',
                    created_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    updated_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    use_count       INTEGER NOT NULL DEFAULT 0
                );
                CREATE INDEX IF NOT EXISTS idx_procedures_trigger
                    ON procedures(trigger_pattern);
            ",
            ),
            (
                13,
                "create_episode_promotions",
                "
                CREATE TABLE IF NOT EXISTS episode_promotions (
                    episode_id TEXT PRIMARY KEY REFERENCES episodes(id) ON DELETE CASCADE,
                    fact_id TEXT NOT NULL REFERENCES semantic_facts(id) ON DELETE CASCADE,
                    promoted_at TEXT NOT NULL DEFAULT (datetime('now'))
                );
            ",
            ),
            (
                14,
                "create_scheduled_intents",
                "
                CREATE TABLE IF NOT EXISTS scheduled_intents (
                    id TEXT PRIMARY KEY,
                    description TEXT NOT NULL,
                    cron TEXT,
                    namespace TEXT NOT NULL DEFAULT 'personal',
                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
                    status TEXT NOT NULL DEFAULT 'scheduled',
                    metadata TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_scheduled_intents_namespace
                    ON scheduled_intents(namespace);
                CREATE INDEX IF NOT EXISTS idx_scheduled_intents_status
                    ON scheduled_intents(status);
            ",
            ),
            (
                15,
                "create_notification_outbox",
                "
                CREATE TABLE IF NOT EXISTS notification_outbox (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    priority INTEGER NOT NULL DEFAULT 1,
                    triggered_by TEXT NOT NULL DEFAULT '',
                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
                    delivered_at TEXT,
                    channel TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_outbox_pending
                    ON notification_outbox(delivered_at, priority, created_at)
                    WHERE delivered_at IS NULL;
            ",
            ),
            (
                16,
                "add_agent_column",
                "
                ALTER TABLE episodes ADD COLUMN agent TEXT;
                ALTER TABLE semantic_facts ADD COLUMN agent TEXT;
            ",
            ),
            (
                17,
                "fix_orphaned_facts",
                "
                -- Clear orphaned source_episode_id references
                UPDATE semantic_facts SET source_episode_id = NULL
                WHERE source_episode_id NOT IN (SELECT id FROM episodes);
                -- Clear orphaned superseded_by references
                UPDATE semantic_facts SET superseded_by = NULL
                WHERE superseded_by NOT IN (SELECT id FROM semantic_facts);
            ",
            ),
            (
                18,
                "add_performance_indexes",
                "
                -- Composite index for open-loop and habit detection
                -- (filters by role = 'user' AND timestamp >= ?)
                CREATE INDEX IF NOT EXISTS idx_episodes_role_timestamp
                    ON episodes(role, timestamp);

                -- Partial index for active (non-superseded) facts
                -- (count, list_all, list_by_namespace all filter superseded_by IS NULL)
                CREATE INDEX IF NOT EXISTS idx_facts_active
                    ON semantic_facts(superseded_by)
                    WHERE superseded_by IS NULL;
            ",
            ),
        ]
    }

    /// Get the current schema version.
    pub fn schema_version(&self) -> Result<i64, SqliteError> {
        self.with_conn(|conn| {
            let version: i64 = conn
                .query_row(
                    "SELECT COALESCE(MAX(version), 0) FROM _migrations",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(0);
            Ok(version)
        })
    }
}