Skip to main content

sqlite_graphrag/commands/
migrate.rs

1//! Handler for the `migrate` CLI subcommand.
2
3use crate::errors::AppError;
4use crate::output;
5use crate::paths::AppPaths;
6use crate::storage::connection::open_rw;
7use rusqlite::OptionalExtension;
8use serde::Serialize;
9use siphasher::sip::SipHasher13;
10use std::hash::{Hash, Hasher};
11use std::path::Path;
12
13#[derive(clap::Args)]
14#[command(after_long_help = "EXAMPLES:\n  \
15    # Apply pending schema migrations\n  \
16    sqlite-graphrag migrate\n\n  \
17    # Show already-applied migrations without applying new ones\n  \
18    sqlite-graphrag migrate --status\n\n  \
19    # Migrate a database at a custom path\n  \
20    sqlite-graphrag migrate --db /path/to/graphrag.sqlite\n\n  \
21    # Rewrite recorded migration checksums to match the current file content.\n  \
22    # Use this after upgrading across a version that intentionally changed a\n  \
23    # migration file (v1.0.76 is the first release where this is exposed).\n  \
24    sqlite-graphrag migrate --rehash\n\n  \
25    # Full upgrade: rehash, apply V013 (drop vec tables), verify schema.\n  \
26    # Required once for users upgrading from v1.0.74 or v1.0.75.\n  \
27    sqlite-graphrag migrate --to-llm-only")]
28pub struct MigrateArgs {
29    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
30    pub db: Option<String>,
31    /// Explicit JSON flag. Accepted as a no-op because output is already JSON by default.
32    #[arg(long, default_value_t = false)]
33    pub json: bool,
34    /// Show already applied migrations without applying new ones.
35    #[arg(long, default_value_t = false)]
36    pub status: bool,
37    /// Rewrite recorded migration checksums to match the current file content
38    /// without re-applying the SQL. Idempotent; safe to re-run.
39    #[arg(long, default_value_t = false)]
40    pub rehash: bool,
41    /// One-shot upgrade for v1.0.74 / v1.0.75 databases: rehash checksums,
42    /// apply the V013 vec-table-drop migration, and report a structured
43    /// summary. Combines `--rehash` and the regular migration runner.
44    #[arg(long, default_value_t = false)]
45    pub to_llm_only: bool,
46    /// Required for `--to-llm-only` to acknowledge that the operation is
47    /// destructive: it permanently removes the `vec_memories`,
48    /// `vec_entities`, and `vec_chunks` virtual tables. The BLOB-backed
49    /// `memory_embeddings` / `entity_embeddings` / `chunk_embeddings`
50    /// tables remain and are the source of truth going forward.
51    #[arg(long, default_value_t = false)]
52    pub drop_vec_tables: bool,
53}
54
/// JSON payload emitted by `run` after a plain `migrate` invocation (no
/// `--status`, `--rehash` or `--to-llm-only`) has applied pending migrations.
#[derive(Serialize)]
struct MigrateResponse {
    /// Display form of the resolved database path.
    db_path: String,
    /// Latest applied migration number from `refinery_schema_history`.
    /// Emitted as JSON number for cross-command consistency with `health`/`stats`/`init` (since v1.0.35).
    schema_version: u32,
    /// Outcome marker; `run` always sets this to `"ok"` on success.
    status: String,
    /// Total execution time in milliseconds from handler start to serialisation.
    elapsed_ms: u64,
}
65
/// JSON payload emitted by `migrate --status`; nothing is applied in this mode.
#[derive(Serialize)]
struct MigrateStatusResponse {
    /// Display form of the resolved database path.
    db_path: String,
    /// Rows of `refinery_schema_history` in ascending version order; empty
    /// when the history table does not exist.
    applied_migrations: Vec<MigrationEntry>,
    /// Latest applied migration number. JSON number since v1.0.35.
    /// Reported as 0 when the version cannot be read from the history table.
    schema_version: u32,
    /// Milliseconds elapsed since `run` started.
    elapsed_ms: u64,
}
74
/// One row of `refinery_schema_history`, as listed by `migrate --status`.
#[derive(Serialize)]
struct MigrationEntry {
    /// Migration version number (`version` column).
    version: i64,
    /// Migration name (`name` column).
    name: String,
    /// Value of the `applied_on` column; `None` when the column is NULL.
    applied_on: Option<String>,
    /// Recorded checksum, whitespace-trimmed. Left out of the JSON output
    /// when the column is NULL or blank.
    #[serde(skip_serializing_if = "Option::is_none")]
    checksum: Option<String>,
}
83
/// JSON payload emitted by `migrate --rehash`; produced by `run_rehash`.
#[derive(Serialize)]
struct RehashReport {
    /// Display form of the database path passed to `run_rehash`.
    db_path: String,
    /// Latest version recorded in `refinery_schema_history`, or 0 when it
    /// cannot be read.
    schema_version: u32,
    /// One row per migration whose recorded checksum was rewritten.
    /// Empty array when nothing changed (already up to date).
    rewritten: Vec<RehashEntry>,
    /// Number of entries inspected.
    inspected: usize,
    /// `"ok_no_history"` when the history table is absent, `"ok_no_changes"`
    /// when no checksum was rewritten, `"ok_rewritten"` otherwise.
    status: String,
    /// Milliseconds spent inside `run_rehash`.
    elapsed_ms: u64,
}
96
/// A single checksum rewrite performed by `run_rehash`.
#[derive(Serialize, Debug)]
struct RehashEntry {
    /// Version of the migration whose history row was updated.
    version: i64,
    /// Name of that migration as reported by the embedded runner.
    name: String,
    /// Checksum found in the history row before the rewrite (trimmed).
    old_checksum: String,
    /// Checksum computed from the embedded migration and written back.
    new_checksum: String,
}
104
/// JSON payload emitted by `migrate --to-llm-only`; produced by
/// `run_to_llm_only`.
#[derive(Serialize)]
struct ToLlmOnlyReport {
    /// Display form of the database path passed to `run_to_llm_only`.
    db_path: String,
    /// Latest version recorded in `refinery_schema_history` after the
    /// migration runner finished.
    schema_version: u32,
    /// Checksum rewrites performed by the rehash step of this command.
    rehashed: Vec<RehashEntry>,
    /// True if the vec0 virtual tables existed in the database before the
    /// command ran. After this command they will be gone.
    vec_tables_were_present: bool,
    /// True when the recorded schema version is at least 13 once the command
    /// finishes, i.e. V013 is on record as applied. This does not distinguish
    /// between V013 being applied by this invocation or by an earlier one.
    v013_applied: bool,
    /// Outcome marker; `run_to_llm_only` always sets this to `"ok"` on success.
    status: String,
    /// Milliseconds spent inside `run_to_llm_only`.
    elapsed_ms: u64,
}
118
119pub fn run(args: MigrateArgs) -> Result<(), AppError> {
120    let start = std::time::Instant::now();
121    let _ = args.json; // --json is a no-op because output is already JSON by default
122    let paths = AppPaths::resolve(args.db.as_deref())?;
123    paths.ensure_dirs()?;
124
125    if args.status && (args.rehash || args.to_llm_only) {
126        return Err(AppError::Validation(
127            "--status cannot be combined with --rehash or --to-llm-only".into(),
128        ));
129    }
130    if args.rehash && args.to_llm_only {
131        return Err(AppError::Validation(
132            "--rehash and --to-llm-only are mutually exclusive".into(),
133        ));
134    }
135    if args.to_llm_only && !args.drop_vec_tables {
136        return Err(AppError::Validation(
137            "--to-llm-only requires --drop-vec-tables to acknowledge the destructive drop".into(),
138        ));
139    }
140
141    let mut conn = open_rw(&paths.db)?;
142
143    if args.status {
144        let schema_version = latest_schema_version(&conn).unwrap_or(0);
145        let applied = list_applied_migrations(&conn)?;
146        output::emit_json(&MigrateStatusResponse {
147            db_path: paths.db.display().to_string(),
148            applied_migrations: applied,
149            schema_version,
150            elapsed_ms: start.elapsed().as_millis() as u64,
151        })?;
152        return Ok(());
153    }
154
155    if args.rehash {
156        let report = run_rehash(&mut conn, &paths.db)?;
157        output::emit_json(&report)?;
158        return Ok(());
159    }
160
161    if args.to_llm_only {
162        let report = run_to_llm_only(&mut conn, &paths.db)?;
163        output::emit_json(&report)?;
164        return Ok(());
165    }
166
167    crate::migrations::runner()
168        .run(&mut conn)
169        .map_err(|e| AppError::Internal(anyhow::anyhow!("migration failed: {e}")))?;
170
171    conn.execute_batch(&format!(
172        "PRAGMA user_version = {};",
173        crate::constants::SCHEMA_USER_VERSION
174    ))?;
175
176    let schema_version = latest_schema_version(&conn)?;
177    conn.execute(
178        "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('schema_version', ?1)",
179        rusqlite::params![schema_version],
180    )?;
181
182    output::emit_json(&MigrateResponse {
183        db_path: paths.db.display().to_string(),
184        schema_version,
185        status: "ok".to_string(),
186        elapsed_ms: start.elapsed().as_millis() as u64,
187    })?;
188
189    Ok(())
190}
191
192/// Compute the SipHasher13 checksum for a migration entry. Matches the
193/// algorithm used by refinery-core 0.9.1 (`name | version | sql`).
194///
195/// The `version` parameter MUST be `i32` (the default
196/// `SchemaVersion` alias in refinery-core) — passing `i64` would
197/// produce a different hash because the SipHasher13 implementation
198/// hashes the value's bit representation, and the two integer types
199/// differ in width. The `int8-versions` feature is NOT enabled.
200fn compute_checksum(name: &str, version: i32, sql: &str) -> u64 {
201    let mut hasher = SipHasher13::new();
202    name.hash(&mut hasher);
203    version.hash(&mut hasher);
204    sql.hash(&mut hasher);
205    hasher.finish()
206}
207
208fn run_rehash(conn: &mut rusqlite::Connection, db_path: &Path) -> Result<RehashReport, AppError> {
209    let start = std::time::Instant::now();
210    let schema_version = latest_schema_version(conn).unwrap_or(0);
211
212    if !history_table_exists(conn) {
213        return Ok(RehashReport {
214            db_path: db_path.display().to_string(),
215            schema_version,
216            rewritten: vec![],
217            inspected: 0,
218            status: "ok_no_history".to_string(),
219            elapsed_ms: start.elapsed().as_millis() as u64,
220        });
221    }
222
223    let mut rewritten: Vec<RehashEntry> = Vec::new();
224    let mut inspected = 0usize;
225
226    for mig in crate::migrations::runner().get_migrations().iter() {
227        if mig.sql().is_none() {
228            continue;
229        }
230        let name = mig.name().to_string();
231        let version = mig.version();
232        let sql = mig.sql().unwrap_or("").to_string();
233        let new_checksum = compute_checksum(&name, version, &sql);
234
235        let row: Option<String> = conn
236            .query_row(
237                "SELECT checksum FROM refinery_schema_history WHERE version = ?1",
238                rusqlite::params![version],
239                |r| r.get(0),
240            )
241            .optional()?;
242
243        inspected += 1;
244        if let Some(existing) = row {
245            let existing_trim = existing.trim();
246            let new_str = new_checksum.to_string();
247            if existing_trim != new_str {
248                conn.execute(
249                    "UPDATE refinery_schema_history SET checksum = ?1 WHERE version = ?2",
250                    rusqlite::params![new_str, version],
251                )?;
252                rewritten.push(RehashEntry {
253                    version: version as i64,
254                    name,
255                    old_checksum: existing_trim.to_string(),
256                    new_checksum: new_str,
257                });
258            }
259        } else {
260            // Row missing for a migration file that the runner already has
261            // loaded. Insert with a matching checksum so future runs accept
262            // it as already applied. (Rare; happens only on partial history.)
263            conn.execute(
264                "INSERT OR IGNORE INTO refinery_schema_history (version, name, checksum) VALUES (?1, ?2, ?3)",
265                rusqlite::params![version, name, new_checksum.to_string()],
266            )?;
267        }
268    }
269
270    let status = if rewritten.is_empty() {
271        "ok_no_changes"
272    } else {
273        "ok_rewritten"
274    };
275
276    Ok(RehashReport {
277        db_path: db_path.display().to_string(),
278        schema_version,
279        rewritten,
280        inspected,
281        status: status.to_string(),
282        elapsed_ms: start.elapsed().as_millis() as u64,
283    })
284}
285
286fn run_to_llm_only(
287    conn: &mut rusqlite::Connection,
288    db_path: &Path,
289) -> Result<ToLlmOnlyReport, AppError> {
290    let start = std::time::Instant::now();
291
292    // 1. Detect whether vec tables are still present in sqlite_master.
293    //    They were created by the v1.0.74 era V002 migration and dropped
294    //    by V013 in v1.0.76. Fresh v1.0.76 databases never had them.
295    let vec_tables_were_present: bool = {
296        let count: i64 = conn
297            .query_row(
298                "SELECT COUNT(*) FROM sqlite_master
299                 WHERE type='table' AND name IN ('vec_memories','vec_entities','vec_chunks')",
300                [],
301                |r| r.get(0),
302            )
303            .unwrap_or(0);
304        count > 0
305    };
306
307    // 2. Rehash checksums (in case V002 was the offender).
308    let rehash_report = run_rehash(conn, db_path)?;
309    let rehashed = rehash_report.rewritten;
310
311    // 3. Apply pending migrations (V013 will run if it hasn't yet).
312    //    If the user is on v1.0.75 the V013 migration was already applied,
313    //    so this is a no-op; if they're on v1.0.74 the V013 drop will run.
314    crate::migrations::runner()
315        .run(conn)
316        .map_err(|e| AppError::Internal(anyhow::anyhow!("migration failed: {e}")))?;
317
318    conn.execute_batch(&format!(
319        "PRAGMA user_version = {};",
320        crate::constants::SCHEMA_USER_VERSION
321    ))?;
322
323    let schema_version = latest_schema_version(conn)?;
324    conn.execute(
325        "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('schema_version', ?1)",
326        rusqlite::params![schema_version],
327    )?;
328
329    // 4. Detect V013 application by checking the schema_version.
330    //    V013 has version 13, so schema_version >= 13 implies it ran.
331    let v013_applied = schema_version >= 13;
332
333    Ok(ToLlmOnlyReport {
334        db_path: db_path.display().to_string(),
335        schema_version,
336        rehashed,
337        vec_tables_were_present,
338        v013_applied,
339        status: "ok".to_string(),
340        elapsed_ms: start.elapsed().as_millis() as u64,
341    })
342}
343
344fn history_table_exists(conn: &rusqlite::Connection) -> bool {
345    conn.query_row(
346        "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'",
347        [],
348        |r| r.get::<_, String>(0),
349    )
350    .optional()
351    .ok()
352    .flatten()
353    .is_some()
354}
355
356fn list_applied_migrations(conn: &rusqlite::Connection) -> Result<Vec<MigrationEntry>, AppError> {
357    let table_exists: Option<String> = conn
358        .query_row(
359            "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'",
360            [],
361            |r| r.get(0),
362        )
363        .optional()?;
364    if table_exists.is_none() {
365        return Ok(vec![]);
366    }
367    let mut stmt = conn.prepare_cached(
368        "SELECT version, name, applied_on, checksum FROM refinery_schema_history ORDER BY version ASC",
369    )?;
370    let entries = stmt
371        .query_map([], |r| {
372            let checksum: Option<String> = r.get(3)?;
373            Ok(MigrationEntry {
374                version: r.get(0)?,
375                name: r.get(1)?,
376                applied_on: r.get(2)?,
377                checksum: checksum
378                    .map(|s| s.trim().to_string())
379                    .filter(|s| !s.is_empty()),
380            })
381        })?
382        .collect::<Result<Vec<_>, _>>()?;
383    Ok(entries)
384}
385
386fn latest_schema_version(conn: &rusqlite::Connection) -> Result<u32, AppError> {
387    match conn.query_row(
388        "SELECT version FROM refinery_schema_history ORDER BY version DESC LIMIT 1",
389        [],
390        |row| row.get::<_, i64>(0),
391    ) {
392        Ok(version) => Ok(version.max(0) as u32),
393        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
394        Err(err) => Err(AppError::Database(err)),
395    }
396}
397
// Unit tests for the migrate handler: version lookup, response
// serialisation, checksum computation, rehash and the --to-llm-only path.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory database that has no `refinery_schema_history` table.
    fn create_db_without_history() -> Connection {
        Connection::open_in_memory().expect("failed to open in-memory db")
    }

    /// Opens an in-memory database with a hand-made history table holding a
    /// single `V001__init` row at `version` (no checksum, no `applied_on`).
    fn create_db_with_history(version: i64) -> Connection {
        let conn = Connection::open_in_memory().expect("failed to open in-memory db");
        conn.execute_batch(
            "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT,
                applied_on TEXT,
                checksum TEXT
            );",
        )
        .expect("failed to create history table");
        conn.execute(
            "INSERT INTO refinery_schema_history (version, name) VALUES (?1, 'V001__init')",
            rusqlite::params![version],
        )
        .expect("failed to insert version");
        conn
    }

    // A missing history table is an error, not version 0.
    #[test]
    fn latest_schema_version_returns_error_without_table() {
        let conn = create_db_without_history();
        let result = latest_schema_version(&conn);
        assert!(result.is_err(), "must return Err when table does not exist");
    }

    // The stored version is returned as a u32.
    #[test]
    fn latest_schema_version_returns_max_version() {
        let conn = create_db_with_history(6);
        let version = latest_schema_version(&conn).unwrap();
        assert_eq!(version, 6u32);
    }

    // All four fields are serialised; `schema_version` is a JSON number.
    #[test]
    fn migrate_response_serializes_required_fields() {
        let resp = MigrateResponse {
            db_path: "/tmp/test.sqlite".to_string(),
            schema_version: 6,
            status: "ok".to_string(),
            elapsed_ms: 12,
        };
        let json = serde_json::to_value(&resp).unwrap();
        assert_eq!(json["status"], "ok");
        assert_eq!(json["schema_version"], 6);
        assert_eq!(json["db_path"], "/tmp/test.sqlite");
        assert_eq!(json["elapsed_ms"], 12);
    }

    // An existing but empty history table reports version 0.
    #[test]
    fn latest_schema_version_returns_zero_when_table_empty() {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT
            );",
        )
        .expect("table creation");
        let version = latest_schema_version(&conn).unwrap();
        assert_eq!(version, 0u32);
    }

    #[test]
    fn compute_checksum_is_deterministic_and_matches_refinery() {
        // This is the same algorithm that refinery-core 0.9.1 uses. The
        // test checks two properties only: identical input gives an
        // identical checksum, and a trailing newline in the SQL changes
        // it. No numeric value is pinned here.
        let a = compute_checksum("vec_tables", 2, "SELECT 1;");
        let b = compute_checksum("vec_tables", 2, "SELECT 1;");
        assert_eq!(a, b, "checksum must be deterministic");
        let c = compute_checksum("vec_tables", 2, "SELECT 1;\n");
        assert_ne!(
            a, c,
            "trailing newline must change the checksum (matches refinery)"
        );
    }

    // Without a history table the rehash is a no-op with its own status.
    #[test]
    fn rehash_with_no_history_returns_empty() {
        let mut conn = create_db_without_history();
        let report = run_rehash(&mut conn, Path::new("/tmp/empty.sqlite")).unwrap();
        assert_eq!(report.status, "ok_no_history");
        assert!(report.rewritten.is_empty());
        assert_eq!(report.inspected, 0);
    }

    #[test]
    fn rehash_writes_matching_checksum() {
        // Pre-populate the history with a WRONG checksum. The rehash
        // must detect the mismatch and rewrite the row.
        let mut conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT,
                applied_on TEXT,
                checksum TEXT
            );",
        )
        .expect("history create");
        // Use the first migration present in the embedded set (V001).
        let first = crate::migrations::runner().get_migrations()[0].clone();
        let v = first.version();
        let name = first.name().to_string();
        let sql = first.sql().unwrap_or("").to_string();
        let correct = compute_checksum(&name, v, &sql).to_string();
        let wrong = "1234567890";
        assert_ne!(correct, wrong, "test sanity: correct != wrong");

        conn.execute(
            "INSERT INTO refinery_schema_history (version, name, checksum) VALUES (?1, ?2, ?3)",
            rusqlite::params![v, name, wrong],
        )
        .expect("insert");

        let report = run_rehash(&mut conn, Path::new("/tmp/test.sqlite")).unwrap();
        assert_eq!(report.rewritten.len(), 1);
        assert_eq!(report.rewritten[0].old_checksum, wrong);
        assert_eq!(report.rewritten[0].new_checksum, correct);

        // And the row now matches what refinery would compute.
        let stored: String = conn
            .query_row(
                "SELECT checksum FROM refinery_schema_history WHERE version = ?1",
                rusqlite::params![v],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(stored, correct);
    }

    // A row whose checksum already matches must be left alone.
    #[test]
    fn rehash_is_idempotent_when_checksums_match() {
        let mut conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT,
                applied_on TEXT,
                checksum TEXT
            );",
        )
        .unwrap();
        let first = crate::migrations::runner().get_migrations()[0].clone();
        let v = first.version();
        let name = first.name().to_string();
        let sql = first.sql().unwrap_or("").to_string();
        let correct = compute_checksum(&name, v, &sql).to_string();
        conn.execute(
            "INSERT INTO refinery_schema_history (version, name, checksum) VALUES (?1, ?2, ?3)",
            rusqlite::params![v, name, correct.clone()],
        )
        .unwrap();

        let report = run_rehash(&mut conn, Path::new("/tmp/test.sqlite")).unwrap();
        assert!(
            report.rewritten.is_empty(),
            "must not rewrite matching rows"
        );
        assert_eq!(report.status, "ok_no_changes");
    }

    #[test]
    fn rehash_matches_refinery_embedded_checksum_for_v001() {
        // The ultimate correctness test: run a real migration, capture
        // what refinery stored, then call run_rehash and confirm the
        // file-derived checksum matches what the runner produced. This
        // pins the algorithm end-to-end and would catch any future drift
        // (e.g. a siphasher major version bump that changes SipHasher13).
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("test.sqlite");
        let mut conn = open_rw(&path).expect("open_rw");
        crate::migrations::runner().run(&mut conn).expect("migrate");
        let stored: String = conn
            .query_row(
                "SELECT checksum FROM refinery_schema_history WHERE version = 1",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let report = run_rehash(&mut conn, &path).expect("rehash");
        assert!(
            report.rewritten.is_empty(),
            "V001 must NOT be rewritten when checksums already match: rewrote={:?}",
            report.rewritten
        );
        // And re-running runner() should still succeed (the original
        // error that the failing test exposed was that the second
        // runner().run() call saw a checksum mismatch).
        crate::migrations::runner()
            .run(&mut conn)
            .expect("re-run migrate must succeed");
        let stored_after: String = conn
            .query_row(
                "SELECT checksum FROM refinery_schema_history WHERE version = 1",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(
            stored, stored_after,
            "checksum must not change after rehash"
        );
    }

    #[test]
    fn to_llm_only_reports_no_vec_tables_on_fresh_db() {
        // Fresh v1.0.76 database (created by running the full migration
        // set) has no vec tables.
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("fresh.sqlite");
        let mut conn = open_rw(&path).expect("open_rw");
        crate::migrations::runner().run(&mut conn).expect("migrate");
        let report = run_to_llm_only(&mut conn, &path).expect("to_llm_only");
        assert!(!report.vec_tables_were_present);
        // V013 was applied by the setup above, before `run_to_llm_only` ran;
        // the flag is still expected to be true.
        assert!(report.v013_applied, "V013 must be marked applied");
        assert_eq!(report.status, "ok");
    }

    // Positive and negative case for the sqlite_master probe.
    #[test]
    fn history_table_exists_detects_table() {
        let conn = create_db_with_history(1);
        assert!(history_table_exists(&conn));
        let conn2 = create_db_without_history();
        assert!(!history_table_exists(&conn2));
    }
}