use rusqlite::Connection;
use crate::error::SqliteError;
/// One step of a per-service schema plan, applied by [`apply_schema_plan`].
pub struct Migration {
/// Identifier recorded in `_schema_versions.migration_id` once the step has run;
/// must be unique within its service (the ledger's primary key is
/// `(service, migration_id)`).
pub id: &'static str,
/// SQL batch executed with `execute_batch` to apply the step.
pub up_sql: &'static str,
/// Optional reverse SQL. NOTE(review): `apply_schema_plan` never reads this;
/// confirm whether another code path uses it.
pub down_sql: Option<&'static str>,
/// Optional probe. When it returns `true`, `apply_schema_plan` skips the step
/// without executing `up_sql` and without writing a `_schema_versions` row.
pub is_already_applied: Option<fn(&Connection) -> bool>,
}
/// The ordered migrations one service contributes, kept as one list per backend.
pub struct ServiceSchemaPlan {
/// Service name; stored in `_schema_versions.service` so migration ids only need
/// to be unique per service.
pub service: &'static str,
/// SQLite migrations, applied in slice order by [`apply_schema_plan`].
pub sqlite: &'static [Migration],
/// Postgres migrations. NOTE(review): `apply_schema_plan` only reads `sqlite`;
/// the consumer of this list is not visible here.
pub postgres: &'static [Migration],
}
/// DDL for the `_schema_versions` ledger used by [`apply_schema_plan`]. It stores one
/// row per applied `(service, migration_id)`, with `applied_at` in Unix
/// microseconds (`apply_schema_plan` writes `chrono::Utc::now().timestamp_micros()`).
/// `IF NOT EXISTS` makes it safe to execute on every call.
const SCHEMA_VERSION_TABLE: &str = "\
CREATE TABLE IF NOT EXISTS _schema_versions (\
service TEXT NOT NULL,\
migration_id TEXT NOT NULL,\
applied_at INTEGER NOT NULL,\
PRIMARY KEY (service, migration_id)\
);\
";
pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
conn.execute_batch(SCHEMA_VERSION_TABLE)?;
for migration in plan.sqlite {
if let Some(check) = migration.is_already_applied {
if check(conn) {
continue;
}
}
let already: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
rusqlite::params![plan.service, migration.id],
|row| row.get(0),
)?;
if already {
continue;
}
conn.execute_batch(migration.up_sql)?;
conn.execute(
"INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![
plan.service,
migration.id,
chrono::Utc::now().timestamp_micros(),
],
)?;
}
Ok(())
}
/// One entry of [`MIGRATIONS`], applied by [`run_migrations`].
pub struct VersionedMigration {
/// Schema version. It must equal the entry's 1-based position in `MIGRATIONS`;
/// `run_migrations` rejects the array otherwise.
pub version: u32,
/// Human-readable name stored in `_schema_migrations.name`.
pub name: &'static str,
/// SQL batch to execute. For versions 13, 14, 16, 17 and 18 this is only a
/// placeholder: `run_migrations` generates the real SQL at runtime instead.
pub up: &'static str,
}
/// Migration 1 (`initial_schema`): creates the base `entities`, `graph_edges`,
/// `notes` and `events` tables and their indexes.
///
/// Every statement uses `IF NOT EXISTS`, so it is a no-op for objects that already
/// exist. It does not reconcile the column sets of pre-existing tables.
const V1_UP: &str = "\
CREATE TABLE IF NOT EXISTS entities (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
name TEXT NOT NULL,\
description TEXT,\
properties TEXT,\
tags TEXT NOT NULL DEFAULT '[]',\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);\
CREATE INDEX IF NOT EXISTS idx_entities_kind ON entities(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(namespace, name);\
CREATE INDEX IF NOT EXISTS idx_entities_created ON entities(created_at DESC);\
CREATE TABLE IF NOT EXISTS graph_edges (\
namespace TEXT NOT NULL,\
id TEXT NOT NULL,\
source_id TEXT NOT NULL,\
target_id TEXT NOT NULL,\
relation TEXT NOT NULL,\
weight REAL NOT NULL DEFAULT 1.0,\
created_at INTEGER NOT NULL,\
metadata TEXT,\
PRIMARY KEY (namespace, id)\
);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
CREATE TABLE IF NOT EXISTS notes (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
content TEXT NOT NULL DEFAULT '',\
salience REAL NOT NULL DEFAULT 0.5,\
decay_factor REAL NOT NULL DEFAULT 0.0,\
expires_at INTEGER,\
properties TEXT,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
CREATE TABLE IF NOT EXISTS events (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
verb TEXT NOT NULL,\
substrate TEXT NOT NULL,\
actor TEXT NOT NULL,\
outcome TEXT NOT NULL,\
data TEXT,\
duration_us INTEGER NOT NULL DEFAULT 0,\
target_id TEXT,\
created_at INTEGER NOT NULL\
);\
CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
";
/// Migration 4 (`dedupe_graph_edge_triples`): removes duplicate `graph_edges` rows
/// and then enforces uniqueness.
///
/// - Rows sharing `(namespace, source_id, target_id, relation)` are collapsed to the
///   one with the smallest `rowid`. The `DELETE` is irreversible.
/// - A unique index is then added so such duplicates cannot be inserted again.
const V4_DEDUPE_GRAPH_EDGE_TRIPLES: &str = "\
DELETE FROM graph_edges \
WHERE rowid NOT IN (\
SELECT MIN(rowid) \
FROM graph_edges \
GROUP BY namespace, source_id, target_id, relation\
);\
CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple \
ON graph_edges(namespace, source_id, target_id, relation);\
";
/// Migration 5 (`add_entity_type_to_entities`): adds the nullable
/// `entities.entity_type` column plus a `(namespace, kind, entity_type)` index.
///
/// `ALTER TABLE ... ADD COLUMN` fails if the column already exists. That is why
/// `run_migrations` probes for `entity_type` before running this SQL.
const V5_ADD_ENTITY_TYPE_TO_ENTITIES: &str = "\
ALTER TABLE entities ADD COLUMN entity_type TEXT NULL;\
CREATE INDEX IF NOT EXISTS idx_entities_kind_entity_type \
ON entities(namespace, kind, entity_type);\
";
/// Migration 9 (`edge_lifecycle_and_target_backend`): rebuilds `graph_edges` with
/// the create / copy / drop / rename pattern.
///
/// It adds three columns to the copied rows:
/// - `updated_at`, backfilled from `created_at`;
/// - `deleted_at`, set to NULL;
/// - `target_backend`, set to NULL.
///
/// The old indexes are dropped first and recreated on the new table. A partial
/// index over non-NULL `target_backend` values is also added.
const V9_EDGE_LIFECYCLE_AND_TARGET_BACKEND: &str = "\
DROP INDEX IF EXISTS idx_graph_edges_unique_triple;\
DROP INDEX IF EXISTS idx_graph_edges_ns_source;\
DROP INDEX IF EXISTS idx_graph_edges_ns_target;\
DROP INDEX IF EXISTS idx_graph_edges_ns_relation;\
DROP INDEX IF EXISTS idx_graph_edges_ns_src_rel;\
DROP INDEX IF EXISTS idx_graph_edges_ns_tgt_rel;\
CREATE TABLE graph_edges_new (\
namespace TEXT NOT NULL,\
id TEXT NOT NULL,\
source_id TEXT NOT NULL,\
target_id TEXT NOT NULL,\
relation TEXT NOT NULL,\
weight REAL NOT NULL DEFAULT 1.0,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER,\
metadata TEXT,\
target_backend TEXT,\
PRIMARY KEY (namespace, id)\
);\
INSERT INTO graph_edges_new \
(namespace, id, source_id, target_id, relation, weight, created_at, updated_at, deleted_at, metadata, target_backend) \
SELECT namespace, id, source_id, target_id, relation, weight, created_at, created_at, NULL, metadata, NULL \
FROM graph_edges;\
DROP TABLE graph_edges;\
ALTER TABLE graph_edges_new RENAME TO graph_edges;\
CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple ON graph_edges(namespace, source_id, target_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
CREATE INDEX IF NOT EXISTS idx_graph_edges_target_backend ON graph_edges(target_backend) WHERE target_backend IS NOT NULL;\
";
/// Migration 10 (`note_status_and_nullable_metrics`): adds `notes.status`, which
/// defaults to `'active'` for both existing and new rows.
///
/// Despite the name, the nullable-metrics change is made by migration 12
/// (`nullable_note_metrics`).
const V10_NOTE_STATUS_AND_NULLABLE_METRICS: &str = "\
ALTER TABLE notes ADD COLUMN status TEXT NOT NULL DEFAULT 'active';\
";
/// Migration 11 (`entity_tombstone_columns`): adds the nullable
/// `entities.merged_into` and `entities.merge_event_id` columns, plus a
/// `(namespace, merged_into)` index.
const V11_ENTITY_TOMBSTONE_COLUMNS: &str = "\
ALTER TABLE entities ADD COLUMN merged_into TEXT;\
ALTER TABLE entities ADD COLUMN merge_event_id TEXT;\
CREATE INDEX IF NOT EXISTS idx_entities_merged_into ON entities(namespace, merged_into);\
";
/// Migration 12 (`nullable_note_metrics`): rebuilds `notes` so that `salience` and
/// `decay_factor` become nullable.
///
/// - The two columns also lose their former `DEFAULT 0.5` / `DEFAULT 0.0`, so rows
///   inserted later without these values store NULL.
/// - Every existing column is copied across, including `name` (migration 2) and
///   `status` (migration 10).
/// - The `notes` indexes are recreated on the new table.
const V12_NULLABLE_NOTE_METRICS: &str = "\
CREATE TABLE notes_new (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
status TEXT NOT NULL DEFAULT 'active',\
name TEXT,\
content TEXT NOT NULL DEFAULT '',\
salience REAL,\
decay_factor REAL,\
expires_at INTEGER,\
properties TEXT,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
INSERT INTO notes_new \
(id, namespace, kind, status, name, content, salience, decay_factor, \
expires_at, properties, created_at, updated_at, deleted_at) \
SELECT \
id, namespace, kind, status, name, content, salience, decay_factor, \
expires_at, properties, created_at, updated_at, deleted_at \
FROM notes;\
DROP TABLE notes;\
ALTER TABLE notes_new RENAME TO notes;\
CREATE INDEX IF NOT EXISTS idx_notes_namespace ON notes(namespace);\
CREATE INDEX IF NOT EXISTS idx_notes_kind ON notes(namespace, kind);\
CREATE INDEX IF NOT EXISTS idx_notes_created ON notes(created_at DESC);\
";
/// Placeholder only. `run_migrations` never executes this string for version 13;
/// it builds the SQL with `build_v13_event_observability_sql` instead.
const V13_EVENT_OBSERVABILITY_PROVENANCE: &str = "__v13_computed_at_runtime__";
/// DDL for the `_embedding_models` registry. `build_v14_embedding_model_registry_sql`
/// uses it as the start of migration 14.
///
/// - `status` is restricted by a CHECK to `pending`, `active`, `superseded` or
///   `archived`.
/// - A partial unique index allows at most one `active` row per `engine_name`.
/// - `canonical_key` must be unique across the table.
pub const EMBEDDING_MODELS_DDL: &str = "\
CREATE TABLE IF NOT EXISTS _embedding_models (\
id BLOB PRIMARY KEY,\
engine_name TEXT NOT NULL,\
model_id TEXT NOT NULL,\
key_version TEXT NOT NULL,\
dim INTEGER NOT NULL,\
output_dim INTEGER,\
status TEXT NOT NULL CHECK (status IN ('pending', 'active', 'superseded', 'archived')),\
activated_at INTEGER,\
superseded_at INTEGER,\
superseded_by BLOB,\
canonical_key BLOB NOT NULL UNIQUE,\
created_at INTEGER NOT NULL\
);\
CREATE UNIQUE INDEX IF NOT EXISTS idx_embed_models_one_active \
ON _embedding_models(engine_name) WHERE status = 'active';\
CREATE INDEX IF NOT EXISTS idx_embed_models_engine_status \
ON _embedding_models(engine_name, status);";
/// Placeholder only: `run_migrations` builds version 14's SQL with
/// `build_v14_embedding_model_registry_sql`.
const V14_EMBEDDING_MODEL_REGISTRY: &str = "__v14_computed_at_runtime__";
/// Placeholder only: `run_migrations` builds version 16's SQL with
/// `build_v16_vector_embedding_model_tag_sql`.
const V16_VECTOR_EMBEDDING_MODEL_TAG: &str = "__v16_computed_at_runtime__";
/// Placeholder only: `run_migrations` builds version 17's SQL with
/// `build_v17_preserving_rebuild_sql`.
const V17_VECTOR_EMBEDDING_MODEL_TAG_PRESERVING_REBUILD: &str = "__v17_computed_at_runtime__";
/// Migration 15 (`proposals_open`): creates the `proposals_open` table and its
/// lookup indexes.
///
/// Its `status` CHECK does not include `'applying'`. Migration 18
/// (`proposals_open_add_applying_status`) rebuilds the table to allow that value.
const V15_PROPOSALS_OPEN: &str = "\
CREATE TABLE IF NOT EXISTS proposals_open (\
proposal_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
proposer TEXT NOT NULL,\
title TEXT NOT NULL,\
status TEXT NOT NULL CHECK (status IN ('open', 'changes_requested', 'approved', 'rejected', 'applied', 'withdrawn')),\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
expiry INTEGER,\
last_decision TEXT,\
review_count INTEGER NOT NULL DEFAULT 0,\
approve_count INTEGER NOT NULL DEFAULT 0,\
reject_count INTEGER NOT NULL DEFAULT 0\
);\
CREATE INDEX IF NOT EXISTS idx_proposals_open_ns_status ON proposals_open(namespace, status);\
CREATE INDEX IF NOT EXISTS idx_proposals_open_proposer ON proposals_open(namespace, proposer);\
CREATE INDEX IF NOT EXISTS idx_proposals_open_updated ON proposals_open(namespace, updated_at DESC);\
";
/// Migration 19 (`knowledge_atoms_and_domains`): creates the knowledge tables and
/// their full-text index.
///
/// - `knowledge_atoms` and `knowledge_domains`, each with a unique
///   `(namespace, slug)` index.
/// - `fts_knowledge`, an external-content FTS5 table over atoms (trigram tokenizer),
///   kept in sync by insert/delete/update triggers.
///
/// The insert trigger and the re-insert half of the update trigger only index rows
/// whose `deleted_at` is NULL, so soft-deleted atoms drop out of the index.
///
/// NOTE(review): the delete trigger and the first half of the update trigger issue
/// the FTS5 `'delete'` command unconditionally. That includes rows that were never
/// indexed (inserted already soft-deleted) or were already removed by an earlier
/// soft-delete update. The FTS5 documentation warns that a `'delete'` whose values
/// do not match what was indexed can leave an external-content index inconsistent.
/// A `WHEN old.deleted_at IS NULL` guard would have to ship as a new migration,
/// because editing this string does not reach databases already at version 19.
const V19_KNOWLEDGE_ATOMS_AND_DOMAINS: &str = "\
CREATE TABLE IF NOT EXISTS knowledge_atoms (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
slug TEXT NOT NULL,\
name TEXT NOT NULL,\
description TEXT,\
content TEXT NOT NULL DEFAULT '',\
tags TEXT NOT NULL DEFAULT '[]',\
properties TEXT,\
finalized INTEGER NOT NULL DEFAULT 0,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_atoms_ns_slug \
ON knowledge_atoms(namespace, slug);\
CREATE INDEX IF NOT EXISTS idx_knowledge_atoms_ns \
ON knowledge_atoms(namespace);\
CREATE INDEX IF NOT EXISTS idx_knowledge_atoms_ns_created \
ON knowledge_atoms(namespace, created_at DESC);\
CREATE TABLE IF NOT EXISTS knowledge_domains (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
slug TEXT NOT NULL,\
name TEXT NOT NULL,\
description TEXT,\
tags TEXT NOT NULL DEFAULT '[]',\
members TEXT NOT NULL DEFAULT '[]',\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_domains_ns_slug \
ON knowledge_domains(namespace, slug);\
CREATE INDEX IF NOT EXISTS idx_knowledge_domains_ns \
ON knowledge_domains(namespace);\
CREATE VIRTUAL TABLE IF NOT EXISTS fts_knowledge \
USING fts5(\
id UNINDEXED,\
namespace UNINDEXED,\
slug,\
name,\
description,\
content,\
content=knowledge_atoms,\
content_rowid=rowid,\
tokenize='trigram case_sensitive 0'\
);\
CREATE TRIGGER IF NOT EXISTS fts_knowledge_ai \
AFTER INSERT ON knowledge_atoms \
WHEN new.deleted_at IS NULL BEGIN \
INSERT INTO fts_knowledge(rowid, id, namespace, slug, name, description, content) \
VALUES(new.rowid, new.id, new.namespace, new.slug, new.name, new.description, new.content); \
END; \
CREATE TRIGGER IF NOT EXISTS fts_knowledge_ad \
AFTER DELETE ON knowledge_atoms BEGIN \
INSERT INTO fts_knowledge(fts_knowledge, rowid, id, namespace, slug, name, description, content) \
VALUES('delete', old.rowid, old.id, old.namespace, old.slug, old.name, old.description, old.content); \
END; \
CREATE TRIGGER IF NOT EXISTS fts_knowledge_au \
AFTER UPDATE ON knowledge_atoms BEGIN \
INSERT INTO fts_knowledge(fts_knowledge, rowid, id, namespace, slug, name, description, content) \
VALUES('delete', old.rowid, old.id, old.namespace, old.slug, old.name, old.description, old.content); \
INSERT INTO fts_knowledge(rowid, id, namespace, slug, name, description, content) \
SELECT new.rowid, new.id, new.namespace, new.slug, new.name, new.description, new.content \
WHERE new.deleted_at IS NULL; \
END;\
";
/// Migration 20 (`brain_profile_persistence`): creates two tables.
///
/// - `brain_profile_snapshots`: one `snapshot_json` row per
///   `(profile_id, namespace)`.
/// - `brain_event_log`: an AUTOINCREMENT id and an index on
///   `(profile_id, namespace, created_at)`.
///
/// `namespace` defaults to `'default'` in both tables.
const V20_BRAIN_PROFILE_PERSISTENCE: &str = "\
CREATE TABLE IF NOT EXISTS brain_profile_snapshots (\
profile_id TEXT NOT NULL,\
namespace TEXT NOT NULL DEFAULT 'default',\
snapshot_json TEXT NOT NULL,\
updated_at INTEGER NOT NULL,\
PRIMARY KEY (profile_id, namespace)\
);\
CREATE TABLE IF NOT EXISTS brain_event_log (\
id INTEGER PRIMARY KEY AUTOINCREMENT,\
profile_id TEXT NOT NULL,\
namespace TEXT NOT NULL DEFAULT 'default',\
event_kind TEXT NOT NULL,\
payload TEXT NOT NULL,\
created_at INTEGER NOT NULL\
);\
CREATE INDEX IF NOT EXISTS idx_brain_events_profile \
ON brain_event_log(profile_id, namespace, created_at);\
";
/// Migration 22 (`knowledge_lifecycle_status`): adds lifecycle tracking to the
/// knowledge tables.
///
/// - Adds a `status` column (default `'draft'`) to `knowledge_atoms`,
///   `knowledge_sections` and `knowledge_domains`, and indexes it.
/// - Adds `source_uri` and `source_type` to atoms.
/// - Promotes atoms with `finalized = 1` to `'reviewed'`.
///
/// This depends on `knowledge_sections`, which migration 21 creates. Only the
/// order of entries in `MIGRATIONS` matters, not the declaration order of these
/// constants.
const V22_KNOWLEDGE_LIFECYCLE_STATUS: &str = "\
ALTER TABLE knowledge_atoms ADD COLUMN status TEXT NOT NULL DEFAULT 'draft';\
ALTER TABLE knowledge_atoms ADD COLUMN source_uri TEXT;\
ALTER TABLE knowledge_atoms ADD COLUMN source_type TEXT;\
ALTER TABLE knowledge_sections ADD COLUMN status TEXT NOT NULL DEFAULT 'draft';\
ALTER TABLE knowledge_domains ADD COLUMN status TEXT NOT NULL DEFAULT 'draft';\
CREATE INDEX IF NOT EXISTS idx_knowledge_atoms_ns_status \
ON knowledge_atoms(namespace, status);\
CREATE INDEX IF NOT EXISTS idx_knowledge_sections_status \
ON knowledge_sections(status);\
CREATE INDEX IF NOT EXISTS idx_knowledge_domains_ns_status \
ON knowledge_domains(namespace, status);\
UPDATE knowledge_atoms SET status = 'reviewed' WHERE finalized = 1;\
";
/// Migration 21 (`knowledge_sections`): creates `knowledge_sections` and its
/// full-text index.
///
/// - `knowledge_sections` holds at most one row per `(atom_id, section_type)` and
///   declares a foreign key to `knowledge_atoms(id)`.
/// - Lookup indexes are created alongside it.
/// - `fts_sections` is an external-content FTS5 table (trigram tokenizer) kept in
///   sync by insert/delete/update triggers.
///
/// Unlike `fts_knowledge`, the triggers index every section row; the table has no
/// `deleted_at` column to filter on.
const V21_KNOWLEDGE_SECTIONS: &str = "\
CREATE TABLE IF NOT EXISTS knowledge_sections (\
id TEXT PRIMARY KEY,\
atom_id TEXT NOT NULL,\
namespace TEXT NOT NULL,\
section_type TEXT NOT NULL,\
heading TEXT NOT NULL DEFAULT '',\
content TEXT NOT NULL DEFAULT '',\
tokens INTEGER NOT NULL DEFAULT 0,\
sort_order INTEGER NOT NULL DEFAULT 0,\
embedding BLOB,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
FOREIGN KEY (atom_id) REFERENCES knowledge_atoms(id),\
UNIQUE(atom_id, section_type)\
);\
CREATE INDEX IF NOT EXISTS idx_knowledge_sections_atom \
ON knowledge_sections(atom_id);\
CREATE INDEX IF NOT EXISTS idx_knowledge_sections_ns_type \
ON knowledge_sections(namespace, section_type);\
CREATE INDEX IF NOT EXISTS idx_knowledge_sections_ns_atom \
ON knowledge_sections(namespace, atom_id);\
CREATE VIRTUAL TABLE IF NOT EXISTS fts_sections \
USING fts5(\
id UNINDEXED,\
namespace UNINDEXED,\
atom_id UNINDEXED,\
section_type UNINDEXED,\
heading,\
content,\
content=knowledge_sections,\
content_rowid=rowid,\
tokenize='trigram case_sensitive 0'\
);\
CREATE TRIGGER IF NOT EXISTS fts_sections_ai \
AFTER INSERT ON knowledge_sections BEGIN \
INSERT INTO fts_sections(rowid, id, namespace, atom_id, section_type, heading, content) \
VALUES(new.rowid, new.id, new.namespace, new.atom_id, new.section_type, new.heading, new.content); \
END; \
CREATE TRIGGER IF NOT EXISTS fts_sections_ad \
AFTER DELETE ON knowledge_sections BEGIN \
INSERT INTO fts_sections(fts_sections, rowid, id, namespace, atom_id, section_type, heading, content) \
VALUES('delete', old.rowid, old.id, old.namespace, old.atom_id, old.section_type, old.heading, old.content); \
END; \
CREATE TRIGGER IF NOT EXISTS fts_sections_au \
AFTER UPDATE ON knowledge_sections BEGIN \
INSERT INTO fts_sections(fts_sections, rowid, id, namespace, atom_id, section_type, heading, content) \
VALUES('delete', old.rowid, old.id, old.namespace, old.atom_id, old.section_type, old.heading, old.content); \
INSERT INTO fts_sections(rowid, id, namespace, atom_id, section_type, heading, content) \
VALUES(new.rowid, new.id, new.namespace, new.atom_id, new.section_type, new.heading, new.content); \
END;\
";
/// Ordered list of schema migrations applied by [`run_migrations`].
///
/// Invariant: entry `i` must have `version == i + 1`; `run_migrations` rejects the
/// array otherwise. New migrations go at the end with the next version number.
///
/// - Versions 6-8 are reserved no-ops (`SELECT 1;`).
/// - The `up` strings for versions 13, 14, 16, 17 and 18 are placeholders. Their
///   SQL is generated at runtime.
pub const MIGRATIONS: &[VersionedMigration] = &[
VersionedMigration {
version: 1,
name: "initial_schema",
up: V1_UP,
},
VersionedMigration {
version: 2,
name: "add_name_to_notes",
up: "ALTER TABLE notes ADD COLUMN name TEXT;",
},
VersionedMigration {
version: 3,
name: "add_events_namespace_created_index",
up: "CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);",
},
VersionedMigration {
version: 4,
name: "dedupe_graph_edge_triples",
up: V4_DEDUPE_GRAPH_EDGE_TRIPLES,
},
VersionedMigration {
version: 5,
name: "add_entity_type_to_entities",
up: V5_ADD_ENTITY_TYPE_TO_ENTITIES,
},
// Versions 6-8: reserved slots that execute a no-op.
VersionedMigration {
version: 6,
name: "reserved_adr043_embedding_pipeline_extensions",
up: "SELECT 1;",
},
VersionedMigration {
version: 7,
name: "reserved_adr046_event_sourced_proposals_index",
up: "SELECT 1;",
},
VersionedMigration {
version: 8,
name: "reserved_adr041_event_observations_and_session_id",
up: "SELECT 1;",
},
VersionedMigration {
version: 9,
name: "edge_lifecycle_and_target_backend",
up: V9_EDGE_LIFECYCLE_AND_TARGET_BACKEND,
},
VersionedMigration {
version: 10,
name: "note_status_and_nullable_metrics",
up: V10_NOTE_STATUS_AND_NULLABLE_METRICS,
},
VersionedMigration {
version: 11,
name: "entity_tombstone_columns",
up: V11_ENTITY_TOMBSTONE_COLUMNS,
},
VersionedMigration {
version: 12,
name: "nullable_note_metrics",
up: V12_NULLABLE_NOTE_METRICS,
},
VersionedMigration {
version: 13,
name: "event_observability_provenance",
up: V13_EVENT_OBSERVABILITY_PROVENANCE,
},
VersionedMigration {
version: 14,
name: "embedding_model_registry",
up: V14_EMBEDDING_MODEL_REGISTRY,
},
VersionedMigration {
version: 15,
name: "proposals_open",
up: V15_PROPOSALS_OPEN,
},
VersionedMigration {
version: 16,
name: "vector_embedding_model_tag",
up: V16_VECTOR_EMBEDDING_MODEL_TAG,
},
VersionedMigration {
version: 17,
name: "vector_embedding_model_tag_preserving_rebuild",
up: V17_VECTOR_EMBEDDING_MODEL_TAG_PRESERVING_REBUILD,
},
// Placeholder: `run_migrations` builds version 18's SQL with
// `build_v18_proposals_applying_sql`.
VersionedMigration {
version: 18,
name: "proposals_open_add_applying_status",
up: "__v18_computed_at_runtime__",
},
VersionedMigration {
version: 19,
name: "knowledge_atoms_and_domains",
up: V19_KNOWLEDGE_ATOMS_AND_DOMAINS,
},
VersionedMigration {
version: 20,
name: "brain_profile_persistence",
up: V20_BRAIN_PROFILE_PERSISTENCE,
},
VersionedMigration {
version: 21,
name: "knowledge_sections",
up: V21_KNOWLEDGE_SECTIONS,
},
VersionedMigration {
version: 22,
name: "knowledge_lifecycle_status",
up: V22_KNOWLEDGE_LIFECYCLE_STATUS,
},
];
/// DDL for `_schema_migrations`, the ledger used by [`run_migrations`]. It stores
/// one row per applied version, with `applied_at` in Unix microseconds.
const MIGRATION_TRACKING_TABLE: &str = "\
CREATE TABLE IF NOT EXISTS _schema_migrations (\
version INTEGER PRIMARY KEY,\
name TEXT NOT NULL,\
applied_at INTEGER NOT NULL\
);\
";
pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
for (i, m) in MIGRATIONS.iter().enumerate() {
let expected = (i + 1) as u32;
if m.version != expected {
return Err(SqliteError::InvalidData(format!(
"MIGRATIONS array is not contiguous: expected version {expected} at index {i}, \
got version {}",
m.version
)));
}
}
conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
let current_version: u32 = conn
.query_row(
"SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
[],
|row| row.get(0),
)
.unwrap_or(0);
let mut applied_version = current_version;
for migration in MIGRATIONS {
if migration.version <= current_version {
continue;
}
if migration.version == 2 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 5 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('entities') WHERE name = 'entity_type'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 10 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'status'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 11 {
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('entities') WHERE name = 'merged_into'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if col_exists {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
if migration.version == 12 {
let already_nullable: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') \
WHERE name = 'salience' AND \"notnull\" = 0",
[],
|row| row.get(0),
)
.unwrap_or(false);
if already_nullable {
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT OR IGNORE INTO _schema_migrations (version, name, applied_at) \
VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
continue;
}
}
let tx = conn.transaction().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
let up_sql = if migration.version == 13 {
build_v13_event_observability_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else if migration.version == 14 {
build_v14_embedding_model_registry_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else if migration.version == 16 {
build_v16_vector_embedding_model_tag_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else if migration.version == 17 {
build_v17_preserving_rebuild_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else if migration.version == 18 {
build_v18_proposals_applying_sql(&tx).map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?
} else {
migration.up.to_string()
};
tx.execute_batch(&up_sql)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
let now = chrono::Utc::now().timestamp_micros();
tx.execute(
"INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
tx.commit().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
applied_version = migration.version;
}
Ok(applied_version)
}
fn table_has_column(
conn: &Connection,
table: &'static str,
column: &'static str,
) -> Result<bool, rusqlite::Error> {
conn.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
rusqlite::params![table, column],
|row| row.get(0),
)
}
/// Builds the SQL batch for migration 13 (`event_observability_provenance`).
///
/// The batch:
/// 1. Adds each provenance column that `events` does not have yet (`kind`,
///    `payload`, `payload_schema_version`, `profile_state_version`, `session_id`,
///    `aggregate_kind`, `aggregate_id`).
/// 2. Backfills `payload` from the legacy `data` column.
/// 3. Creates `event_observations` and the event indexes.
///
/// The SQL is only generated here; the caller executes it afterwards. Every schema
/// probe below therefore sees the schema as it was *before* the batch runs.
///
/// # Errors
/// Propagates errors from the column probes.
fn build_v13_event_observability_sql(conn: &Connection) -> Result<String, rusqlite::Error> {
    let mut sql = String::new();
    for (column, ddl) in [
        (
            "kind",
            "ALTER TABLE events ADD COLUMN kind TEXT NOT NULL DEFAULT 'audit';",
        ),
        (
            "payload",
            "ALTER TABLE events ADD COLUMN payload TEXT NOT NULL DEFAULT '{}';",
        ),
        (
            "payload_schema_version",
            "ALTER TABLE events ADD COLUMN payload_schema_version INTEGER NOT NULL DEFAULT 1;",
        ),
        (
            "profile_state_version",
            "ALTER TABLE events ADD COLUMN profile_state_version INTEGER;",
        ),
        (
            "session_id",
            "ALTER TABLE events ADD COLUMN session_id TEXT;",
        ),
        (
            "aggregate_kind",
            "ALTER TABLE events ADD COLUMN aggregate_kind TEXT;",
        ),
        (
            "aggregate_id",
            "ALTER TABLE events ADD COLUMN aggregate_id TEXT;",
        ),
    ] {
        if !table_has_column(conn, "events", column)? {
            sql.push_str(ddl);
        }
    }
    // By the time this UPDATE runs, `payload` is guaranteed to exist: either it was
    // already there or the loop above queued its ADD COLUMN. Probing the
    // pre-migration schema for `payload` would return `false` on exactly the
    // databases that lack it, and the backfill would silently never run there.
    //
    // Only well-formed JSON is copied. `idx_events_payload_proposal_id` below
    // evaluates `json_extract(payload, ...)` for every row, and SQLite raises an
    // error on malformed JSON, which would abort the whole migration.
    if table_has_column(conn, "events", "data")? {
        sql.push_str(
            "UPDATE events SET payload = data \
             WHERE data IS NOT NULL AND data <> '' AND json_valid(data);",
        );
    }
    sql.push_str(
        "CREATE TABLE IF NOT EXISTS event_observations (\
         event_id TEXT NOT NULL,\
         entity_id TEXT NOT NULL,\
         referent_kind TEXT NOT NULL,\
         role TEXT NOT NULL,\
         position INTEGER NOT NULL,\
         PRIMARY KEY (event_id, role, position)\
         );\
         CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);\
         CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);\
         CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);\
         CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));\
         CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);\
         CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);",
    );
    Ok(sql)
}
/// Builds the SQL batch for migration 14 (`embedding_model_registry`).
///
/// The batch starts with [`EMBEDDING_MODELS_DDL`]. It then covers every ordinary
/// `vec_*` table (not virtual, not vec0) whose name is a safe identifier and that
/// lacks an `embedding_model_id` column. For each one it appends:
/// - an `ALTER TABLE ... ADD COLUMN embedding_model_id` referencing
///   `_embedding_models(id)`;
/// - an index on the new column.
///
/// # Errors
/// Propagates failures from the catalog scan and the per-table column probe.
/// A failure no longer silently skips a table while the migration is still
/// recorded as applied.
fn build_v14_embedding_model_registry_sql(conn: &Connection) -> Result<String, rusqlite::Error> {
    let mut sql = String::from(EMBEDDING_MODELS_DDL);
    // Ordinary `vec_*` tables, excluding names that match the `_chunks`, `_rowids`,
    // `_info`, `_vector_chunks` and `_metadata` patterns.
    let mut stmt = conn.prepare(
        "SELECT name FROM sqlite_master \
         WHERE type = 'table' \
         AND name LIKE 'vec_%' \
         AND sql NOT LIKE '%VIRTUAL%' \
         AND sql NOT LIKE '%vec0%' \
         AND name NOT LIKE '%\\_chunks' ESCAPE '\\' \
         AND name NOT LIKE '%\\_rowids' ESCAPE '\\' \
         AND name NOT LIKE '%\\_info' ESCAPE '\\' \
         AND name NOT LIKE '%\\_vector\\_chunks%' ESCAPE '\\' \
         AND name NOT LIKE '%\\_metadata%' ESCAPE '\\'",
    )?;
    let vec_tables = stmt
        .query_map([], |row| row.get::<_, String>(0))?
        .collect::<Result<Vec<_>, _>>()?;
    for table in &vec_tables {
        // The name is interpolated into DDL below, so accept only `vec_` followed by
        // ASCII alphanumerics and underscores.
        let valid = table.starts_with("vec_")
            && table[4..]
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '_');
        if !valid {
            continue;
        }
        let col_exists: bool = conn.query_row(
            "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = 'embedding_model_id'",
            rusqlite::params![table],
            |row| row.get(0),
        )?;
        if col_exists {
            continue;
        }
        sql.push_str(&format!(
            "ALTER TABLE {t} ADD COLUMN embedding_model_id BLOB REFERENCES _embedding_models(id);\
             CREATE INDEX IF NOT EXISTS idx_{t}_model ON {t}(embedding_model_id);",
            t = table,
        ));
    }
    Ok(sql)
}
/// Builds the SQL batch for migration 16 (`vector_embedding_model_tag`).
///
/// It covers every ordinary `vec_*` table (not virtual, not vec0) whose name is a
/// safe identifier and that lacks an `embedding_model` column. For each one it
/// appends:
/// - `ADD COLUMN embedding_model TEXT NOT NULL DEFAULT 'all-minilm-l6-v2'`, which
///   tags existing rows with that model;
/// - a `(subject_id, embedding_model)` index.
///
/// Returns `SELECT 1;` when no table needs changing.
///
/// NOTE(review): the index assumes every matched table has a `subject_id`
/// column; confirm.
///
/// # Errors
/// Propagates failures from the catalog scan and the per-table column probe instead
/// of silently skipping tables.
fn build_v16_vector_embedding_model_tag_sql(conn: &Connection) -> Result<String, rusqlite::Error> {
    let mut stmt = conn.prepare(
        "SELECT name FROM sqlite_master \
         WHERE type = 'table' \
         AND name LIKE 'vec_%' \
         AND sql NOT LIKE '%VIRTUAL%' \
         AND sql NOT LIKE '%vec0%' \
         AND name NOT LIKE '%\\_chunks' ESCAPE '\\' \
         AND name NOT LIKE '%\\_rowids' ESCAPE '\\' \
         AND name NOT LIKE '%\\_info' ESCAPE '\\' \
         AND name NOT LIKE '%\\_vector\\_chunks%' ESCAPE '\\' \
         AND name NOT LIKE '%\\_metadata%' ESCAPE '\\'",
    )?;
    let vec_tables = stmt
        .query_map([], |row| row.get::<_, String>(0))?
        .collect::<Result<Vec<_>, _>>()?;
    let mut sql = String::new();
    for table in vec_tables {
        // The name is interpolated into DDL below, so accept only `vec_` followed by
        // ASCII alphanumerics and underscores.
        let valid = table.starts_with("vec_")
            && table[4..]
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '_');
        if !valid {
            continue;
        }
        let col_exists: bool = conn.query_row(
            "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = 'embedding_model'",
            rusqlite::params![&table],
            |row| row.get(0),
        )?;
        if col_exists {
            continue;
        }
        sql.push_str(&format!(
            "ALTER TABLE {t} ADD COLUMN embedding_model TEXT NOT NULL DEFAULT 'all-minilm-l6-v2';\
             CREATE INDEX IF NOT EXISTS idx_{t}_subject_model ON {t}(subject_id, embedding_model);",
            t = table,
        ));
    }
    if sql.is_empty() {
        sql.push_str("SELECT 1;");
    }
    Ok(sql)
}
/// Derives an embedding-model tag from a `vec_<suffix>` table name.
///
/// Returns the suffix when it is non-empty and made only of ASCII alphanumerics and
/// underscores. Otherwise it returns the default model `"all-minilm-l6-v2"`, which
/// covers a missing `vec_` prefix, an empty suffix, and any other character.
fn infer_model_from_table_name(table: &str) -> String {
    const FALLBACK_MODEL: &str = "all-minilm-l6-v2";
    match table.strip_prefix("vec_") {
        Some(suffix)
            if !suffix.is_empty()
                && suffix
                    .chars()
                    .all(|ch| ch == '_' || ch.is_ascii_alphanumeric()) =>
        {
            suffix.to_owned()
        }
        _ => FALLBACK_MODEL.to_owned(),
    }
}
/// Builds the SQL batch for migration 17 (`vector_embedding_model_tag_preserving_rebuild`).
///
/// Targets every `vec0` virtual table named `vec_<ASCII alphanumerics/underscores>`
/// that lacks a `field` or `embedding_model` column. For each one the batch:
/// 1. copies its rows into a plain `tmp_<table>` table;
/// 2. drops the virtual table;
/// 3. recreates it with `subject_id`, `namespace`, `kind`, `field`,
///    `embedding_model` and a `float[dim]` embedding column;
/// 4. copies the rows back and drops the temporary table.
///
/// Missing `field` values become `''`. Missing `embedding_model` values come from
/// `infer_model_from_table_name`. Tables whose DDL has no parseable `float[N]` are
/// left untouched. Returns `SELECT 1;` when nothing needs rebuilding.
///
/// NOTE(review): the recreated table always declares `distance_metric=cosine`,
/// whatever metric the original DDL used. Confirm every rebuilt table was already
/// cosine.
/// NOTE(review): assumes each source table has `subject_id`, `namespace`, `kind`
/// and `embedding` columns, and that `subject_id` is unique, since it becomes the
/// temp table's PRIMARY KEY. TODO confirm.
///
/// # Errors
/// Propagates errors from preparing/running the catalog query and the per-table
/// `PRAGMA table_xinfo`.
pub fn build_v17_preserving_rebuild_sql(conn: &Connection) -> Result<String, rusqlite::Error> {
// Candidates: `vec_*` tables declared as VIRTUAL ... vec0, excluding names that
// match the `_chunks` / `_rowids` / `_info` / `_vector_chunks` / `_metadata` patterns.
let mut stmt = conn.prepare(
"SELECT name, sql FROM sqlite_master \
WHERE type = 'table' \
AND name LIKE 'vec_%' \
AND sql LIKE '%VIRTUAL%' \
AND sql LIKE '%vec0%' \
AND name NOT LIKE '%\\_chunks' ESCAPE '\\' \
AND name NOT LIKE '%\\_rowids' ESCAPE '\\' \
AND name NOT LIKE '%\\_info' ESCAPE '\\' \
AND name NOT LIKE '%\\_vector\\_chunks%' ESCAPE '\\' \
AND name NOT LIKE '%\\_metadata%' ESCAPE '\\'",
)?;
// NOTE(review): rows that fail to decode are silently dropped here.
let virtual_tables: Vec<(String, Option<String>)> = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.filter_map(|r| r.ok())
.collect();
let mut sql = String::new();
for (table, ddl_opt) in &virtual_tables {
// `table` is interpolated into SQL below, so require `vec_` + [A-Za-z0-9_]*.
let valid = table.starts_with("vec_")
&& table[4..]
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_');
if !valid {
continue;
}
// Detect which target columns already exist. `table_xinfo` also lists hidden
// columns; index 1 of each row is the column name.
let mut has_field = false;
let mut has_embedding_model = false;
let pragma = format!("PRAGMA table_xinfo({})", table);
let mut col_stmt = conn.prepare(&pragma)?;
let mut col_rows = col_stmt.query([])?;
while let Some(row) = col_rows.next()? {
let name: String = row.get(1)?;
match name.as_str() {
"field" => has_field = true,
"embedding_model" => has_embedding_model = true,
_ => {}
}
}
// Already in the target shape: nothing to rebuild.
if has_field && has_embedding_model {
continue;
}
// Dimension N from the first `float[N]` in the original DDL (case-insensitive).
// Tables without one are skipped, since the new vec0 column needs a size.
let dims = ddl_opt.as_deref().and_then(|ddl| {
let lower = ddl.to_ascii_lowercase();
let start = lower.find("float[")?;
let rest = &lower[start + 6..];
let end = rest.find(']')?;
rest[..end].trim().parse::<u32>().ok()
});
let dim = match dims {
Some(d) => d,
None => continue,
};
let inferred_model = infer_model_from_table_name(table);
let tmp = format!("tmp_{}", table);
// Keep existing column values; otherwise synthesize a constant for every row.
let field_expr = if has_field {
"field".to_string()
} else {
"'' AS field".to_string()
};
let model_expr = if has_embedding_model {
"embedding_model".to_string()
} else {
format!("'{}' AS embedding_model", inferred_model)
};
// Step 1: stage rows in a plain table.
sql.push_str(&format!(
"CREATE TABLE {tmp} (\
subject_id TEXT PRIMARY KEY, \
namespace TEXT NOT NULL, \
kind TEXT NOT NULL, \
field TEXT NOT NULL, \
embedding_model TEXT NOT NULL, \
embedding BLOB NOT NULL\
);",
tmp = tmp,
));
sql.push_str(&format!(
"INSERT INTO {tmp} (subject_id, namespace, kind, field, embedding_model, embedding) \
SELECT subject_id, namespace, kind, {field_expr}, {model_expr}, embedding \
FROM {table};",
tmp = tmp,
field_expr = field_expr,
model_expr = model_expr,
table = table,
));
// Steps 2-3: replace the virtual table with the new column layout.
sql.push_str(&format!("DROP TABLE {table};", table = table));
sql.push_str(&format!(
"CREATE VIRTUAL TABLE {table} USING vec0(\
subject_id TEXT PRIMARY KEY, \
namespace TEXT NOT NULL, \
kind TEXT NOT NULL, \
field TEXT NOT NULL, \
embedding_model TEXT NOT NULL, \
embedding float[{dim}] distance_metric=cosine\
);",
table = table,
dim = dim,
));
// Step 4: restore rows and drop the staging table.
sql.push_str(&format!(
"INSERT INTO {table} (subject_id, namespace, kind, field, embedding_model, embedding) \
SELECT subject_id, namespace, kind, field, embedding_model, embedding \
FROM {tmp};",
table = table,
tmp = tmp,
));
sql.push_str(&format!("DROP TABLE {tmp};", tmp = tmp));
}
if sql.is_empty() {
sql.push_str("SELECT 1;");
}
Ok(sql)
}
/// One `_embedding_models` row as returned by [`query_embedding_models`].
#[derive(Clone, Debug)]
pub struct EmbeddingModelRegistryRecord {
/// Value of the `engine_name` column.
pub engine_name: String,
/// Value of the `model_id` column.
pub model_id: String,
/// Value of the `key_version` column.
pub key_version: String,
/// Value of the `dim` column.
pub dimensions: u32,
/// Lifecycle state. The table's CHECK limits it to `pending`, `active`,
/// `superseded` or `archived`.
pub status: String,
/// Value of the `activated_at` column; `None` when NULL.
/// NOTE(review): the unit is not visible here. Other timestamps in this module
/// are Unix microseconds; confirm.
pub activated_at: Option<i64>,
/// Value of the `superseded_at` column; `None` when NULL. The same unit caveat as
/// `activated_at` applies.
pub superseded_at: Option<i64>,
}
/// Lists rows of the `_embedding_models` registry, optionally limited to one engine.
///
/// `db` defaults to `$HOME/.khive/khive-graph.db`, or `./.khive/khive-graph.db` when
/// `HOME` is unset. Two cases yield an empty list rather than an error:
/// - the database file is missing (it is not created);
/// - the database has no registry table.
///
/// Rows are ordered by engine. Within an engine, activated models come first
/// (oldest activation first), followed by never-activated ones.
///
/// # Errors
/// Returns an error in these cases:
/// - the database cannot be opened or queried;
/// - a stored `dim` is negative or exceeds `u32::MAX`. An `as u32` cast would
///   silently wrap such values, so they are reported instead.
pub fn query_embedding_models(
    db: Option<&std::path::Path>,
    engine_filter: Option<&str>,
) -> Result<Vec<EmbeddingModelRegistryRecord>, SqliteError> {
    let path = db.map(std::path::Path::to_path_buf).unwrap_or_else(|| {
        std::env::var("HOME")
            .map(std::path::PathBuf::from)
            .unwrap_or_else(|_| std::path::PathBuf::from("."))
            .join(".khive/khive-graph.db")
    });
    // `Connection::open` would create an empty database file; bail out first instead.
    if !path.exists() {
        return Ok(Vec::new());
    }
    let conn = Connection::open(path)?;
    let exists: bool = conn.query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master \
         WHERE type='table' AND name='_embedding_models'",
        [],
        |row| row.get(0),
    )?;
    if !exists {
        return Ok(Vec::new());
    }
    // A NULL `?1` disables the engine filter, so one statement serves both cases.
    let mut stmt = conn.prepare(
        "SELECT engine_name, model_id, key_version, dim, status, activated_at, superseded_at \
         FROM _embedding_models \
         WHERE ?1 IS NULL OR engine_name = ?1 \
         ORDER BY engine_name, activated_at IS NULL, activated_at",
    )?;
    let rows = stmt.query_map([engine_filter], |row| {
        Ok(EmbeddingModelRegistryRecord {
            engine_name: row.get(0)?,
            model_id: row.get(1)?,
            key_version: row.get(2)?,
            // rusqlite's `u32` conversion is range-checked.
            dimensions: row.get(3)?,
            status: row.get(4)?,
            activated_at: row.get(5)?,
            superseded_at: row.get(6)?,
        })
    })?;
    rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Builds the SQL for migration V18, which widens the `status` CHECK constraint of
/// `proposals_open` so it also accepts `'applying'`.
///
/// SQLite cannot alter a CHECK constraint in place. The returned script therefore:
/// 1. renames the existing table to `proposals_open_v15`;
/// 2. recreates `proposals_open` with the widened constraint;
/// 3. copies every row across;
/// 4. drops the renamed table;
/// 5. recreates the three `idx_proposals_open_*` indexes.
///
/// Returns the placeholder statement `SELECT 1;` instead when `proposals_open` does
/// not exist, or when its stored DDL already mentions `'applying'`.
///
/// # Errors
///
/// Propagates any [`rusqlite::Error`] raised while reading `sqlite_master`.
pub(crate) fn build_v18_proposals_applying_sql(
    conn: &Connection,
) -> Result<String, rusqlite::Error> {
    const NO_OP_SQL: &str = "SELECT 1;";
    const REBUILD_SQL: &str = "\
        ALTER TABLE proposals_open RENAME TO proposals_open_v15;\
        CREATE TABLE proposals_open (\
        proposal_id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        proposer TEXT NOT NULL,\
        title TEXT NOT NULL,\
        status TEXT NOT NULL CHECK (status IN ('open', 'changes_requested', 'approved', 'applying', 'rejected', 'applied', 'withdrawn')),\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        expiry INTEGER,\
        last_decision TEXT,\
        review_count INTEGER NOT NULL DEFAULT 0,\
        approve_count INTEGER NOT NULL DEFAULT 0,\
        reject_count INTEGER NOT NULL DEFAULT 0\
        );\
        INSERT INTO proposals_open \
        SELECT proposal_id, namespace, proposer, title, status, created_at, updated_at, \
        expiry, last_decision, review_count, approve_count, reject_count \
        FROM proposals_open_v15;\
        DROP TABLE proposals_open_v15;\
        CREATE INDEX IF NOT EXISTS idx_proposals_open_ns_status ON proposals_open(namespace, status);\
        CREATE INDEX IF NOT EXISTS idx_proposals_open_proposer ON proposals_open(namespace, proposer);\
        CREATE INDEX IF NOT EXISTS idx_proposals_open_updated ON proposals_open(namespace, updated_at DESC);\
        ";

    // The scalar subquery always yields exactly one row: the table's DDL, or NULL
    // when the table is absent. That lets a single query answer both
    // "does it exist?" and "what does it look like?".
    let current_ddl: Option<String> = conn.query_row(
        "SELECT (SELECT sql FROM sqlite_master WHERE type='table' AND name='proposals_open')",
        [],
        |row| row.get(0),
    )?;

    let needs_rebuild =
        matches!(current_ddl.as_deref(), Some(ddl) if !ddl.contains("'applying'"));
    let script = if needs_rebuild { REBUILD_SQL } else { NO_OP_SQL };
    Ok(script.to_string())
}
// Unit tests for the versioned migration runner (`run_migrations`) and the V17
// vec-table helpers (`infer_model_from_table_name`, `build_v17_preserving_rebuild_sql`).
// NOTE(review): the latest schema version (22) is hard-coded in many tests below;
// they all need bumping together whenever a new migration is appended.
#[cfg(test)]
mod tests {
use super::*;
// Fresh, empty in-memory SQLite connection; each test gets its own.
fn open_memory() -> Connection {
Connection::open_in_memory().expect("in-memory connection")
}
// A brand-new database must migrate straight to V22, record every version in
// `_schema_migrations`, and expose the tables/columns/indexes introduced by V1–V21.
#[test]
fn fresh_db_migrates_to_latest() {
let mut conn = open_memory();
let version = run_migrations(&mut conn).expect("migrations should succeed");
assert_eq!(version, 22);
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22)",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 22);
let tbl_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='entities'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(tbl_count, 1);
let col_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'name'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(col_count, 1, "V2 must add name column to notes");
let et_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pragma_table_info('entities') WHERE name = 'entity_type'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(et_count, 1, "V5 must add entity_type column to entities");
let idx_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='index' \
AND name='idx_entities_kind_entity_type'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(idx_count, 1, "V5 must create idx_entities_kind_entity_type");
let status_col: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pragma_table_info('notes') WHERE name = 'status'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(status_col, 1, "V10 must add status column to notes");
let merged_into_col: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pragma_table_info('entities') WHERE name = 'merged_into'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
merged_into_col, 1,
"V11 must add merged_into column to entities"
);
// `notnull` is quoted because it is a keyword in SQLite's grammar.
let salience_notnull: i64 = conn
.query_row(
"SELECT \"notnull\" FROM pragma_table_info('notes') WHERE name = 'salience'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(salience_notnull, 0, "V12 must make salience nullable");
for col in [
"kind",
"payload",
"payload_schema_version",
"profile_state_version",
"session_id",
"aggregate_kind",
"aggregate_id",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('events') WHERE name = ?1",
[col],
|r| r.get(0),
)
.unwrap();
assert!(exists, "V13 must add events.{col}");
}
let obs_tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='event_observations'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(obs_tbl, 1, "V13 must create event_observations table");
for idx in [
"idx_events_ns_created_id",
"idx_events_session",
"idx_events_payload_proposal_id",
"idx_event_obs_entity",
"idx_event_obs_event_role",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='index' AND name=?1",
[idx],
|r| r.get(0),
)
.unwrap();
assert!(exists, "V13 must create index {idx}");
}
let embed_tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='_embedding_models'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(embed_tbl, 1, "V14 must create _embedding_models table");
for idx in [
"idx_embed_models_one_active",
"idx_embed_models_engine_status",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='index' AND name=?1",
[idx],
|r| r.get(0),
)
.unwrap();
assert!(exists, "V14 must create index {idx}");
}
let proposals_tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='proposals_open'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(proposals_tbl, 1, "V15 must create proposals_open table");
for idx in [
"idx_proposals_open_ns_status",
"idx_proposals_open_proposer",
"idx_proposals_open_updated",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='index' AND name=?1",
[idx],
|r| r.get(0),
)
.unwrap();
assert!(exists, "V15 must create index {idx}");
}
let snap_tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='brain_profile_snapshots'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(snap_tbl, 1, "V20 must create brain_profile_snapshots table");
let log_tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='brain_event_log'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(log_tbl, 1, "V20 must create brain_event_log table");
let sections_tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='knowledge_sections'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(sections_tbl, 1, "V21 must create knowledge_sections table");
for idx in [
"idx_knowledge_sections_atom",
"idx_knowledge_sections_ns_type",
"idx_knowledge_sections_ns_atom",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='index' AND name=?1",
[idx],
|r| r.get(0),
)
.unwrap();
assert!(exists, "V21 must create index {idx}");
}
for col in [
"id",
"atom_id",
"namespace",
"section_type",
"heading",
"content",
"tokens",
"sort_order",
"embedding",
"created_at",
"updated_at",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('knowledge_sections') WHERE name = ?1",
[col],
|r| r.get(0),
)
.unwrap();
assert!(exists, "V21 must add knowledge_sections.{col}");
}
}
// Running the migrator a second time must be a no-op: the same version comes
// back and no duplicate `_schema_migrations` rows are written.
#[test]
fn run_migrations_twice_is_idempotent() {
let mut conn = open_memory();
let v1 = run_migrations(&mut conn).expect("first run");
let v2 = run_migrations(&mut conn).expect("second run");
assert_eq!(v1, 22);
assert_eq!(v2, 22);
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM _schema_migrations", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(count, 22);
}
// V9 must add `graph_edges.target_backend` plus its index.
#[test]
fn migration_v9_adds_target_backend_index() {
let mut conn = open_memory();
let version = run_migrations(&mut conn).expect("migrations should succeed");
assert_eq!(
version, 22,
"F052: latest migration must be V22 (knowledge_lifecycle_status)"
);
let col: i64 = conn
.query_row(
"SELECT COUNT(*) FROM pragma_table_info('graph_edges') WHERE name = 'target_backend'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
col, 1,
"F052: graph_edges must have target_backend column after V9 migration"
);
let idx: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_graph_edges_target_backend'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
idx, 1,
"F052: idx_graph_edges_target_backend partial index must exist after V9 migration"
);
}
// A migration whose SQL fails must leave no trace: applies an invalid V23 via the
// test-local `apply_single_migration` helper, then checks that V23 was not recorded
// and that V1..V22 are still intact.
#[test]
fn failed_migration_rolls_back() {
let bad_v23 = VersionedMigration {
version: 23,
name: "bad_migration",
up: "THIS IS NOT VALID SQL;",
};
let mut conn = open_memory();
run_migrations(&mut conn).expect("V1..V22 should apply cleanly");
let result = apply_single_migration(&mut conn, &bad_v23);
assert!(result.is_err(), "bad migration should return error");
let v23_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 23",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(v23_count, 0, "V23 must not be recorded after rollback");
let applied_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22)",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
applied_count, 22,
"V1..V22 must still be recorded after V23 rollback"
);
}
// The store-level `ensure_*_schema` DDL can create columns before the migrator
// runs. The migrations that would add those columns must still succeed and still
// be recorded.
#[test]
fn store_ddl_then_migrations_is_idempotent() {
use crate::stores::entity::ensure_entities_schema;
use crate::stores::note::ensure_notes_schema;
let mut conn = open_memory();
ensure_notes_schema(&conn).expect("store DDL should create notes");
ensure_entities_schema(&conn).expect("store DDL should create entities");
let has_name: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('notes') WHERE name = 'name'",
[],
|row| row.get(0),
)
.unwrap();
assert!(has_name, "NOTES_DDL should include name column");
let version = run_migrations(&mut conn).expect("migrations after store DDL");
assert_eq!(version, 22);
let v2_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 2",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v2_count, 1,
"V2 must be recorded even when column pre-exists"
);
let v5_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 5",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v5_count, 1,
"V5 must be recorded even when entity_type column pre-exists"
);
let v9_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 9",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v9_count, 1,
"V9 must be recorded after store-DDL + migrations"
);
let v10_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 10",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v10_count, 1,
"V10 must be recorded even when status column pre-exists via NOTES_DDL"
);
let v11_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 11",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v11_count, 1,
"V11 must be recorded even when merged_into column pre-exists via ENTITIES_DDL"
);
let v12_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 12",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v12_count, 1,
"V12 must be recorded even when salience is already nullable via NOTES_DDL"
);
let v13_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 13",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v13_count, 1,
"V13 must be recorded after store-DDL + migrations"
);
let v14_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 14",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v14_count, 1,
"V14 must be recorded after store-DDL + migrations"
);
let v15_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 15",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v15_count, 1,
"V15 must be recorded after store-DDL + migrations"
);
let v21_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 21",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
v21_count, 1,
"V21 must be recorded after store-DDL + migrations"
);
}
// Simulates a database stuck at V1: hand-written V1-era tables with
// `salience REAL NOT NULL` and only V1 recorded. Upgrading to the latest version
// must leave `notes.salience` nullable, and an insert that omits salience must
// store NULL.
#[test]
fn v1_to_v12_allows_null_salience() {
let mut conn = open_memory();
conn.execute_batch(MIGRATION_TRACKING_TABLE).unwrap();
conn.execute_batch(
"CREATE TABLE entities (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
name TEXT NOT NULL,\
description TEXT,\
properties TEXT,\
tags TEXT NOT NULL DEFAULT '[]',\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE TABLE graph_edges (\
namespace TEXT NOT NULL,\
id TEXT NOT NULL,\
source_id TEXT NOT NULL,\
target_id TEXT NOT NULL,\
relation TEXT NOT NULL,\
weight REAL NOT NULL DEFAULT 1.0,\
created_at INTEGER NOT NULL,\
metadata TEXT,\
PRIMARY KEY (namespace, id)\
);\
CREATE TABLE notes (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
content TEXT NOT NULL DEFAULT '',\
salience REAL NOT NULL DEFAULT 0.5,\
decay_factor REAL NOT NULL DEFAULT 0.0,\
expires_at INTEGER,\
properties TEXT,\
created_at INTEGER NOT NULL,\
updated_at INTEGER NOT NULL,\
deleted_at INTEGER\
);\
CREATE TABLE events (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
verb TEXT NOT NULL,\
substrate TEXT NOT NULL,\
actor TEXT NOT NULL,\
outcome TEXT NOT NULL,\
data TEXT,\
duration_us INTEGER NOT NULL DEFAULT 0,\
target_id TEXT,\
created_at INTEGER NOT NULL\
);",
)
.unwrap();
let now = chrono::Utc::now().timestamp_micros();
conn.execute(
"INSERT INTO _schema_migrations (version, name, applied_at) VALUES (1, 'initial_schema', ?1)",
rusqlite::params![now],
)
.unwrap();
let version = run_migrations(&mut conn).expect("migrations should succeed");
assert_eq!(version, 22);
let notnull: i64 = conn
.query_row(
"SELECT \"notnull\" FROM pragma_table_info('notes') WHERE name = 'salience'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(notnull, 0, "salience must be nullable after V12");
conn.execute(
"INSERT INTO notes (id, namespace, kind, status, content, created_at, updated_at) \
VALUES ('test-id', 'ns', 'observation', 'active', '', 1, 1)",
[],
)
.expect("inserting note with NULL salience must succeed after V12");
let stored_salience: Option<f64> = conn
.query_row(
"SELECT salience FROM notes WHERE id = 'test-id'",
[],
|row| row.get(0),
)
.unwrap();
assert!(
stored_salience.is_none(),
"salience must be NULL when not supplied"
);
}
// Same idea as `store_ddl_then_migrations_is_idempotent`, but for the events store:
// V13–V15 must still be recorded after `ensure_events_schema` ran first.
#[test]
fn store_ddl_then_event_migration_is_idempotent() {
use crate::stores::event::ensure_events_schema;
let mut conn = open_memory();
ensure_events_schema(&conn).expect("store DDL should create events");
let version = run_migrations(&mut conn).expect("migrations after events store DDL");
assert_eq!(version, 22, "must reach V22 even when events DDL ran first");
let v13_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 13",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(v13_count, 1, "V13 must be recorded");
let v14_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 14",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(v14_count, 1, "V14 must be recorded");
let v15_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 15",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(v15_count, 1, "V15 must be recorded");
}
// V14 must create the `_embedding_models` registry with its two indexes and all
// expected columns.
#[test]
fn migration_v14_creates_embedding_model_registry() {
let mut conn = open_memory();
let version = run_migrations(&mut conn).expect("migrations should succeed");
assert_eq!(
version, 22,
"F227: latest migration must be V22 (knowledge_lifecycle_status)"
);
let tbl: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='_embedding_models'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(tbl, 1, "F227: _embedding_models table must exist after V14");
let one_active_idx: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_embed_models_one_active'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(
one_active_idx, 1,
"V14 must create idx_embed_models_one_active partial unique index"
);
let engine_status_idx: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_embed_models_engine_status'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(
engine_status_idx, 1,
"V14 must create idx_embed_models_engine_status index"
);
for col in [
"id",
"engine_name",
"model_id",
"key_version",
"dim",
"output_dim",
"status",
"activated_at",
"superseded_at",
"superseded_by",
"canonical_key",
"created_at",
] {
let exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('_embedding_models') WHERE name = ?1",
[col],
|r| r.get(0),
)
.unwrap();
assert!(
exists,
"F227: _embedding_models must have column '{col}' after V14"
);
}
}
// A pre-existing ordinary (non-virtual) `vec_*` table must gain an
// `embedding_model_id` column, and a second migration run must still succeed.
#[test]
fn migration_v14_adds_embedding_model_id_to_existing_regular_vec_tables() {
let mut conn = open_memory();
conn.execute_batch(
"CREATE TABLE vec_legacy_model (\
subject_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
field TEXT NOT NULL\
);",
)
.unwrap();
let version = run_migrations(&mut conn).expect("migrations should succeed");
assert_eq!(version, 22);
let col_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('vec_legacy_model') WHERE name = 'embedding_model_id'",
[],
|r| r.get(0),
)
.unwrap();
assert!(
col_exists,
"F228: V14 must add embedding_model_id to existing regular vec_ tables"
);
let version2 = run_migrations(&mut conn).expect("second run must succeed");
assert_eq!(version2, 22);
}
// Tables whose names look like sqlite-vec shadow tables (`_chunks`, `_rowids`,
// `_info`, `_vector_chunksNN` suffixes) must NOT be given `embedding_model_id`.
#[test]
fn migration_v14_does_not_alter_sqlite_vec_shadow_tables() {
let mut conn = open_memory();
conn.execute_batch(
"CREATE TABLE vec_test_chunks (x INTEGER);\
CREATE TABLE vec_test_rowids (x INTEGER);\
CREATE TABLE vec_test_info (x INTEGER);\
CREATE TABLE vec_test_vector_chunks00 (x INTEGER);",
)
.unwrap();
let version = run_migrations(&mut conn).expect("migrations should succeed");
assert_eq!(version, 22);
for shadow in [
"vec_test_chunks",
"vec_test_rowids",
"vec_test_info",
"vec_test_vector_chunks00",
] {
let col_added: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info(?1) \
WHERE name = 'embedding_model_id'",
rusqlite::params![shadow],
|r| r.get(0),
)
.unwrap();
assert!(
!col_added,
"CRIT-2: V14 must NOT add embedding_model_id to sqlite-vec shadow table '{shadow}'"
);
}
}
// Checks `infer_model_from_table_name` on two suffixes, then checks that copying
// rows through a temp table (the same create/insert/drop/recreate shape V17 uses)
// preserves the row and fills `embedding_model` with the inferred name.
// NOTE(review): the rebuild SQL here is hand-written against regular tables, so
// this does not call `build_v17_preserving_rebuild_sql` (which emits vec0 virtual
// tables) — it only pins the copy pattern.
#[test]
fn v17_preserving_rebuild_preserves_rows_and_infers_model() {
let conn = open_memory();
conn.execute_batch(MIGRATION_TRACKING_TABLE).unwrap();
assert_eq!(
infer_model_from_table_name("vec_paraphrase"),
"paraphrase",
"suffix 'paraphrase' should be returned as-is"
);
assert_eq!(
infer_model_from_table_name("vec_all_minilm_l6_v2"),
"all_minilm_l6_v2",
"underscore-containing suffix should be returned as-is"
);
conn.execute_batch(
"CREATE TABLE vec_paraphrase (\
subject_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
embedding BLOB NOT NULL\
);",
)
.unwrap();
conn.execute_batch(
"INSERT INTO vec_paraphrase (subject_id, namespace, kind, embedding) \
VALUES ('id-1', 'ns', 'entity', X'0000803F');",
)
.unwrap();
conn.execute_batch(
"CREATE TABLE tmp_vec_paraphrase (\
subject_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
field TEXT NOT NULL,\
embedding_model TEXT NOT NULL,\
embedding BLOB NOT NULL\
);\
INSERT INTO tmp_vec_paraphrase \
(subject_id, namespace, kind, field, embedding_model, embedding) \
SELECT subject_id, namespace, kind, '' AS field, 'paraphrase' AS embedding_model, embedding \
FROM vec_paraphrase;\
DROP TABLE vec_paraphrase;\
CREATE TABLE vec_paraphrase (\
subject_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
field TEXT NOT NULL,\
embedding_model TEXT NOT NULL,\
embedding BLOB NOT NULL\
);\
INSERT INTO vec_paraphrase \
(subject_id, namespace, kind, field, embedding_model, embedding) \
SELECT subject_id, namespace, kind, field, embedding_model, embedding \
FROM tmp_vec_paraphrase;\
DROP TABLE tmp_vec_paraphrase;",
)
.unwrap();
let (ns, model): (String, String) = conn
.query_row(
"SELECT namespace, embedding_model FROM vec_paraphrase WHERE subject_id = 'id-1'",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap();
assert_eq!(ns, "ns");
assert_eq!(
model, "paraphrase",
"V17 must infer model 'paraphrase' from table name 'vec_paraphrase'"
);
}
// A `vec_<suffix>` table name yields `<suffix>` as the inferred model.
#[test]
fn v17_infer_model_known_suffix() {
assert_eq!(infer_model_from_table_name("vec_paraphrase"), "paraphrase");
}
// An empty suffix or a name without the `vec_` prefix falls back to
// "all-minilm-l6-v2".
#[test]
fn v17_infer_model_fallback_for_unknown_suffix() {
assert_eq!(
infer_model_from_table_name("vec_"),
"all-minilm-l6-v2",
"empty suffix must fall back to all-minilm-l6-v2"
);
assert_eq!(
infer_model_from_table_name("other_table"),
"all-minilm-l6-v2",
"non-vec_ prefix must fall back to all-minilm-l6-v2"
);
}
// On a fresh database V17 and V18 have nothing to rebuild, but must still be
// recorded as applied.
#[test]
fn v17_migration_is_noop_on_fresh_db() {
let mut conn = open_memory();
let version = run_migrations(&mut conn).expect("migrations must succeed on fresh DB");
assert_eq!(version, 22);
let v17: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 17",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(v17, 1, "V17 must be recorded on fresh DB");
let v18: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _schema_migrations WHERE version = 18",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(v18, 1, "V18 must be recorded on fresh DB");
}
// A regular `vec_*` table that already has `field` and `embedding_model` makes
// the V17 builder emit the "SELECT 1;" placeholder, and the table's row is left
// untouched.
#[test]
fn v17_skips_tables_that_already_have_both_columns() {
let conn = open_memory();
conn.execute_batch(
"CREATE TABLE vec_modern (\
subject_id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
kind TEXT NOT NULL,\
field TEXT NOT NULL,\
embedding_model TEXT NOT NULL DEFAULT 'all-minilm-l6-v2',\
embedding BLOB NOT NULL\
);\
INSERT INTO vec_modern VALUES ('id-2', 'ns', 'entity', 'content', 'my-model', X'00');",
)
.unwrap();
let sql = build_v17_preserving_rebuild_sql(&conn).unwrap();
assert_eq!(
sql, "SELECT 1;",
"V17 must produce no-op SQL when no vec0 virtual tables need rebuilding"
);
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM vec_modern", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 1);
}
// Test-only helper. Runs one migration's `up` SQL and inserts its
// `_schema_migrations` row inside a single transaction. Every failure is mapped to
// `SqliteError::Migration`. On an early `?` return the uncommitted transaction is
// dropped, which rolls it back (rusqlite's default drop behaviour);
// `failed_migration_rolls_back` relies on this.
fn apply_single_migration(
conn: &mut Connection,
migration: &VersionedMigration,
) -> Result<(), SqliteError> {
let tx = conn.transaction().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
tx.execute_batch(migration.up)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
let now = chrono::Utc::now().timestamp_micros();
tx.execute(
"INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
rusqlite::params![migration.version, migration.name, now],
)
.map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
tx.commit().map_err(|e| SqliteError::Migration {
version: migration.version,
error: e.to_string(),
})?;
Ok(())
}
}