Skip to main content

khive_db/
migrations.rs

//! Schema migration system for the SQLite storage layer.
//!
//! Two APIs coexist:
//! - **Legacy per-service migrations** (`ServiceSchemaPlan` / `apply_schema_plan`):
//!   used by pack-scoped schemas.
//! - **Versioned migrations** (`MIGRATIONS` / `run_migrations`): the forward-only
//!   migration pipeline for the core tables.
8
9use rusqlite::Connection;
10
11use crate::error::SqliteError;
12
13// =============================================================================
14// Legacy per-service migration API (preserved for backward compatibility)
15// =============================================================================
16
17/// A single legacy migration step within a `ServiceSchemaPlan`.
18pub struct Migration {
19    /// Unique identifier for this migration.
20    pub id: &'static str,
21    /// SQL to apply (forward direction).
22    pub up_sql: &'static str,
23    /// SQL to revert (optional).
24    pub down_sql: Option<&'static str>,
25    /// Optional predicate: returns true if migration was already applied
26    /// through a mechanism other than the migration tracker.
27    pub is_already_applied: Option<fn(&Connection) -> bool>,
28}
29
30/// A pack-scoped schema plan containing migrations for SQLite and Postgres.
31pub struct ServiceSchemaPlan {
32    /// Service name used as a key in the `_schema_versions` tracking table.
33    pub service: &'static str,
34    /// SQLite-specific migration steps, applied in order.
35    pub sqlite: &'static [Migration],
36    /// Postgres-specific migration steps (reserved for future use).
37    pub postgres: &'static [Migration],
38}
39
40const SCHEMA_VERSION_TABLE: &str = include_str!("../sql/schema-version-table.sql");
41
42/// Apply a pack-scoped schema plan, tracking each migration in `_schema_versions`.
43pub fn apply_schema_plan(conn: &Connection, plan: &ServiceSchemaPlan) -> Result<(), SqliteError> {
44    conn.execute_batch(SCHEMA_VERSION_TABLE)?;
45
46    for migration in plan.sqlite {
47        // Check if custom predicate says it's already applied
48        if let Some(check) = migration.is_already_applied {
49            if check(conn) {
50                continue;
51            }
52        }
53
54        // Check if tracked as applied
55        let already: bool = conn.query_row(
56            "SELECT COUNT(*) > 0 FROM _schema_versions WHERE service = ?1 AND migration_id = ?2",
57            rusqlite::params![plan.service, migration.id],
58            |row| row.get(0),
59        )?;
60
61        if already {
62            continue;
63        }
64
65        // Apply
66        conn.execute_batch(migration.up_sql)?;
67
68        // Record
69        conn.execute(
70            "INSERT INTO _schema_versions (service, migration_id, applied_at) VALUES (?1, ?2, ?3)",
71            rusqlite::params![
72                plan.service,
73                migration.id,
74                chrono::Utc::now().timestamp_micros(),
75            ],
76        )?;
77    }
78
79    Ok(())
80}
81
// =============================================================================
// Versioned migration system
// =============================================================================

