1use rusqlite::Connection;
10
11use crate::error::SqliteError;
12
13pub struct Migration {
19 pub id: &'static str,
21 pub up_sql: &'static str,
23 pub down_sql: Option<&'static str>,
25 pub is_already_applied: Option<fn(&Connection) -> bool>,
28}
29
30pub struct ServiceSchemaPlan {
32 pub service: &'static str,
34 pub sqlite: &'static [Migration],
36 pub postgres: &'static [Migration],
38}
39
40const SCHEMA_VERSION_TABLE: &str = include_str!("../sql/schema-version-table.sql");
41
42pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
44 conn.execute_batch(SCHEMA_VERSION_TABLE)?;
45
46 for migration in plan.sqlite {
47 if let Some(check) = migration.is_already_applied {
49 if check(conn) {
50 continue;
51 }
52 }
53
54 let already: bool = conn.query_row(
56 "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
57 rusqlite::params![plan.service, migration.id],
58 |row| row.get(0),
59 )?;
60
61 if already {
62 continue;
63 }
64
65 conn.execute_batch(migration.up_sql)?;
67
68 conn.execute(
70 "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
71 rusqlite::params![
72 plan.service,
73 migration.id,
74 chrono::Utc::now().timestamp_micros(),
75 ],
76 )?;
77 }
78
79 Ok(())
80}
81
/// One entry of the linear, integer-versioned migration list [`MIGRATIONS`].
pub struct VersionedMigration {
    /// Version number; [`run_migrations`] compares it with the highest version
    /// stored in `_schema_migrations` to decide whether the entry still has to run.
    pub version: u32,

    /// Name written to `_schema_migrations.name` when the entry is applied.
    pub name: &'static str,

    /// SQL batch executed to apply the entry.
    pub up: &'static str,
}
100
101const V1_UP: &str = include_str!("../sql/schema.sql");
104
105const V2_UP: &str = include_str!("../sql/002-narrow-fts-sections-update-trigger.sql");
106
107const V3_UP: &str = include_str!("../sql/003-backfill-domain-mirror-atoms.sql");
108
109pub const EMBEDDING_MODELS_DDL: &str = include_str!("../sql/embedding-models-ddl.sql");
115
116pub const MIGRATIONS: &[VersionedMigration] = &[
118 VersionedMigration {
119 version: 1,
120 name: "initial_schema",
121 up: V1_UP,
122 },
123 VersionedMigration {
124 version: 2,
125 name: "narrow_fts_sections_update_trigger",
126 up: V2_UP,
127 },
128 VersionedMigration {
129 version: 3,
130 name: "backfill_domain_mirror_atoms",
131 up: V3_UP,
132 },
133];
134
135const MIGRATION_TRACKING_TABLE: &str = include_str!("../sql/schema-migrations-table.sql");
136
137pub fn read_schema_version(conn: &Connection) -> u32 {
143 conn.query_row(
144 "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
145 [],
146 |row| row.get(0),
147 )
148 .unwrap_or(0)
149}
150
151pub fn inspect_schema_version(path: &std::path::Path) -> Result<u32, SqliteError> {
156 let conn = Connection::open_with_flags(
157 path,
158 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
159 )?;
160 Ok(read_schema_version(&conn))
161}
162
163pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
164 conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
165
166 let current_version: u32 = conn
167 .query_row(
168 "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
169 [],
170 |row| row.get(0),
171 )
172 .unwrap_or(0);
173
174 let latest_version = MIGRATIONS.last().map(|m| m.version).unwrap_or(0);
180 if current_version > latest_version {
181 return Err(SqliteError::InvalidData(format!(
182 "database schema version {current_version} is ahead of the latest known migration \
183 {latest_version}. This database predates the consolidated baseline (ADR-015) or was \
184 written by a newer build. Recreate it from the current schema; in-place downgrade is \
185 not supported."
186 )));
187 }
188
189 let mut applied_version = current_version;
190
191 for migration in MIGRATIONS {
192 if migration.version <= current_version {
193 continue;
194 }
195
196 let tx = conn.transaction().map_err(|e| SqliteError::Migration {
197 version: migration.version,
198 error: e.to_string(),
199 })?;
200
201 tx.execute_batch(migration.up)
202 .map_err(|e| SqliteError::Migration {
203 version: migration.version,
204 error: e.to_string(),
205 })?;
206
207 let now = chrono::Utc::now().timestamp_micros();
208 tx.execute(
209 "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
210 rusqlite::params![migration.version, migration.name, now],
211 )
212 .map_err(|e| SqliteError::Migration {
213 version: migration.version,
214 error: e.to_string(),
215 })?;
216
217 tx.commit().map_err(|e| SqliteError::Migration {
218 version: migration.version,
219 error: e.to_string(),
220 })?;
221
222 applied_version = migration.version;
223 }
224
225 Ok(applied_version)
226}
227
/// One row of the `_embedding_models` table as returned by
/// [`query_embedding_models`].
#[derive(Debug)]
pub struct EmbeddingModelRegistryRecord {
    /// Value of the `engine_name` column.
    pub engine_name: String,

