use rusqlite::{Connection, TransactionBehavior, params};
use solo_core::{Error, Result};
/// A single schema change known to this binary.
///
/// Migrations are forward-only: there is no rollback script, only the SQL
/// needed to move the schema to `version`.
#[derive(Debug)]
struct Migration {
    /// Schema version reached once this migration is applied; written to
    /// `schema_migrations.version`.
    version: u32,
    /// Human-readable summary, stored in `schema_migrations.description`
    /// and emitted in the "applied migration" log line.
    description: &'static str,
    /// SQL batch executed (via `execute_batch`) to apply the migration.
    up: &'static str,
}
/// Every schema migration known to this binary, in application order.
///
/// Invariant: versions are strictly increasing and start above 0.
/// `run_migrations` skips every entry whose version is `<=` the database's
/// current version (0 for a fresh database) and reports the last processed
/// entry's version, so a zero or out-of-order version would be silently
/// skipped. The `const` guard below enforces this at compile time.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        description: "initial schema (v0): episodes + triples + steward outputs + pending_index + FTS",
        up: include_str!("migrations/0001_initial.sql"),
    },
    Migration {
        version: 2,
        description: "triples.cluster_id FK + index for absorb→regen cascade",
        up: include_str!("migrations/0002_triples_cluster_id.sql"),
    },
];