/// One forward-only step in the versioned schema pipeline.
///
/// [`run_migrations`] applies, in ascending order, every step whose version
/// exceeds the database's recorded version. Each step executes inside its own
/// transaction, so a failing step is rolled back and the database stays at the
/// version it had before that step.
pub struct VersionedMigration {
    /// Position in the pipeline: strictly increasing, beginning at 1.
    pub version: u32,
    /// Short, human-readable label stored alongside the version in the
    /// `_schema_migrations` ledger.
    pub name: &'static str,
    /// Forward SQL. May hold several semicolon-separated statements, since it is
    /// run with `execute_batch`.
    pub up: &'static str,
}
100
101// V1: complete schema, loaded from sql/schema.sql.
102// Fresh-start repo (v0.2.8) — all schema in one migration, no incremental versions.
103const V1_UP: &str = include_str!("../sql/schema.sql");
104
105const V2_UP: &str = include_str!("../sql/002-narrow-fts-sections-update-trigger.sql");
106
107const V3_UP: &str = include_str!("../sql/003-backfill-domain-mirror-atoms.sql");
108
109/// DDL for the `_embedding_models` registry table.
110///
111/// Shared between the V1 schema and the belt-and-suspenders creation in
112/// `StorageBackend::vectors_for_namespace`. Both sites reference this constant so
113/// the schema cannot silently diverge if the registry evolves.
114pub const EMBEDDING_MODELS_DDL: &str = include_str!("../sql/embedding-models-ddl.sql");
115
116/// All versioned migrations in ascending order, applied by `run_migrations`.
117pub const MIGRATIONS: &[VersionedMigration] = &[
118    VersionedMigration {
119        version: 1,
120        name: "initial_schema",
121        up: V1_UP,
122    },
123    VersionedMigration {
124        version: 2,
125        name: "narrow_fts_sections_update_trigger",
126        up: V2_UP,
127    },
128    VersionedMigration {
129        version: 3,
130        name: "backfill_domain_mirror_atoms",
131        up: V3_UP,
132    },
133];
134
135const MIGRATION_TRACKING_TABLE: &str = include_str!("../sql/schema-migrations-table.sql");
136
137/// Apply all unapplied migrations in order. Idempotent; each migration runs in its own transaction.
138/// Errors on non-contiguous version array or failed migration.
139/// Read the applied schema version from an open connection **without** running
140/// migrations. Returns 0 when the `_schema_migrations` ledger is absent (an
141/// un-migrated or empty database). Never writes.
142pub fn read_schema_version(conn: &Connection) -> u32 {
143    conn.query_row(
144        "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
145        [],
146        |row| row.get(0),
147    )
148    .unwrap_or(0)
149}
150
151/// Open `path` read-only and report its applied schema version without creating
152/// or migrating the file. The caller must ensure `path` exists — opening a
153/// missing file read-only errors rather than creating it. This is the path used
154/// by schema-inspection commands that must not mutate the database.
155pub fn inspect_schema_version(path: &std::path::Path) -> Result<u32, SqliteError> {
156    let conn = Connection::open_with_flags(
157        path,
158        rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
159    )?;
160    Ok(read_schema_version(&conn))
161}
162
163pub fn run_migrations(conn: &mut Connection) -> Result<u32, SqliteError> {
164    conn.execute_batch(MIGRATION_TRACKING_TABLE)?;
165
166    let current_version: u32 = conn
167        .query_row(
168            "SELECT COALESCE(MAX(version), 0) FROM _schema_migrations",
169            [],
170            |row| row.get(0),
171        )
172        .unwrap_or(0);
173
174    // A database whose recorded version is ahead of the latest known migration
175    // predates the consolidated V1 baseline (ADR-015) — e.g. it still carries the
176    // pre-consolidation V2..V22 ledger — or was written by a newer build. Either
177    // way the baseline schema would be silently skipped, leaving the process on a
178    // stale schema. Fail loudly instead of corrupting silently.
179    let latest_version = MIGRATIONS.last().map(|m| m.version).unwrap_or(0);
180    if current_version > latest_version {
181        return Err(SqliteError::InvalidData(format!(
182            "database schema version {current_version} is ahead of the latest known migration \
183             {latest_version}. This database predates the consolidated baseline (ADR-015) or was \
184             written by a newer build. Recreate it from the current schema; in-place downgrade is \
185             not supported."
186        )));
187    }
188
189    let mut applied_version = current_version;
190
191    for migration in MIGRATIONS {
192        if migration.version <= current_version {
193            continue;
194        }
195
196        let tx = conn.transaction().map_err(|e| SqliteError::Migration {
197            version: migration.version,
198            error: e.to_string(),
199        })?;
200
201        tx.execute_batch(migration.up)
202            .map_err(|e| SqliteError::Migration {
203                version: migration.version,
204                error: e.to_string(),
205            })?;
206
207        let now = chrono::Utc::now().timestamp_micros();
208        tx.execute(
209            "INSERT INTO _schema_migrations (version, name, applied_at) VALUES (?1, ?2, ?3)",
210            rusqlite::params![migration.version, migration.name, now],
211        )
212        .map_err(|e| SqliteError::Migration {
213            version: migration.version,
214            error: e.to_string(),
215        })?;
216
217        tx.commit().map_err(|e| SqliteError::Migration {
218            version: migration.version,
219            error: e.to_string(),
220        })?;
221
222        applied_version = migration.version;
223    }
224
225    Ok(applied_version)
226}
227
/// One row of the `_embedding_models` registry, as returned by
/// [`query_embedding_models`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbeddingModelRegistryRecord {
    /// Vector engine name (e.g. `"paraphrase"`).
    pub engine_name: String,
    /// Model identifier (e.g. `"all-minilm-l6-v2"`).
    pub model_id: String,
    /// Canonical deduplication key combining engine and model.
    pub key_version: String,
    /// Embedding dimensionality (the registry's `dim` column).
    pub dimensions: u32,
    /// Lifecycle status (`"active"` or `"superseded"`).
    pub status: String,
    /// Epoch timestamp when the model was activated, if recorded.
    pub activated_at: Option<i64>,
    /// Epoch timestamp when the model was superseded, if recorded.
    pub superseded_at: Option<i64>,
}
245
246/// Query the `_embedding_models` registry.
247///
248/// Opens the database at `db` (defaults to `~/.khive/khive.db`) and
249/// returns all registry rows, optionally filtered by `engine_name`.
250/// Returns an empty vec if the database or table does not exist.
251pub fn query_embedding_models(
252    db: Option<&std::path::Path>,
253    engine_filter: Option<&str>,
254) -> Result<Vec<EmbeddingModelRegistryRecord>, SqliteError> {
255    let path = db.map(std::path::Path::to_path_buf).unwrap_or_else(|| {
256        std::env::var("HOME")
257            .map(std::path::PathBuf::from)
258            .unwrap_or_else(|_| std::path::PathBuf::from("."))
259            .join(".khive/khive.db")
260    });
261    if !path.exists() {
262        return Ok(Vec::new());
263    }
264    let conn = Connection::open(path)?;
265    query_embedding_models_conn(&conn, engine_filter)
266}
267
268/// Query `_embedding_models` from an existing connection (testable without a file).
269///
270/// Returns an empty vec if the table does not exist.
271pub(crate) fn query_embedding_models_conn(
272    conn: &Connection,
273    engine_filter: Option<&str>,
274) -> Result<Vec<EmbeddingModelRegistryRecord>, SqliteError> {
275    let exists: bool = conn.query_row(
276        "SELECT COUNT(*) > 0 FROM sqlite_master \
277         WHERE type='table' AND name='_embedding_models'",
278        [],
279        |row| row.get(0),
280    )?;
281    if !exists {
282        return Ok(Vec::new());
283    }
284
285    let sql = if engine_filter.is_some() {
286        "SELECT engine_name, model_id, key_version, dim, status, activated_at, superseded_at \
287         FROM _embedding_models WHERE engine_name = ?1 \
288         ORDER BY engine_name, activated_at IS NULL, activated_at"
289    } else {
290        "SELECT engine_name, model_id, key_version, dim, status, activated_at, superseded_at \
291         FROM _embedding_models \
292         ORDER BY engine_name, activated_at IS NULL, activated_at"
293    };
294    let mut stmt = conn.prepare(sql)?;
295    let map_row = |row: &rusqlite::Row<'_>| {
296        let dim_raw: i64 = row.get(3)?;
297        let dimensions = u32::try_from(dim_raw).map_err(|_| {
298            rusqlite::Error::FromSqlConversionFailure(
299                3,
300                rusqlite::types::Type::Integer,
301                Box::new(std::io::Error::other(format!(
302                    "_embedding_models.dim value {dim_raw} is outside the valid u32 range [0, {}]",
303                    u32::MAX,
304                ))),
305            )
306        })?;
307        Ok(EmbeddingModelRegistryRecord {
308            engine_name: row.get(0)?,
309            model_id: row.get(1)?,
310            key_version: row.get(2)?,
311            dimensions,
312            status: row.get(4)?,
313            activated_at: row.get(5)?,
314            superseded_at: row.get(6)?,
315        })
316    };
317
318    if let Some(engine) = engine_filter {
319        stmt.query_map([engine], map_row)?
320            .collect::<Result<Vec<_>, _>>()
321            .map_err(Into::into)
322    } else {
323        stmt.query_map([], map_row)?
324            .collect::<Result<Vec<_>, _>>()
325            .map_err(Into::into)
326    }
327}
328
// =============================================================================
// Tests
// =============================================================================

// Unit tests are kept in a sibling file and compiled only for test builds.
#[cfg(test)]
#[path = "migrations_tests.rs"]
mod tests;