    /// Value of the `model_id` column.
    pub model_id: String,

    /// Value of the `key_version` column.
    pub key_version: String,

    /// Value of the `dim` column, validated to fit in a `u32`.
    pub dimensions: u32,

    /// Value of the `status` column.
    pub status: String,

    /// Value of the nullable `activated_at` column.
    /// NOTE(review): units are not visible here; other timestamps in this file
    /// are UTC microseconds — confirm.
    pub activated_at: Option<i64>,

    /// Value of the nullable `superseded_at` column (same unit caveat as
    /// `activated_at`).
    pub superseded_at: Option<i64>,
}
245
246pub fn query_embedding_models(
252 db: Option<&std::path::Path>,
253 engine_filter: Option<&str>,
254) -> Result<Vec<EmbeddingModelRegistryRecord>, SqliteError> {
255 let path = db.map(std::path::Path::to_path_buf).unwrap_or_else(|| {
256 std::env::var("HOME")
257 .map(std::path::PathBuf::from)
258 .unwrap_or_else(|_| std::path::PathBuf::from("."))
259 .join(".khive/khive.db")
260 });
261 if !path.exists() {
262 return Ok(Vec::new());
263 }
264 let conn = Connection::open(path)?;
265 query_embedding_models_conn(&conn, engine_filter)
266}
267
268pub(crate) fn query_embedding_models_conn(
272 conn: &Connection,
273 engine_filter: Option<&str>,
274) -> Result<Vec<EmbeddingModelRegistryRecord>, SqliteError> {
275 let exists: bool = conn.query_row(
276 "SELECT COUNT(*) > 0 FROM sqlite_master \
277 WHERE type='table' AND name='_embedding_models'",
278 [],
279 |row| row.get(0),
280 )?;
281 if !exists {
282 return Ok(Vec::new());
283 }
284
285 let sql = if engine_filter.is_some() {
286 "SELECT engine_name, model_id, key_version, dim, status, activated_at, superseded_at \
287 FROM _embedding_models WHERE engine_name = ?1 \
288 ORDER BY engine_name, activated_at IS NULL, activated_at"
289 } else {
290 "SELECT engine_name, model_id, key_version, dim, status, activated_at, superseded_at \
291 FROM _embedding_models \
292 ORDER BY engine_name, activated_at IS NULL, activated_at"
293 };
294 let mut stmt = conn.prepare(sql)?;
295 let map_row = |row: &rusqlite::Row<'_>| {
296 let dim_raw: i64 = row.get(3)?;
297 let dimensions = u32::try_from(dim_raw).map_err(|_| {
298 rusqlite::Error::FromSqlConversionFailure(
299 3,
300 rusqlite::types::Type::Integer,
301 Box::new(std::io::Error::other(format!(
302 "_embedding_models.dim value {dim_raw} is outside the valid u32 range [0, {}]",
303 u32::MAX,
304 ))),
305 )
306 })?;
307 Ok(EmbeddingModelRegistryRecord {
308 engine_name: row.get(0)?,
309 model_id: row.get(1)?,
310 key_version: row.get(2)?,
311 dimensions,
312 status: row.get(4)?,
313 activated_at: row.get(5)?,
314 superseded_at: row.get(6)?,
315 })
316 };
317
318 if let Some(engine) = engine_filter {
319 stmt.query_map([engine], map_row)?
320 .collect::<Result<Vec<_>, _>>()
321 .map_err(Into::into)
322 } else {
323 stmt.query_map([], map_row)?
324 .collect::<Result<Vec<_>, _>>()
325 .map_err(Into::into)
326 }
327}
328
329#[cfg(test)]
334#[path = "migrations_tests.rs"]
335mod tests;