// Compile-time guard for the invariant documented on `MIGRATIONS`: the build
// fails if a migration is appended with a duplicate, lower, or zero version.
const _: () = {
    assert!(
        MIGRATIONS.is_empty() || MIGRATIONS[0].version > 0,
        "MIGRATIONS: version 0 is never applied (fresh databases start at 0)"
    );
    let mut i = 1;
    while i < MIGRATIONS.len() {
        assert!(
            MIGRATIONS[i - 1].version < MIGRATIONS[i].version,
            "MIGRATIONS must be ordered by strictly increasing version"
        );
        i += 1;
    }
};
pub fn run_migrations(conn: &mut Connection) -> Result<u32> {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
description TEXT NOT NULL,
applied_at INTEGER NOT NULL
);",
)
.map_err(|e| Error::storage(format!("create schema_migrations: {e}")))?;
let current = current_version(conn)?;
let mut highest = current;
for m in MIGRATIONS {
if m.version <= current {
continue;
}
apply_one(conn, m)?;
highest = m.version;
tracing::info!(
version = m.version,
description = m.description,
"applied migration"
);
}
Ok(highest)
}
pub fn current_version(conn: &Connection) -> Result<u32> {
let v: Option<u32> = conn
.query_row(
"SELECT MAX(version) FROM schema_migrations",
[],
|row| row.get::<_, Option<u32>>(0),
)
.map_err(|e| Error::storage(format!("query current version: {e}")))?;
Ok(v.unwrap_or(0))
}
fn apply_one(conn: &mut Connection, m: &Migration) -> Result<()> {
let tx = conn
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for migration {}: {e}", m.version)))?;
tx.execute_batch(m.up)
.map_err(|e| Error::storage(format!("apply migration {}: {e}", m.version)))?;
let now_ms: i64 = chrono::Utc::now().timestamp_millis();
tx.execute(
"INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
params![m.version, m.description, now_ms],
)
.map_err(|e| Error::storage(format!("insert schema_migrations row {}: {e}", m.version)))?;
tx.commit()
.map_err(|e| Error::storage(format!("commit migration {}: {e}", m.version)))?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh, private in-memory SQLite database for one test.
    fn open_in_memory() -> Connection {
        Connection::open_in_memory().expect("open in-memory DB")
    }

    /// Version of the newest registered migration, i.e. what a fully migrated
    /// database should report. Derived from `MIGRATIONS` so appending a
    /// migration does not require editing these tests.
    fn latest_version() -> u32 {
        MIGRATIONS.last().map_or(0, |m| m.version)
    }

    #[test]
    fn migration_versions_are_positive_and_strictly_increasing() {
        // run_migrations skips entries whose version is <= the current
        // version (0 on a fresh DB), so a zero or out-of-order version would
        // silently never be applied.
        let first = MIGRATIONS.first().expect("no migrations registered");
        assert!(first.version > 0, "version 0 is never applied: {first:?}");
        for pair in MIGRATIONS.windows(2) {
            assert!(
                pair[0].version < pair[1].version,
                "MIGRATIONS out of order: {:?} then {:?}",
                pair[0],
                pair[1]
            );
        }
    }

    #[test]
    fn empty_db_runs_all_migrations() {
        let mut conn = open_in_memory();
        let v = run_migrations(&mut conn).unwrap();
        assert_eq!(v, latest_version());
        assert_eq!(current_version(&conn).unwrap(), latest_version());
    }

    #[test]
    fn migration_0002_adds_triples_cluster_id_column() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let cols: Vec<(String, String)> = conn
            .prepare("PRAGMA table_info('triples')")
            .unwrap()
            .query_map([], |row| {
                Ok((row.get::<_, String>(1)?, row.get::<_, String>(2)?))
            })
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        let names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
        assert!(
            names.contains(&"cluster_id"),
            "triples missing cluster_id after 0002; got {names:?}"
        );
        let idx_exists: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master \
                 WHERE type='index' AND name='idx_triples_cluster'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(idx_exists, 1, "idx_triples_cluster missing after 0002");
    }

    #[test]
    fn migration_0002_cluster_delete_cascades_to_triples() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let cid = "00000000-0000-0000-0000-000000000077";
        let tid = "00000000-0000-0000-0000-000000000099";
        conn.execute(
            "INSERT INTO clusters (cluster_id, coherence, created_at_ms) VALUES (?, ?, ?)",
            params![cid, 0.9, now_ms],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO triples (
triple_id, subject_id, predicate, object_id, object_kind,
valid_from_ms, valid_to_ms, confidence, provenance_json,
created_at_ms, updated_at_ms, cluster_id
) VALUES (?, 'subj', 'pred', 'obj', 'literal', ?, NULL, 0.9, '{}', ?, ?, ?)",
            params![tid, now_ms, now_ms, now_ms, cid],
        )
        .unwrap();
        let n_before: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM triples WHERE triple_id = ?",
                params![tid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_before, 1);
        conn.execute("DELETE FROM clusters WHERE cluster_id = ?", params![cid])
            .unwrap();
        let n_after: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM triples WHERE triple_id = ?",
                params![tid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_after, 0, "CASCADE on clusters should drop the triple");
    }

    #[test]
    fn second_run_is_a_noop() {
        let mut conn = open_in_memory();
        let v1 = run_migrations(&mut conn).unwrap();
        let v2 = run_migrations(&mut conn).unwrap();
        assert_eq!(v1, v2);
        let count: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM schema_migrations WHERE version = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1, "schema_migrations row must not be inserted twice");
        // Every registered migration is recorded exactly once, not just v1.
        let total: u32 = conn
            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |row| row.get(0))
            .unwrap();
        let expected = u32::try_from(MIGRATIONS.len()).expect("migration count fits in u32");
        assert_eq!(total, expected, "schema_migrations should hold one row per migration");
    }

    #[test]
    fn all_canonical_tables_present() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let want = [
            "schema_migrations",
            "embedders",
            "episodes",
            "embeddings",
            "pending_index",
            "triples",
            "clusters",
            "cluster_episodes",
            "semantic_abstractions",
            "contradictions",
        ];
        for table in want {
            let exists: u32 = conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?",
                    params![table],
                    |row| row.get(0),
                )
                .unwrap();
            assert_eq!(exists, 1, "missing canonical table: {table}");
        }
    }

    #[test]
    fn fts_virtual_table_present() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let exists: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='episodes_fts'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(exists, 1, "episodes_fts virtual table missing");
    }

    #[test]
    fn pending_index_schema_matches_adr() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let cols: Vec<(String, String)> = conn
            .prepare("PRAGMA table_info('pending_index')")
            .unwrap()
            .query_map([], |row| {
                Ok((row.get::<_, String>(1)?, row.get::<_, String>(2)?))
            })
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        let names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
        for required in ["memory_id", "embedding", "embedding_dim", "enqueued_at"] {
            assert!(names.contains(&required), "pending_index missing column {required}");
        }
    }

    #[test]
    fn fts_trigger_keeps_episodes_content_indexed() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        conn.execute(
            "INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms
) VALUES (?, ?, 'user_message', 'the rain in spain falls mainly on the plain',
'{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
            params!["00000000-0000-0000-0000-000000000001", now_ms, now_ms, now_ms],
        )
        .unwrap();
        let hit: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM episodes_fts WHERE episodes_fts MATCH 'spain'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hit, 1);
    }

    #[test]
    fn cascade_delete_removes_pending_index_row() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let mid = "00000000-0000-0000-0000-000000000042";
        conn.execute(
            "INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms
) VALUES (?, ?, 'user_message', 'hello', '{}', 1.0, 0.5, 0.5, 'hot', ?, ?)",
            params![mid, now_ms, now_ms, now_ms],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
VALUES (?, x'00', 1, ?)",
            params![mid, now_ms],
        )
        .unwrap();
        conn.execute("DELETE FROM episodes WHERE memory_id = ?", params![mid])
            .unwrap();
        let remaining: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM pending_index WHERE memory_id = ?",
                params![mid],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(remaining, 0, "CASCADE should have removed the pending row");
    }
}