use rusqlite::Connection;
use crate::error::SqliteError;
/// One migration step inside a [`ServiceSchemaPlan`].
pub struct Migration {
/// Identifier written to `_schema_versions.migration_id` once the step has run.
pub id: &'static str,
/// SQL batch that `apply_schema_plan` executes to apply the step.
pub up_sql: &'static str,
/// Optional SQL for undoing the step; `apply_schema_plan` never executes it.
pub down_sql: Option<&'static str>,
/// Optional probe against the live connection. When it returns `true`,
/// `apply_schema_plan` skips the step without recording it.
pub is_already_applied: Option<fn(&Connection) -> bool>,
}
/// The ordered migrations belonging to one service, grouped by backend.
pub struct ServiceSchemaPlan {
/// Service name; forms the `_schema_versions` primary key together with the migration id.
pub service: &'static str,
/// Migrations that `apply_schema_plan` runs, in slice order, against a SQLite connection.
pub sqlite: &'static [Migration],
/// Migrations presumably intended for a Postgres backend; `apply_schema_plan` does not read them.
pub postgres: &'static [Migration],
}
/// DDL for the `_schema_versions` bookkeeping table used by `apply_schema_plan`.
///
/// One row per `(service, migration_id)` pair; `IF NOT EXISTS` makes the batch
/// safe to execute on every call.
const SCHEMA_VERSION_TABLE: &str = "\
CREATE TABLE IF NOT EXISTS _schema_versions (\
service TEXT NOT NULL,\
migration_id TEXT NOT NULL,\
applied_at INTEGER NOT NULL,\
PRIMARY KEY (service, migration_id)\
);\
";
pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
conn.execute_batch(SCHEMA_VERSION_TABLE)?;
for migration in plan.sqlite {
if let Some(check) = migration.is_already_applied {
if check(conn) {
continue;
}
}
let already: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
rusqlite::params![plan.service, migration.id],
|row| row.get(0),
)?;
if already {
continue;
}
conn.execute_batch(migration.up_sql)?;
conn.execute(
"INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![
plan.service,
migration.id,
chrono::Utc::now().timestamp_micros(),
],
)?;
}
Ok(())
}
/// One entry of the linear, version-numbered migration list ([`MIGRATIONS`]).
///
/// The type is plain `'static` data, so it derives the cheap value traits:
/// `Debug` for diagnostics, `Clone`/`Copy` so entries can be passed by value,
/// and `PartialEq`/`Eq` so they can be compared in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VersionedMigration {
    /// Version number of the migration; `run_migrations` requires the list to
    /// be contiguous starting at 1.
    pub version: u32,
    /// Human-readable name stored in `_schema_migrations.name`.
    pub name: &'static str,
    /// SQL batch that applies the migration. For versions 13 and 14 this is a
    /// placeholder that `run_migrations` replaces with SQL built at runtime.
    pub up: &'static str,
}
/// V1: initial schema.
///
/// Creates the `entities`, `graph_edges`, `notes` and `events` tables together
/// with their lookup indexes. Every statement uses `IF NOT EXISTS`, so the
/// batch does not fail when some of the objects are already present.
const V1_UP: &str = "\
CREATE TABLE IF NOT EXISTS entities (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
name TEXT NOT NULL,\
description TEXT,\
properties TEXT,\
tags TEXT NOT NULL DEFAULT '[]',\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
CREATE TABLE IF NOT EXISTS graph_edges (\
namespace TEXT NOT NULL,\
id TEXT NOT NULL,\
source_id TEXT NOT NULL,\
target_id TEXT NOT NULL,\
relation TEXT NOT NULL,\
weight REAL NOT NULL DEFAULT 1.0,\
created_at INTEGER NOT NULL,\
metadata TEXT,\
PRIMARY KEY (namespace, id)\
);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
CREATE TABLE IF NOT EXISTS notes (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
content TEXT NOT NULL DEFAULT '',\
salience REAL NOT NULL DEFAULT 0.5,\
decay_factor REAL NOT NULL DEFAULT 0.0,\
expires_at INTEGER,\
properties TEXT,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
CREATE TABLE IF NOT EXISTS events (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
verb TEXT NOT NULL,\
substrate TEXT NOT NULL,\
actor TEXT NOT NULL,\
outcome TEXT NOT NULL,\
data TEXT,\
duration_us INTEGER NOT NULL DEFAULT 0,\
target_id TEXT,\
created_at INTEGER NOT NULL\
);\
CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
";
/// V4: de-duplicate graph edges and enforce uniqueness.
///
/// Deletes every `graph_edges` row except the one with the lowest `rowid` in
/// each `(namespace, source_id, target_id, relation)` group, then creates a
/// unique index over those four columns.
const V4_DEDUPE_GRAPH_EDGE_TRIPLES: &str = "\
DELETE FROM graph_edges \
WHERE rowid NOT IN (\
SELECT MIN(rowid) \
FROM graph_edges \
GROUP BY namespace, source_id, target_id, relation\
);\
CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple \
ON graph_edges(namespace, source_id, target_id, relation);\
";
/// V5: adds the nullable `entity_type` column to `entities` and an index on
/// `(namespace, kind, entity_type)`.
///
/// The `ALTER TABLE` is not idempotent, which is why `run_migrations` probes
/// for the column before executing this batch.
const V5_ADD_ENTITY_TYPE_TO_ENTITIES: &str = "\
ALTER TABLE entities ADD COLUMN entity_type TEXT NULL;\
CREATE INDEX IF NOT EXISTS idx_entities_kind_entity_type \
ON entities(namespace, kind, entity_type);\
";
/// V9: rebuilds `graph_edges` with lifecycle and backend columns.
///
/// The batch drops the existing edge indexes, creates `graph_edges_new` with
/// the extra `updated_at`, `deleted_at` and `target_backend` columns, copies
/// every row across (`updated_at` is seeded from `created_at`; `deleted_at`
/// and `target_backend` start as NULL), swaps the new table in under the old
/// name and recreates the indexes, adding a partial index on non-NULL
/// `target_backend`.
const V9_EDGE_LIFECYCLE_AND_TARGET_BACKEND: &str = "\
DROP INDEX IF EXISTS idx_graph_edges_unique_triple;\
DROP INDEX IF EXISTS idx_graph_edges_ns_source;\
DROP INDEX IF EXISTS idx_graph_edges_ns_target;\
DROP INDEX IF EXISTS idx_graph_edges_ns_relation;\
DROP INDEX IF EXISTS idx_graph_edges_ns_src_rel;\
DROP INDEX IF EXISTS idx_graph_edges_ns_tgt_rel;\
CREATE TABLE graph_edges_new (\
namespace TEXT NOT NULL,\
id TEXT NOT NULL,\
source_id TEXT NOT NULL,\
target_id TEXT NOT NULL,\
relation TEXT NOT NULL,\
weight REAL NOT NULL DEFAULT 1.0,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER,\
metadata TEXT,\
target_backend TEXT,\
PRIMARY KEY (namespace, id)\
);\
INSERT INTO graph_edges_new \
(namespace, id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, metadata, target_backend) \
SELECT namespace, id, source_id, target_id, relation, weight, created_at, created_at, NULL, metadata, NULL \
FROM graph_edges;\
DROP TABLE graph_edges;\
ALTER TABLE graph_edges_new RENAME TO graph_edges;\
CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple ON graph_edges(namespace, source_id, target_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_target_backend ON graph_edges(target_backend) WHERE target_backend IS NOT NULL;\
";
/// V10: adds the `status` column (default `'active'`) to `notes`.
///
/// Despite the constant's name, this batch does not touch the metric columns;
/// relaxing `salience`/`decay_factor` to nullable is done by the V12 batch.
const V10_NOTE_STATUS_AND_NULLABLE_METRICS: &str = "\
ALTER TABLE notes ADD COLUMN status TEXT NOT NULL DEFAULT 'active';\
";
/// V11: adds the nullable `merged_into` and `merge_event_id` columns to
/// `entities` and indexes `(namespace, merged_into)`.
const V11_ENTITY_TOMBSTONE_COLUMNS: &str = "\
ALTER TABLE entities ADD COLUMN merged_into TEXT;\
ALTER TABLE entities ADD COLUMN merge_event_id TEXT;\
CREATE INDEX IF NOT EXISTS idx_entities_merged_into ON entities(namespace, merged_into);\
";
/// V12: rebuilds `notes` so that `salience` and `decay_factor` are nullable.
///
/// `notes_new` declares both metric columns without `NOT NULL` or a default.
/// All rows are copied across, the old table is dropped, the new one is
/// renamed to `notes`, and the three `notes` indexes are recreated. The copy
/// selects `name` and `status`, so the V2 and V10 columns must already exist.
const V12_NULLABLE_NOTE_METRICS: &str = "\
CREATE TABLE notes_new (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
status TEXT NOT NULL DEFAULT 'active',\
name TEXT,\
content TEXT NOT NULL DEFAULT '',\
salience REAL,\
decay_factor REAL,\
expires_at INTEGER,\
properties TEXT,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
INSERT INTO notes_new \
(id, namespace, kind, status, name, content, salience, decay_factor, \
expires_at, properties, created_at, updated_at, deleted_at) \
SELECT \
id, namespace, kind, status, name, content, salience, decay_factor, \
expires_at, properties, created_at, updated_at, deleted_at \
FROM notes;\
DROP TABLE notes;\
ALTER TABLE notes_new RENAME TO notes;\
CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
";
/// V13 placeholder. This text is never executed: `run_migrations` replaces it
/// with the batch returned by `build_v13_event_observability_sql`.
const V13_EVENT_OBSERVABILITY_PROVENANCE: &str = "__v13_computed_at_runtime__";
/// DDL for the `_embedding_models` registry table and its indexes.
///
/// `status` is restricted by a `CHECK` to `pending`, `active`, `superseded`
/// or `archived`. The partial unique index `idx_embed_models_one_active`
/// permits at most one `active` row per `engine_name`. All statements use
/// `IF NOT EXISTS`. The V14 migration builder uses this text as the start of
/// its batch.
pub const EMBEDDING_MODELS_DDL: &str = "\
CREATE TABLE IF NOT EXISTS _embedding_models (\
id BLOB PRIMARY KEY,\
engine_name TEXT NOT NULL,\
model_id TEXT NOT NULL,\
key_version TEXT NOT NULL,\
dim INTEGER NOT NULL,\
output_dim INTEGER,\
status TEXT NOT NULL CHECK (status IN ('pending', 'active', 'superseded', 'archived')),\
activated_at INTEGER,\
superseded_at INTEGER,\
superseded_by BLOB,\
canonical_key BLOB NOT NULL UNIQUE,\
created_at INTEGER NOT NULL\
);\
CREATE UNIQUE INDEX IF NOT EXISTS idx_embed_models_one_active \
ON _embedding_models(engine_name) WHERE status = 'active';\
CREATE INDEX IF NOT EXISTS idx_embed_models_engine_status \
ON _embedding_models(engine_name, status);";
/// V14 placeholder. This text is never executed: `run_migrations` replaces it
/// with the batch returned by `build_v14_embedding_model_registry_sql`.
const V14_EMBEDDING_MODEL_REGISTRY: &str = "__v14_computed_at_runtime__";
/// V15: creates the `proposals_open` table and its three lookup indexes.
///
/// `status` is restricted by a `CHECK` to `open`, `changes_requested`,
/// `approved`, `rejected`, `applied` or `withdrawn`; the three counter columns
/// default to 0. All statements use `IF NOT EXISTS`.
const V15_PROPOSALS_OPEN: &str = "\
CREATE TABLE IF NOT EXISTS proposals_open (\
proposal_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
proposer TEXT NOT NULL,\
title TEXT NOT NULL,\
status TEXT NOT NULL CHECK (status IN ('open', 'changes_requested', 'approved', 'rejected', 'applied', 'withdrawn')),\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
expiry INTEGER,\
last_decision TEXT,\
review_count INTEGER NOT NULL DEFAULT 0,\
approve_count INTEGER NOT NULL DEFAULT 0,\
reject_count INTEGER NOT NULL DEFAULT 0\
);\
CREATE INDEX IF NOT EXISTS idx_proposals_open_ns_status ON proposals_open(namespace, status);\
CREATE INDEX IF NOT EXISTS idx_proposals_open_proposer ON proposals_open(namespace, proposer);\
CREATE INDEX IF NOT EXISTS idx_proposals_open_updated ON proposals_open(namespace, updated_at DESC);\
";
/// The ordered list of versioned migrations applied by `run_migrations`.
///
/// Versions must be contiguous and start at 1 (the entry at index `i` has
/// version `i + 1`); `run_migrations` rejects the list otherwise. New
/// migrations are appended at the end.
pub const MIGRATIONS: &[VersionedMigration] = &[
VersionedMigration {
version: 1,
name: "initial_schema",
up: V1_UP,
},
VersionedMigration {
version: 2,
name: "add_name_to_notes",
up: "ALTER TABLE notes ADD COLUMN name TEXT;",
},
VersionedMigration {
version: 3,
name: "add_events_namespace_created_index",
up: "CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);",
},
VersionedMigration {
version: 4,
name: "dedupe_graph_edge_triples",
up: V4_DEDUPE_GRAPH_EDGE_TRIPLES,
},
VersionedMigration {
version: 5,
name: "add_entity_type_to_entities",
up: V5_ADD_ENTITY_TYPE_TO_ENTITIES,
},
// Versions 6-8 are reserved slots: their SQL is a no-op `SELECT 1;`.
VersionedMigration {
version: 6,
name: "reserved_adr043_embedding_pipeline_extensions",
up: "SELECT 1;",
},
VersionedMigration {
version: 7,
name: "reserved_adr046_event_sourced_proposals_index",
up: "SELECT 1;",
},
VersionedMigration {
version: 8,
name: "reserved_adr041_event_observations_and_session_id",
up: "SELECT 1;",
},
VersionedMigration {
version: 9,
name: "edge_lifecycle_and_target_backend",
up: V9_EDGE_LIFECYCLE_AND_TARGET_BACKEND,
},
VersionedMigration {
version: 10,
name: "note_status_and_nullable_metrics",
up: V10_NOTE_STATUS_AND_NULLABLE_METRICS,
},
VersionedMigration {
version: 11,
name: "entity_tombstone_columns",
up: V11_ENTITY_TOMBSTONE_COLUMNS,
},
VersionedMigration {
version: 12,
name: "nullable_note_metrics",
up: V12_NULLABLE_NOTE_METRICS,
},
// Versions 13 and 14 carry placeholder text; `run_migrations` builds their SQL at runtime.
VersionedMigration {
version: 13,
name: "event_observability_provenance",
up: V13_EVENT_OBSERVABILITY_PROVENANCE,
},
VersionedMigration {
version: 14,
name: "embedding_model_registry",
up: V14_EMBEDDING_MODEL_REGISTRY,
},
VersionedMigration {
version: 15,
name: "proposals_open",
up: V15_PROPOSALS_OPEN,
},
];
/// DDL for `_schema_migrations`, the bookkeeping table of `run_migrations`.
///
/// One row per applied version; `IF NOT EXISTS` makes it safe to execute on
/// every run.
const MIGRATION_TRACKING_TABLE: &str = "\
CREATE TABLE IF NOT EXISTS _schema_migrations (\
version INTEGER PRIMARY KEY,\
name TEXT NOT NULL,\
applied_at INTEGER NOT NULL\
);\
";
pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
for (i, m) in MIGRATIONS.iter().enumerate() {
let expected = (i + 1) as u32;
if m.version != expected {
return Err(SqliteError::InvalidData(format!(
"MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
got version {}",
m.version
)));
}
}
conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
let current_version: u32 = conn
.query_row(
"SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
[],
|row| row.get(0),
)
.unwrap_or(0);
let mut applied_version = current_version;
for migration in MIGRATIONS {
if migration.version <= current_version {
continue;
}
if migration.version == 2 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 5 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('entities') WHERE name = 'entity_type'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 10 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'status'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 11 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('entities') WHERE name = 'merged_into'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 12 {
let already_nullable: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') \
WHERE name = 'salience' AND \"notnull\" = 0",
[],
|row| row.get(0),
)
.unwrap_or(false);
if already_nullable {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
let tx = conn.transaction().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
let up_sql = if migration.version == 13 {
build_v13_event_observability_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else if migration.version == 14 {
build_v14_embedding_model_registry_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else {
migration.up.to_string()
};
tx.execute_batch(&up_sql)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
let now = chrono::Utc::now().timestamp_micros();
tx.execute(
"INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
tx.commit().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
}
Ok(applied_version)
}
/// Returns whether `table` currently has a column called `column`.
///
/// The lookup goes through the `pragma_table_info` table-valued function with
/// both names bound as parameters; any `rusqlite` error from the query is
/// handed back to the caller.
fn table_has_column(
    conn: &Connection,
    table: &'static str,
    column: &'static str,
) -> Result<bool, rusqlite::Error> {
    let probe = "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2";
    let found: bool = conn.query_row(probe, rusqlite::params![table, column], |row| row.get(0))?;
    Ok(found)
}
/// Builds the SQL batch for migration V13 against the current `events` schema.
///
/// The batch consists of, in order:
///
/// 1. an `ALTER TABLE events ADD COLUMN …` for each V13 column that is not
///    present yet (so the migration also works when store-level DDL already
///    created some of them);
/// 2. a backfill copying the legacy `data` column into `payload`, emitted
///    whenever `events.data` exists;
/// 3. the `event_observations` table and the V13 indexes, all `IF NOT EXISTS`.
///
/// Nothing is executed here; the caller runs the returned batch.
///
/// # Errors
///
/// Returns the `rusqlite` error of any failing schema probe.
fn build_v13_event_observability_sql(conn: &Connection) -> Result<String, rusqlite::Error> {
    let mut sql = String::new();
    for (column, ddl) in [
        (
            "kind",
            "ALTER TABLE events ADD COLUMN kind TEXT NOT NULL DEFAULT 'audit';",
        ),
        (
            "payload",
            "ALTER TABLE events ADD COLUMN payload TEXT NOT NULL DEFAULT '{}';",
        ),
        (
            "payload_schema_version",
            "ALTER TABLE events ADD COLUMN payload_schema_version INTEGER NOT NULL DEFAULT 1;",
        ),
        (
            "profile_state_version",
            "ALTER TABLE events ADD COLUMN profile_state_version INTEGER;",
        ),
        (
            "session_id",
            "ALTER TABLE events ADD COLUMN session_id TEXT;",
        ),
        (
            "aggregate_kind",
            "ALTER TABLE events ADD COLUMN aggregate_kind TEXT;",
        ),
        (
            "aggregate_id",
            "ALTER TABLE events ADD COLUMN aggregate_id TEXT;",
        ),
    ] {
        if !table_has_column(conn, "events", column)? {
            sql.push_str(ddl);
        }
    }

    // By the time the batch reaches this statement `payload` is guaranteed to
    // exist: it was either already present or its ALTER was queued above.
    // Probing for `payload` here would inspect the schema *before* the batch
    // runs and would skip the backfill on the ordinary upgrade path, so only
    // the legacy `data` column is checked.
    //
    // `json_valid` keeps non-JSON legacy values out of `payload`; otherwise the
    // `json_extract(payload, …)` expression index created below would fail on
    // them and abort the migration.
    // NOTE(review): assumes `data` holds the JSON that `payload` is meant to
    // carry — confirm against the event writer.
    if table_has_column(conn, "events", "data")? {
        sql.push_str(
            "UPDATE events SET payload = data \
             WHERE data IS NOT NULL AND data <> '' AND json_valid(data);",
        );
    }

    sql.push_str(
        "CREATE TABLE IF NOT EXISTS event_observations (\
         event_id TEXT NOT NULL,\
         entity_id TEXT NOT NULL,\
         referent_kind TEXT NOT NULL,\
         role TEXT NOT NULL,\
         position INTEGER NOT NULL,\
         PRIMARY KEY (event_id, role, position)\
         );\
         CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);\
         CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);\
         CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);\
         CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));\
         CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);\
         CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);",
    );
    Ok(sql)
}
/// Builds the SQL batch for migration V14.
///
/// The batch starts with [`EMBEDDING_MODELS_DDL`]. For every regular
/// (non-virtual) table whose name starts with `vec_`, excluding names that
/// look like sqlite-vec shadow tables (`…_chunks`, `…_rowids`, `…_info`,
/// `…_vector_chunks…`), it then appends an `ALTER TABLE` adding a nullable
/// `embedding_model_id` column referencing `_embedding_models(id)` plus an
/// index on it. Tables that already have the column are skipped.
///
/// Table names are interpolated into the SQL, so only names made of the
/// `vec_` prefix followed by ASCII alphanumerics or `_` are accepted; anything
/// else is skipped.
///
/// # Errors
///
/// Returns the `rusqlite` error when listing the tables, reading a row, or
/// probing a table's columns fails. These failures are reported rather than
/// ignored so that a broken probe cannot produce an incomplete or
/// duplicate-column batch.
fn build_v14_embedding_model_registry_sql(conn: &Connection) -> Result<String, rusqlite::Error> {
    let mut sql = String::from(EMBEDDING_MODELS_DDL);

    let mut stmt = conn.prepare(
        "SELECT name FROM sqlite_master \
         WHERE type = 'table' \
         AND name LIKE 'vec_%' \
         AND sql NOT LIKE '%VIRTUAL%' \
         AND sql NOT LIKE '%vec0%' \
         AND name NOT LIKE '%\\_chunks' ESCAPE '\\' \
         AND name NOT LIKE '%\\_rowids' ESCAPE '\\' \
         AND name NOT LIKE '%\\_info' ESCAPE '\\' \
         AND name NOT LIKE '%\\_vector\\_chunks%' ESCAPE '\\'",
    )?;
    let vec_tables = stmt
        .query_map([], |row| row.get::<_, String>(0))?
        .collect::<Result<Vec<_>, _>>()?;

    for table in &vec_tables {
        // `_` is a wildcard in LIKE, so re-check the literal prefix here and
        // restrict the remainder to identifier-safe characters before the name
        // is spliced into DDL.
        let is_safe_identifier = table.strip_prefix("vec_").map_or(false, |rest| {
            rest.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
        });
        if !is_safe_identifier {
            continue;
        }

        let col_exists: bool = conn.query_row(
            "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = 'embedding_model_id'",
            rusqlite::params![table],
            |row| row.get(0),
        )?;
        if col_exists {
            continue;
        }

        sql.push_str(&format!(
            "ALTER TABLE {t} ADD COLUMN embedding_model_id BLOB REFERENCES _embedding_models(id);\
             CREATE INDEX IF NOT EXISTS idx_{t}_model ON {t}(embedding_model_id);",
            t = table,
        ));
    }
    Ok(sql)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Probe used by several tests: does an index with the bound name exist?
    const INDEX_EXISTS_SQL: &str =
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='index' AND name=?1";

    /// Opens a fresh in-memory database.
    fn open_memory() -> Connection {
        Connection::open_in_memory().expect("in-memory connection")
    }

    /// Runs a parameterless query and returns its single integer result.
    fn scalar(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Runs a parameterless query and returns its single boolean result.
    fn flag(conn: &Connection, sql: &str) -> bool {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Runs a query with one bound text parameter and returns its boolean result.
    fn flag_for(conn: &Connection, sql: &str, arg: &str) -> bool {
        conn.query_row(sql, [arg], |row| row.get(0)).unwrap()
    }

    /// Number of `_schema_migrations` rows recorded for `version`.
    fn recorded(conn: &Connection, version: u32) -> i64 {
        scalar(
            conn,
            &format!("SELECT COUNT(*) FROM _schema_migrations WHERE version = {version}"),
        )
    }

    /// Test-only runner: applies one migration plus its tracking row in a transaction.
    fn apply_single_migration(
        conn: &mut Connection,
        migration: &VersionedMigration,
    ) -> Result<(), SqliteError> {
        let version = migration.version;
        let wrap = move |e: rusqlite::Error| SqliteError::Migration {
            version,
            error: e.to_string(),
        };

        let tx = conn.transaction().map_err(wrap)?;
        tx.execute_batch(migration.up).map_err(wrap)?;
        let stamp = chrono::Utc::now().timestamp_micros();
        tx.execute(
            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
            rusqlite::params![migration.version, migration.name, stamp],
        )
        .map_err(wrap)?;
        tx.commit().map_err(wrap)?;
        Ok(())
    }

    /// A brand-new database ends at V15 with every table, column and index in place.
    #[test]
    fn fresh_db_migrates_to_latest() {
        let mut conn = open_memory();
        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 15);

        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)",
            ),
            15
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
            ),
            1
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
            ),
            1,
            "V2 must add name column to notes"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('entities') WHERE name = 'entity_type'",
            ),
            1,
            "V5 must add entity_type column to entities"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' \
                 AND name='idx_entities_kind_entity_type'",
            ),
            1,
            "V5 must create idx_entities_kind_entity_type"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'status'",
            ),
            1,
            "V10 must add status column to notes"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('entities') WHERE name = 'merged_into'",
            ),
            1,
            "V11 must add merged_into column to entities"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT \"notnull\" FROM pragma_table_info('notes') WHERE name = 'salience'",
            ),
            0,
            "V12 must make salience nullable"
        );

        for col in [
            "kind",
            "payload",
            "payload_schema_version",
            "profile_state_version",
            "session_id",
            "aggregate_kind",
            "aggregate_id",
        ] {
            let exists = flag_for(
                &conn,
                "SELECT COUNT(*) > 0 FROM pragma_table_info('events') WHERE name = ?1",
                col,
            );
            assert!(exists, "V13 must add events.{col}");
        }
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='event_observations'",
            ),
            1,
            "V13 must create event_observations table"
        );
        for idx in [
            "idx_events_ns_created_id",
            "idx_events_session",
            "idx_events_payload_proposal_id",
            "idx_event_obs_entity",
            "idx_event_obs_event_role",
        ] {
            let exists = flag_for(&conn, INDEX_EXISTS_SQL, idx);
            assert!(exists, "V13 must create index {idx}");
        }

        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='_embedding_models'",
            ),
            1,
            "V14 must create _embedding_models table"
        );
        for idx in [
            "idx_embed_models_one_active",
            "idx_embed_models_engine_status",
        ] {
            let exists = flag_for(&conn, INDEX_EXISTS_SQL, idx);
            assert!(exists, "V14 must create index {idx}");
        }

        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='proposals_open'",
            ),
            1,
            "V15 must create proposals_open table"
        );
        for idx in [
            "idx_proposals_open_ns_status",
            "idx_proposals_open_proposer",
            "idx_proposals_open_updated",
        ] {
            let exists = flag_for(&conn, INDEX_EXISTS_SQL, idx);
            assert!(exists, "V15 must create index {idx}");
        }
    }

    /// A second run changes nothing and reports the same version.
    #[test]
    fn run_migrations_twice_is_idempotent() {
        let mut conn = open_memory();
        let first = run_migrations(&mut conn).expect("first run");
        let second = run_migrations(&mut conn).expect("second run");
        assert_eq!(first, 15);
        assert_eq!(second, 15);
        assert_eq!(scalar(&conn, "SELECT COUNT(*) FROM _schema_migrations"), 15);
    }

    /// V9 leaves `graph_edges` with `target_backend` and its partial index.
    #[test]
    fn migration_v9_adds_target_backend_index() {
        let mut conn = open_memory();
        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(
            version, 15,
            "F052: latest migration must be V15 (proposals_open)"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM pragma_table_info('graph_edges') WHERE name = 'target_backend'",
            ),
            1,
            "F052: graph_edges must have target_backend column after V9 migration"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_graph_edges_target_backend'",
            ),
            1,
            "F052: idx_graph_edges_target_backend partial index must exist after V9 migration"
        );
    }

    /// A migration with invalid SQL fails without leaving a tracking row behind.
    #[test]
    fn failed_migration_rolls_back() {
        let bad_v16 = VersionedMigration {
            version: 16,
            name: "bad_migration",
            up: "THIS IS NOT VALID SQL;",
        };
        let mut conn = open_memory();
        run_migrations(&mut conn).expect("V1..V15 should apply cleanly");

        let outcome = apply_single_migration(&mut conn, &bad_v16);
        assert!(outcome.is_err(), "bad migration should return error");

        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM _schema_migrations WHERE version = 16",
            ),
            0,
            "V16 must not be recorded after rollback"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)",
            ),
            15,
            "V1..V15 must still be recorded"
        );
    }

    /// Migrations still succeed, and are all recorded, after store DDL created the tables.
    #[test]
    fn store_ddl_then_migrations_is_idempotent() {
        use crate::stores::entity::ensure_entities_schema;
        use crate::stores::note::ensure_notes_schema;

        let mut conn = open_memory();
        ensure_notes_schema(&conn).expect("store DDL should create notes");
        ensure_entities_schema(&conn).expect("store DDL should create entities");

        let has_name = flag(
            &conn,
            "SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
        );
        assert!(has_name, "NOTES_DDL should include name column");

        let version = run_migrations(&mut conn).expect("migrations after store DDL");
        assert_eq!(version, 15);

        for (version, message) in [
            (2, "V2 must be recorded even when column pre-exists"),
            (
                5,
                "V5 must be recorded even when entity_type column pre-exists",
            ),
            (9, "V9 must be recorded after store-DDL + migrations"),
            (
                10,
                "V10 must be recorded even when status column pre-exists via NOTES_DDL",
            ),
            (
                11,
                "V11 must be recorded even when merged_into column pre-exists via ENTITIES_DDL",
            ),
            (
                12,
                "V12 must be recorded even when salience is already nullable via NOTES_DDL",
            ),
            (13, "V13 must be recorded after store-DDL + migrations"),
            (14, "V14 must be recorded after store-DDL + migrations"),
            (15, "V15 must be recorded after store-DDL + migrations"),
        ] {
            assert_eq!(recorded(&conn, version), 1, "{message}");
        }
    }

    /// Upgrading a hand-built V1 database makes `notes.salience` nullable.
    #[test]
    fn v1_to_v12_allows_null_salience() {
        let mut conn = open_memory();
        conn.execute_batch(MIGRATION_TRACKING_TABLE).unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (\
             id TEXT PRIMARY KEY,\
             namespace TEXT NOT NULL,\
             kind TEXT NOT NULL,\
             name TEXT NOT NULL,\
             description TEXT,\
             properties TEXT,\
             tags TEXT NOT NULL DEFAULT '[]',\
             created_at INTEGER NOT NULL,\
             updated_at INTEGER NOT NULL,\
             deleted_at INTEGER\
             );\
             CREATE TABLE graph_edges (\
             namespace TEXT NOT NULL,\
             id TEXT NOT NULL,\
             source_id TEXT NOT NULL,\
             target_id TEXT NOT NULL,\
             relation TEXT NOT NULL,\
             weight REAL NOT NULL DEFAULT 1.0,\
             created_at INTEGER NOT NULL,\
             metadata TEXT,\
             PRIMARY KEY (namespace, id)\
             );\
             CREATE TABLE notes (\
             id TEXT PRIMARY KEY,\
             namespace TEXT NOT NULL,\
             kind TEXT NOT NULL,\
             content TEXT NOT NULL DEFAULT '',\
             salience REAL NOT NULL DEFAULT 0.5,\
             decay_factor REAL NOT NULL DEFAULT 0.0,\
             expires_at INTEGER,\
             properties TEXT,\
             created_at INTEGER NOT NULL,\
             updated_at INTEGER NOT NULL,\
             deleted_at INTEGER\
             );\
             CREATE TABLE events (\
             id TEXT PRIMARY KEY,\
             namespace TEXT NOT NULL,\
             verb TEXT NOT NULL,\
             substrate TEXT NOT NULL,\
             actor TEXT NOT NULL,\
             outcome TEXT NOT NULL,\
             data TEXT,\
             duration_us INTEGER NOT NULL DEFAULT 0,\
             target_id TEXT,\
             created_at INTEGER NOT NULL\
             );",
        )
        .unwrap();

        let stamp = chrono::Utc::now().timestamp_micros();
        conn.execute(
            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (1, 'initial_schema', ?1)",
            rusqlite::params![stamp],
        )
        .unwrap();

        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 15);

        assert_eq!(
            scalar(
                &conn,
                "SELECT \"notnull\" FROM pragma_table_info('notes') WHERE name = 'salience'",
            ),
            0,
            "salience must be nullable after V12"
        );

        conn.execute(
            "INSERT INTO notes (id, namespace, kind, status, content, created_at, updated_at) \
             VALUES ('test-id', 'ns', 'observation', 'active', '', 1, 1)",
            [],
        )
        .expect("inserting note with NULL salience must succeed after V12");

        let stored_salience: Option<f64> = conn
            .query_row(
                "SELECT salience FROM notes WHERE id = 'test-id'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(
            stored_salience.is_none(),
            "salience must be NULL when not supplied"
        );
    }

    /// V13..V15 are applied and recorded even when the events store DDL ran first.
    #[test]
    fn store_ddl_then_event_migration_is_idempotent() {
        use crate::stores::event::ensure_events_schema;

        let mut conn = open_memory();
        ensure_events_schema(&conn).expect("store DDL should create events");
        let version = run_migrations(&mut conn).expect("migrations after events store DDL");
        assert_eq!(version, 15, "must reach V15 even when events DDL ran first");

        for (version, message) in [
            (13, "V13 must be recorded"),
            (14, "V14 must be recorded"),
            (15, "V15 must be recorded"),
        ] {
            assert_eq!(recorded(&conn, version), 1, "{message}");
        }
    }

    /// V14 creates `_embedding_models` with all of its columns and indexes.
    #[test]
    fn migration_v14_creates_embedding_model_registry() {
        let mut conn = open_memory();
        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(
            version, 15,
            "F227: latest migration must be V15 (proposals_open)"
        );

        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='_embedding_models'",
            ),
            1,
            "F227: _embedding_models table must exist after V14"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_embed_models_one_active'",
            ),
            1,
            "V14 must create idx_embed_models_one_active partial unique index"
        );
        assert_eq!(
            scalar(
                &conn,
                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_embed_models_engine_status'",
            ),
            1,
            "V14 must create idx_embed_models_engine_status index"
        );

        for col in [
            "id",
            "engine_name",
            "model_id",
            "key_version",
            "dim",
            "output_dim",
            "status",
            "activated_at",
            "superseded_at",
            "superseded_by",
            "canonical_key",
            "created_at",
        ] {
            let exists = flag_for(
                &conn,
                "SELECT COUNT(*) > 0 FROM pragma_table_info('_embedding_models') WHERE name = ?1",
                col,
            );
            assert!(
                exists,
                "F227: _embedding_models must have column '{col}' after V14"
            );
        }
    }

    /// V14 extends pre-existing regular `vec_` tables, and a rerun stays clean.
    #[test]
    fn migration_v14_adds_embedding_model_id_to_existing_regular_vec_tables() {
        let mut conn = open_memory();
        conn.execute_batch(
            "CREATE TABLE vec_legacy_model (\
             subject_id TEXT PRIMARY KEY,\
             namespace TEXT NOT NULL,\
             kind TEXT NOT NULL,\
             field TEXT NOT NULL\
             );",
        )
        .unwrap();

        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 15);

        let col_exists = flag(
            &conn,
            "SELECT COUNT(*) > 0 FROM pragma_table_info('vec_legacy_model') WHERE name = 'embedding_model_id'",
        );
        assert!(
            col_exists,
            "F228: V14 must add embedding_model_id to existing regular vec_ tables"
        );

        let version2 = run_migrations(&mut conn).expect("second run must succeed");
        assert_eq!(version2, 15);
    }

    /// V14 leaves tables that look like sqlite-vec shadow tables untouched.
    #[test]
    fn migration_v14_does_not_alter_sqlite_vec_shadow_tables() {
        let mut conn = open_memory();
        conn.execute_batch(
            "CREATE TABLE vec_test_chunks (x INTEGER);\
             CREATE TABLE vec_test_rowids (x INTEGER);\
             CREATE TABLE vec_test_info (x INTEGER);\
             CREATE TABLE vec_test_vector_chunks00 (x INTEGER);",
        )
        .unwrap();

        let version = run_migrations(&mut conn).expect("migrations should succeed");
        assert_eq!(version, 15);

        for shadow in [
            "vec_test_chunks",
            "vec_test_rowids",
            "vec_test_info",
            "vec_test_vector_chunks00",
        ] {
            let col_added = flag_for(
                &conn,
                "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) \
                 WHERE name = 'embedding_model_id'",
                shadow,
            );
            assert!(
                !col_added,
                "CRIT-2: V14 must NOT add embedding_model_id to sqlite-vec shadow table '{shadow}'"
            );
        }
    }
}