use rusqlite::{Connection, TransactionBehavior, params};
use solo_core::{Error, Result};
/// One schema migration: a version number, a human-readable description, and
/// the SQL batch that performs the change.
#[derive(Debug)]
struct Migration {
/// Written to `schema_migrations.version` when applied. `run_migrations`
/// skips any migration whose version is at or below the DB's current version.
version: u32,
/// Written verbatim to `schema_migrations.description` and logged on apply.
description: &'static str,
/// SQL executed via `execute_batch` inside the migration's transaction.
up: &'static str,
}
/// Every migration this binary knows about, in application order.
///
/// The SQL for each entry is embedded at compile time with `include_str!`.
/// `run_migrations` walks this slice front to back. It skips entries whose
/// version is at or below the DB's current version and reports the last one
/// it applied, so entries must be listed in strictly increasing version order.
/// Already-applied versions are skipped, so editing the SQL of a shipped
/// migration has no effect on existing databases. Append a new entry instead.
const MIGRATIONS: &[Migration] = &[
Migration {
version: 1,
description: "initial schema (v0): episodes + triples + steward outputs + pending_index + FTS",
up: include_str!("migrations/0001_initial.sql"),
},
Migration {
version: 2,
description: "triples.cluster_id FK + index for absorb→regen cascade",
up: include_str!("migrations/0002_triples_cluster_id.sql"),
},
Migration {
version: 3,
description: "documents + document_chunks + chunk_embeddings + pending_index.kind discriminator",
up: include_str!("migrations/0003_documents.sql"),
},
];
/// Brings the database schema up to the newest version in `MIGRATIONS`.
///
/// First ensures the `schema_migrations` bookkeeping table exists. Then it
/// applies, in slice order, every migration whose version is greater than the
/// version currently recorded, emitting a log line after each one.
///
/// Returns the version of the last migration applied by this call, or the
/// pre-existing version when nothing was pending.
///
/// # Errors
/// Returns a storage error if any of these fail:
/// - creating the bookkeeping table
/// - reading the current version
/// - applying any migration
///
/// Each migration commits on its own, so migrations applied before the
/// failing one stay applied.
pub fn run_migrations(conn: &mut Connection) -> Result<u32> {
    // Continuation lines are intentionally unindented: this literal is the exact
    // DDL text that has always been sent to SQLite.
    const BOOKKEEPING_DDL: &str = "CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
description TEXT NOT NULL,
applied_at INTEGER NOT NULL
);";

    conn.execute_batch(BOOKKEEPING_DDL)
        .map_err(|e| Error::storage(format!("create schema_migrations: {e}")))?;

    let baseline = current_version(conn)?;
    MIGRATIONS
        .iter()
        .filter(|migration| migration.version > baseline)
        .try_fold(baseline, |_, migration| -> Result<u32> {
            apply_one(conn, migration)?;
            tracing::info!(
                version = migration.version,
                description = migration.description,
                "applied migration"
            );
            Ok(migration.version)
        })
}
pub fn current_version(conn: &Connection) -> Result<u32> {
let v: Option<u32> = conn
.query_row(
"SELECT MAX(version) FROM schema_migrations",
[],
|row| row.get::<_, Option<u32>>(0),
)
.map_err(|e| Error::storage(format!("query current version: {e}")))?;
Ok(v.unwrap_or(0))
}
fn apply_one(conn: &mut Connection, m: &Migration) -> Result<()> {
let tx = conn
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for migration {}: {e}", m.version)))?;
tx.execute_batch(m.up)
.map_err(|e| Error::storage(format!("apply migration {}: {e}", m.version)))?;
let now_ms: i64 = chrono::Utc::now().timestamp_millis();
tx.execute(
"INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
params![m.version, m.description, now_ms],
)
.map_err(|e| Error::storage(format!("insert schema_migrations row {}: {e}", m.version)))?;
tx.commit()
.map_err(|e| Error::storage(format!("commit migration {}: {e}", m.version)))?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    fn open_in_memory() -> Connection {
        Connection::open_in_memory().expect("open in-memory DB")
    }

    /// Newest version declared in `MIGRATIONS`, i.e. where a fresh DB must end up.
    fn latest_version() -> u32 {
        MIGRATIONS.last().expect("MIGRATIONS must not be empty").version
    }

    /// Number of rows currently in `schema_migrations`.
    fn schema_migrations_rows(conn: &Connection) -> u32 {
        conn.query_row("SELECT COUNT(*) FROM schema_migrations", [], |row| row.get(0))
            .unwrap()
    }

    /// Column names of `table` in declaration order, read via `PRAGMA table_info`.
    fn column_names(conn: &Connection, table: &str) -> Vec<String> {
        let mut stmt = conn
            .prepare(&format!("PRAGMA table_info('{table}')"))
            .expect("prepare PRAGMA table_info");
        // Bound to a local so the row iterator, which borrows `stmt`, is dropped
        // before `stmt` itself.
        let names = stmt
            .query_map([], |row| row.get::<_, String>(1))
            .expect("run PRAGMA table_info")
            .collect::<rusqlite::Result<Vec<_>>>()
            .expect("read PRAGMA table_info row");
        names
    }

    /// Asserts that `res` failed with a SQLite constraint error.
    ///
    /// Accepted constraint kinds are CHECK, UNIQUE, NOT NULL, FK, or a trigger
    /// `RAISE`. A bare `is_err()` would also accept unrelated failures such as
    /// a misspelled column, letting a broken test pass silently.
    #[track_caller]
    fn assert_constraint_violation(res: rusqlite::Result<usize>, why: &str) {
        match res {
            Err(rusqlite::Error::SqliteFailure(err, _))
                if err.code == rusqlite::ErrorCode::ConstraintViolation => {}
            other => panic!("{why}: expected a SQLite constraint violation, got {other:?}"),
        }
    }

    #[test]
    fn migrations_are_strictly_increasing_from_one() {
        assert!(!MIGRATIONS.is_empty(), "MIGRATIONS must not be empty");
        // current_version() reports 0 for an empty DB and run_migrations skips
        // versions <= current, so a migration numbered 0 would never run.
        assert!(MIGRATIONS[0].version >= 1, "first migration must be version >= 1");
        // run_migrations applies in slice order and reports the last applied
        // version, so the slice must be sorted with no duplicates.
        for pair in MIGRATIONS.windows(2) {
            assert!(
                pair[0].version < pair[1].version,
                "MIGRATIONS out of order: {} then {}",
                pair[0].version,
                pair[1].version
            );
        }
    }

    #[test]
    fn empty_db_runs_all_migrations() {
        let mut conn = open_in_memory();
        let v = run_migrations(&mut conn).unwrap();
        assert_eq!(v, latest_version());
        assert_eq!(current_version(&conn).unwrap(), latest_version());
        assert_eq!(
            schema_migrations_rows(&conn),
            MIGRATIONS.len() as u32,
            "every migration must leave exactly one schema_migrations row"
        );
    }

    #[test]
    fn migration_0002_adds_triples_cluster_id_column() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let names = column_names(&conn, "triples");
        assert!(
            names.iter().any(|n| n == "cluster_id"),
            "triples missing cluster_id after 0002; got {names:?}"
        );
        let idx_exists: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master \
                 WHERE type='index' AND name='idx_triples_cluster'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(idx_exists, 1, "idx_triples_cluster missing after 0002");
    }

    #[test]
    fn migration_0002_cluster_delete_cascades_to_triples() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let cid = "00000000-0000-0000-0000-000000000077";
        let tid = "00000000-0000-0000-0000-000000000099";
        conn.execute(
            "INSERT INTO clusters (cluster_id, coherence, created_at_ms) VALUES (?, ?, ?)",
            params![cid, 0.9, now_ms],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO triples (
                triple_id, subject_id, predicate, object_id, object_kind,
                valid_from_ms, valid_to_ms, confidence, provenance_json,
                created_at_ms, updated_at_ms, cluster_id
            ) VALUES (?, 'subj', 'pred', 'obj', 'literal', ?, NULL, 0.9, '{}', ?, ?, ?)",
            params![tid, now_ms, now_ms, now_ms, cid],
        )
        .unwrap();
        let n_before: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM triples WHERE triple_id = ?",
                params![tid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_before, 1);
        conn.execute("DELETE FROM clusters WHERE cluster_id = ?", params![cid])
            .unwrap();
        let n_after: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM triples WHERE triple_id = ?",
                params![tid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_after, 0, "CASCADE on clusters should drop the triple");
    }

    #[test]
    fn second_run_is_a_noop() {
        let mut conn = open_in_memory();
        let v1 = run_migrations(&mut conn).unwrap();
        let rows_after_first = schema_migrations_rows(&conn);
        let v2 = run_migrations(&mut conn).unwrap();
        assert_eq!(v1, v2);
        assert_eq!(
            schema_migrations_rows(&conn),
            rows_after_first,
            "schema_migrations rows must not be inserted twice"
        );
    }

    #[test]
    fn all_canonical_tables_present() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let want = [
            "schema_migrations",
            "embedders",
            "episodes",
            "embeddings",
            "pending_index",
            "triples",
            "clusters",
            "cluster_episodes",
            "semantic_abstractions",
            "contradictions",
        ];
        for table in want {
            let exists: u32 = conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?",
                    params![table],
                    |row| row.get(0),
                )
                .unwrap();
            assert_eq!(exists, 1, "missing canonical table: {table}");
        }
    }

    #[test]
    fn fts_virtual_table_present() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        // Virtual tables are recorded in sqlite_master with type 'table'.
        let exists: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='episodes_fts'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(exists, 1, "episodes_fts virtual table missing");
    }

    #[test]
    fn pending_index_schema_matches_adr() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let names = column_names(&conn, "pending_index");
        for required in ["memory_id", "embedding", "embedding_dim", "enqueued_at"] {
            assert!(
                names.iter().any(|n| n == required),
                "pending_index missing column {required}; got {names:?}"
            );
        }
    }

    #[test]
    fn fts_trigger_keeps_episodes_content_indexed() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        conn.execute(
            "INSERT INTO episodes (
                memory_id, ts_ms, source_type, content,
                encoding_context_json, confidence, strength, salience,
                tier, created_at_ms, updated_at_ms
            ) VALUES (?, ?, 'user_message', 'the rain in spain falls mainly on the plain',
                '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
            params!["00000000-0000-0000-0000-000000000001", now_ms, now_ms, now_ms],
        )
        .unwrap();
        let hit: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM episodes_fts WHERE episodes_fts MATCH 'spain'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hit, 1);
    }

    #[test]
    fn cascade_delete_removes_pending_index_row() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let mid = "00000000-0000-0000-0000-000000000042";
        conn.execute(
            "INSERT INTO episodes (
                memory_id, ts_ms, source_type, content,
                encoding_context_json, confidence, strength, salience,
                tier, created_at_ms, updated_at_ms
            ) VALUES (?, ?, 'user_message', 'hello', '{}', 1.0, 0.5, 0.5, 'hot', ?, ?)",
            params![mid, now_ms, now_ms, now_ms],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
             VALUES (?, x'00', 1, ?)",
            params![mid, now_ms],
        )
        .unwrap();
        conn.execute("DELETE FROM episodes WHERE memory_id = ?", params![mid])
            .unwrap();
        let remaining: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM pending_index WHERE memory_id = ?",
                params![mid],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(remaining, 0, "CASCADE should have removed the pending row");
    }

    fn insert_test_document(conn: &Connection, doc_id: &str) {
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        conn.execute(
            "INSERT INTO documents (doc_id, source, mime_type, ingested_at_ms)
             VALUES (?, ?, ?, ?)",
            params![doc_id, "/tmp/test.md", "text/markdown", now_ms],
        )
        .unwrap();
    }

    /// Inserts one chunk of `doc_id` and returns its `(chunk_id, rowid)`.
    fn insert_test_chunk(conn: &Connection, doc_id: &str, idx: i64, content: &str) -> (String, i64) {
        let chunk_id = format!("00000000-0000-0000-0000-{:012x}", idx + 0x100);
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        conn.execute(
            "INSERT INTO document_chunks (
                chunk_id, doc_id, chunk_index, content, token_count,
                start_offset, end_offset, created_at_ms
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            params![
                chunk_id,
                doc_id,
                idx,
                content,
                content.split_whitespace().count() as i64,
                0i64,
                content.len() as i64,
                now_ms,
            ],
        )
        .unwrap();
        let rowid = conn.last_insert_rowid();
        (chunk_id, rowid)
    }

    #[test]
    fn migration_0003_creates_documents_and_chunks_tables() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        for table in ["documents", "document_chunks", "chunk_embeddings", "document_chunks_fts"] {
            // Virtual tables (the FTS one) are also recorded with type 'table'.
            let exists: u32 = conn
                .query_row(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?",
                    params![table],
                    |row| row.get(0),
                )
                .unwrap();
            assert_eq!(exists, 1, "missing table after 0003: {table}");
        }
    }

    #[test]
    fn migration_0003_pending_index_has_kind_column() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let names = column_names(&conn, "pending_index");
        for required in ["kind", "memory_id", "chunk_id", "embedding", "embedding_dim", "enqueued_at"] {
            assert!(
                names.iter().any(|n| n == required),
                "pending_index missing column {required} after 0003; got {names:?}"
            );
        }
    }

    #[test]
    fn migration_0003_backfills_existing_pending_rows_as_episode_kind() {
        let mut conn = open_in_memory();
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS schema_migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at INTEGER NOT NULL
            );",
        )
        .unwrap();
        // Stop just before 0003 so pre-existing rows can be seeded.
        for m in &MIGRATIONS[..2] {
            apply_one(&mut conn, m).unwrap();
        }
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let mid = "00000000-0000-0000-0000-0000000000aa";
        conn.execute(
            "INSERT INTO episodes (
                memory_id, ts_ms, source_type, content,
                encoding_context_json, confidence, strength, salience,
                tier, created_at_ms, updated_at_ms
            ) VALUES (?, ?, 'user_message', 'pre-0003 row', '{}', 1.0, 0.5, 0.5, 'hot', ?, ?)",
            params![mid, now_ms, now_ms, now_ms],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
             VALUES (?, x'00', 1, ?)",
            params![mid, now_ms],
        )
        .unwrap();
        run_migrations(&mut conn).unwrap();
        let (kind, mem_id, chunk_id): (String, Option<String>, Option<String>) = conn
            .query_row(
                "SELECT kind, memory_id, chunk_id FROM pending_index",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
            )
            .unwrap();
        assert_eq!(kind, "episode");
        assert_eq!(mem_id.as_deref(), Some(mid));
        assert!(chunk_id.is_none(), "back-filled row must have NULL chunk_id");
    }

    #[test]
    fn migration_0003_documents_cascade_drops_chunks() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        let doc = "00000000-0000-0000-0000-0000000000d1";
        insert_test_document(&conn, doc);
        for i in 0..3 {
            insert_test_chunk(&conn, doc, i, &format!("chunk {i}"));
        }
        let n_before: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM document_chunks WHERE doc_id = ?",
                params![doc],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_before, 3);
        conn.execute("DELETE FROM documents WHERE doc_id = ?", params![doc])
            .unwrap();
        let n_after: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM document_chunks WHERE doc_id = ?",
                params![doc],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_after, 0, "CASCADE on documents must drop chunks");
    }

    #[test]
    fn migration_0003_pending_index_kind_check_constraint_refuses_bogus_kind() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let res = conn.execute(
            "INSERT INTO pending_index (kind, memory_id, embedding, embedding_dim, enqueued_at)
             VALUES ('bogus', '00000000-0000-0000-0000-000000000001', x'00', 1, ?)",
            params![now_ms],
        );
        assert_constraint_violation(res, "kind='bogus' must violate CHECK constraint");
    }

    #[test]
    fn migration_0003_pending_index_xor_refuses_both_episode_and_chunk_set() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let res = conn.execute(
            "INSERT INTO pending_index (kind, memory_id, chunk_id, embedding, embedding_dim, enqueued_at)
             VALUES ('episode', '00000000-0000-0000-0000-000000000001', '00000000-0000-0000-0000-000000000002', x'00', 1, ?)",
            params![now_ms],
        );
        assert_constraint_violation(res, "memory_id AND chunk_id both NOT NULL must violate XOR");
    }

    #[test]
    fn migration_0003_chunk_fts_keeps_in_sync_on_insert() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let doc = "00000000-0000-0000-0000-0000000000d2";
        insert_test_document(&conn, doc);
        insert_test_chunk(&conn, doc, 0, "the rain in spain falls mainly on the plain");
        let hit: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM document_chunks_fts WHERE document_chunks_fts MATCH 'spain'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hit, 1, "FTS trigger must index the inserted chunk's content");
    }

    #[test]
    fn migration_0003_chunk_fts_keeps_in_sync_on_delete() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let doc = "00000000-0000-0000-0000-0000000000d3";
        insert_test_document(&conn, doc);
        let (chunk_id, _) = insert_test_chunk(&conn, doc, 0, "blackbirds singing in the dead of night");
        conn.execute("DELETE FROM document_chunks WHERE chunk_id = ?", params![chunk_id])
            .unwrap();
        let hit: u32 = conn
            .query_row(
                "SELECT COUNT(*) FROM document_chunks_fts WHERE document_chunks_fts MATCH 'blackbirds'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hit, 0, "FTS trigger must remove the chunk from the index after DELETE");
    }

    #[test]
    fn migration_0003_unique_doc_id_chunk_index_enforced() {
        let mut conn = open_in_memory();
        run_migrations(&mut conn).unwrap();
        let doc = "00000000-0000-0000-0000-0000000000d4";
        insert_test_document(&conn, doc);
        insert_test_chunk(&conn, doc, 0, "first");
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let res = conn.execute(
            "INSERT INTO document_chunks (
                chunk_id, doc_id, chunk_index, content, token_count,
                start_offset, end_offset, created_at_ms
            ) VALUES (?, ?, 0, 'duplicate', 1, 0, 9, ?)",
            params!["00000000-0000-0000-0000-00000000aaaa", doc, now_ms],
        );
        assert_constraint_violation(res, "(doc_id, chunk_index) must be UNIQUE");
    }
}