use rusqlite::{params, OptionalExtension};
use crate::{Pool, StoreResult};
const MIGRATIONS: &[(&str, &str)] = &[
("001_init", crate::INITIAL_MIGRATION_SQL),
(
"002_authority_timeline",
include_str!("../migrations/002_authority_timeline.sql"),
),
(
"003_schema_v2_expand",
include_str!("../migrations/003_schema_v2_expand.sql"),
),
(
"004_principle_promotion_policy_record",
include_str!("../migrations/004_principle_promotion_policy_record.sql"),
),
(
"005_outcome_relation_scope",
include_str!("../migrations/005_outcome_relation_scope.sql"),
),
(
"006_fts5_memories",
include_str!("../migrations/006_fts5_memories.sql"),
),
(
"007_embeddings",
include_str!("../migrations/007_embeddings.sql"),
),
(
"008_decay_jobs",
include_str!("../migrations/008_decay_jobs.sql"),
),
(
"009_decay_supersessions",
include_str!("../migrations/009_decay_supersessions.sql"),
),
(
"010_pending_mcp_commit",
include_str!("../migrations/010_pending_mcp_commit.sql"),
),
];
const KNOWN_MIGRATION_NAMES: &[&str] = &[
"001_init",
"002_authority_timeline",
"003_schema_v2_expand",
"004_principle_promotion_policy_record",
"005_outcome_relation_scope",
"006_fts5_memories",
"007_embeddings",
"008_decay_jobs",
"009_decay_supersessions",
"010_pending_mcp_commit",
];
#[must_use]
pub fn known_migration_names() -> &'static [&'static str] {
KNOWN_MIGRATION_NAMES
}
pub fn apply_pending(pool: &Pool) -> StoreResult<usize> {
pool.execute_batch(
"PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS _migrations (
name TEXT PRIMARY KEY,
applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);",
)?;
let mut applied = 0;
for (name, sql) in MIGRATIONS {
let existing: Option<String> = pool
.query_row(
"SELECT name FROM _migrations WHERE name = ?1;",
params![name],
|row| row.get(0),
)
.optional()?;
if existing.is_some() {
continue;
}
if *name == "003_schema_v2_expand" {
apply_003_schema_v2_expand_guarded(pool)?;
} else {
pool.execute_batch(sql)?;
}
pool.execute("INSERT INTO _migrations (name) VALUES (?1);", params![name])?;
applied += 1;
}
Ok(applied)
}
fn apply_003_schema_v2_expand_guarded(pool: &Pool) -> StoreResult<()> {
add_column_if_missing(
pool,
"events",
"source_attestation_json",
"ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
)?;
add_column_if_missing(
pool,
"episodes",
"summary_spans_json",
"ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
)?;
add_column_if_missing(
pool,
"memories",
"summary_spans_json",
"ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
)?;
add_column_if_missing(
pool,
"memories",
"cross_session_use_count",
"ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
)?;
add_column_if_missing(
pool,
"memories",
"first_used_at",
"ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
)?;
add_column_if_missing(
pool,
"memories",
"last_cross_session_use_at",
"ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
)?;
add_column_if_missing(
pool,
"memories",
"last_validation_at",
"ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
)?;
add_column_if_missing(
pool,
"memories",
"validation_epoch",
"ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
)?;
add_column_if_missing(
pool,
"memories",
"blessed_until",
"ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
)?;
add_column_if_missing(
pool,
"context_packs",
"consumer_advisory_json",
"ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
)?;
pool.execute_batch(
"CREATE TABLE IF NOT EXISTS memory_session_uses (
memory_id TEXT NOT NULL REFERENCES memories(id),
session_id TEXT NOT NULL,
first_used_at TEXT NOT NULL,
last_used_at TEXT NOT NULL,
use_count INTEGER NOT NULL CHECK (use_count >= 0),
PRIMARY KEY (memory_id, session_id)
);
CREATE TABLE IF NOT EXISTS outcome_memory_relations (
outcome_ref TEXT NOT NULL,
memory_id TEXT NOT NULL REFERENCES memories(id),
relation TEXT NOT NULL,
recorded_at TEXT NOT NULL,
source_event_id TEXT NULL REFERENCES events(id),
PRIMARY KEY (outcome_ref, memory_id, relation)
);",
)?;
Ok(())
}
fn column_exists(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
let sql = format!("PRAGMA table_info({table});");
let mut stmt = pool.prepare(&sql)?;
let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
for found in columns {
if found? == column {
return Ok(true);
}
}
Ok(false)
}
fn add_column_if_missing(pool: &Pool, table: &str, column: &str, ddl: &str) -> StoreResult<()> {
if column_exists(pool, table, column)? {
return Ok(());
}
pool.execute_batch(ddl)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn known_migration_names_match_migration_bundle() {
let bundled_names = MIGRATIONS.iter().map(|(name, _)| *name).collect::<Vec<_>>();
assert_eq!(known_migration_names(), bundled_names.as_slice());
}
#[test]
fn apply_pending_is_idempotent_when_003_columns_pre_exist() {
use rusqlite::Connection;
let pool = Connection::open_in_memory().expect("open in-memory sqlite");
pool.execute_batch(crate::INITIAL_MIGRATION_SQL)
.expect("init schema");
pool.execute_batch(
"CREATE TABLE IF NOT EXISTS _migrations (
name TEXT PRIMARY KEY,
applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
INSERT OR IGNORE INTO _migrations (name) VALUES ('001_init');",
)
.expect("bootstrap _migrations");
pool.execute_batch(
"ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
)
.expect("pre-add source_attestation_json");
apply_pending(&pool).expect("apply_pending must be idempotent when columns pre-exist");
let names: Vec<String> = pool
.prepare("SELECT name FROM _migrations ORDER BY name;")
.unwrap()
.query_map([], |row| row.get(0))
.unwrap()
.collect::<Result<_, _>>()
.unwrap();
assert!(
names.contains(&"003_schema_v2_expand".to_string()),
"003_schema_v2_expand must be recorded after guarded apply"
);
}
#[test]
fn apply_pending_second_run_is_noop() {
use rusqlite::Connection;
let pool = Connection::open_in_memory().expect("open in-memory sqlite");
let first = apply_pending(&pool).expect("first apply_pending");
assert!(first > 0, "first apply should apply migrations");
let second = apply_pending(&pool).expect("second apply_pending");
assert_eq!(second, 0, "second apply_pending must be a no-op");
}
}