1use crate::errors::AppError;
4use crate::output;
5use crate::paths::AppPaths;
6use crate::storage::connection::open_rw;
7use rusqlite::OptionalExtension;
8use serde::Serialize;
9use siphasher::sip::SipHasher13;
10use std::hash::{Hash, Hasher};
11use std::path::Path;
12
13#[derive(clap::Args)]
14#[command(after_long_help = "EXAMPLES:\n \
15 # Apply pending schema migrations\n \
16 sqlite-graphrag migrate\n\n \
17 # Show already-applied migrations without applying new ones\n \
18 sqlite-graphrag migrate --status\n\n \
19 # Migrate a database at a custom path\n \
20 sqlite-graphrag migrate --db /path/to/graphrag.sqlite\n\n \
21 # Rewrite recorded migration checksums to match the current file content.\n \
22 # Use this after upgrading across a version that intentionally changed a\n \
23 # migration file (v1.0.76 is the first release where this is exposed).\n \
24 sqlite-graphrag migrate --rehash\n\n \
25 # Full upgrade: rehash, apply V013 (drop vec tables), verify schema.\n \
26 # Required once for users upgrading from v1.0.74 or v1.0.75.\n \
27 sqlite-graphrag migrate --to-llm-only")]
28pub struct MigrateArgs {
29 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
30 pub db: Option<String>,
31 #[arg(long, default_value_t = false)]
33 pub json: bool,
34 #[arg(long, default_value_t = false)]
36 pub status: bool,
37 #[arg(long, default_value_t = false)]
40 pub rehash: bool,
41 #[arg(long, default_value_t = false)]
45 pub to_llm_only: bool,
46 #[arg(long, default_value_t = false)]
52 pub drop_vec_tables: bool,
53}
54
55#[derive(Serialize)]
56struct MigrateResponse {
57 db_path: String,
58 schema_version: u32,
61 status: String,
62 elapsed_ms: u64,
64}
65
66#[derive(Serialize)]
67struct MigrateStatusResponse {
68 db_path: String,
69 applied_migrations: Vec<MigrationEntry>,
70 schema_version: u32,
72 elapsed_ms: u64,
73}
74
75#[derive(Serialize)]
76struct MigrationEntry {
77 version: i64,
78 name: String,
79 applied_on: Option<String>,
80 #[serde(skip_serializing_if = "Option::is_none")]
81 checksum: Option<String>,
82}
83
84#[derive(Serialize)]
85struct RehashReport {
86 db_path: String,
87 schema_version: u32,
88 rewritten: Vec<RehashEntry>,
91 inspected: usize,
93 status: String,
94 elapsed_ms: u64,
95}
96
97#[derive(Serialize, Debug)]
98struct RehashEntry {
99 version: i64,
100 name: String,
101 old_checksum: String,
102 new_checksum: String,
103}
104
105#[derive(Serialize)]
106struct ToLlmOnlyReport {
107 db_path: String,
108 schema_version: u32,
109 rehashed: Vec<RehashEntry>,
110 vec_tables_were_present: bool,
113 v013_applied: bool,
115 status: String,
116 elapsed_ms: u64,
117}
118
119pub fn run(args: MigrateArgs) -> Result<(), AppError> {
120 let start = std::time::Instant::now();
121 let _ = args.json; let paths = AppPaths::resolve(args.db.as_deref())?;
123 paths.ensure_dirs()?;
124
125 if args.status && (args.rehash || args.to_llm_only) {
126 return Err(AppError::Validation(
127 "--status cannot be combined with --rehash or --to-llm-only".into(),
128 ));
129 }
130 if args.rehash && args.to_llm_only {
131 return Err(AppError::Validation(
132 "--rehash and --to-llm-only are mutually exclusive".into(),
133 ));
134 }
135 if args.to_llm_only && !args.drop_vec_tables {
136 return Err(AppError::Validation(
137 "--to-llm-only requires --drop-vec-tables to acknowledge the destructive drop".into(),
138 ));
139 }
140
141 let mut conn = open_rw(&paths.db)?;
142
143 if args.status {
144 let schema_version = latest_schema_version(&conn).unwrap_or(0);
145 let applied = list_applied_migrations(&conn)?;
146 output::emit_json(&MigrateStatusResponse {
147 db_path: paths.db.display().to_string(),
148 applied_migrations: applied,
149 schema_version,
150 elapsed_ms: start.elapsed().as_millis() as u64,
151 })?;
152 return Ok(());
153 }
154
155 if args.rehash {
156 let report = run_rehash(&mut conn, &paths.db)?;
157 output::emit_json(&report)?;
158 return Ok(());
159 }
160
161 if args.to_llm_only {
162 let report = run_to_llm_only(&mut conn, &paths.db)?;
163 output::emit_json(&report)?;
164 return Ok(());
165 }
166
167 crate::migrations::runner()
168 .run(&mut conn)
169 .map_err(|e| AppError::Internal(anyhow::anyhow!("migration failed: {e}")))?;
170
171 conn.execute_batch(&format!(
172 "PRAGMA user_version = {};",
173 crate::constants::SCHEMA_USER_VERSION
174 ))?;
175
176 let schema_version = latest_schema_version(&conn)?;
177 conn.execute(
178 "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('schema_version', ?1)",
179 rusqlite::params![schema_version],
180 )?;
181
182 output::emit_json(&MigrateResponse {
183 db_path: paths.db.display().to_string(),
184 schema_version,
185 status: "ok".to_string(),
186 elapsed_ms: start.elapsed().as_millis() as u64,
187 })?;
188
189 Ok(())
190}
191
192fn compute_checksum(name: &str, version: i32, sql: &str) -> u64 {
201 let mut hasher = SipHasher13::new();
202 name.hash(&mut hasher);
203 version.hash(&mut hasher);
204 sql.hash(&mut hasher);
205 hasher.finish()
206}
207
208fn run_rehash(conn: &mut rusqlite::Connection, db_path: &Path) -> Result<RehashReport, AppError> {
209 let start = std::time::Instant::now();
210 let schema_version = latest_schema_version(conn).unwrap_or(0);
211
212 if !history_table_exists(conn) {
213 return Ok(RehashReport {
214 db_path: db_path.display().to_string(),
215 schema_version,
216 rewritten: vec![],
217 inspected: 0,
218 status: "ok_no_history".to_string(),
219 elapsed_ms: start.elapsed().as_millis() as u64,
220 });
221 }
222
223 let mut rewritten: Vec<RehashEntry> = Vec::new();
224 let mut inspected = 0usize;
225
226 for mig in crate::migrations::runner().get_migrations().iter() {
227 if mig.sql().is_none() {
228 continue;
229 }
230 let name = mig.name().to_string();
231 let version = mig.version();
232 let sql = mig.sql().unwrap_or("").to_string();
233 let new_checksum = compute_checksum(&name, version, &sql);
234
235 let row: Option<String> = conn
236 .query_row(
237 "SELECT checksum FROM refinery_schema_history WHERE version = ?1",
238 rusqlite::params![version],
239 |r| r.get(0),
240 )
241 .optional()?;
242
243 inspected += 1;
244 if let Some(existing) = row {
245 let existing_trim = existing.trim();
246 let new_str = new_checksum.to_string();
247 if existing_trim != new_str {
248 conn.execute(
249 "UPDATE refinery_schema_history SET checksum = ?1 WHERE version = ?2",
250 rusqlite::params![new_str, version],
251 )?;
252 rewritten.push(RehashEntry {
253 version: version as i64,
254 name,
255 old_checksum: existing_trim.to_string(),
256 new_checksum: new_str,
257 });
258 }
259 } else {
260 conn.execute(
264 "INSERT OR IGNORE INTO refinery_schema_history (version, name, checksum) VALUES (?1, ?2, ?3)",
265 rusqlite::params![version, name, new_checksum.to_string()],
266 )?;
267 }
268 }
269
270 let status = if rewritten.is_empty() {
271 "ok_no_changes"
272 } else {
273 "ok_rewritten"
274 };
275
276 Ok(RehashReport {
277 db_path: db_path.display().to_string(),
278 schema_version,
279 rewritten,
280 inspected,
281 status: status.to_string(),
282 elapsed_ms: start.elapsed().as_millis() as u64,
283 })
284}
285
286fn run_to_llm_only(
287 conn: &mut rusqlite::Connection,
288 db_path: &Path,
289) -> Result<ToLlmOnlyReport, AppError> {
290 let start = std::time::Instant::now();
291
292 let vec_tables_were_present: bool = {
296 let count: i64 = conn
297 .query_row(
298 "SELECT COUNT(*) FROM sqlite_master
299 WHERE type='table' AND name IN ('vec_memories','vec_entities','vec_chunks')",
300 [],
301 |r| r.get(0),
302 )
303 .unwrap_or(0);
304 count > 0
305 };
306
307 let rehash_report = run_rehash(conn, db_path)?;
309 let rehashed = rehash_report.rewritten;
310
311 crate::migrations::runner()
315 .run(conn)
316 .map_err(|e| AppError::Internal(anyhow::anyhow!("migration failed: {e}")))?;
317
318 conn.execute_batch(&format!(
319 "PRAGMA user_version = {};",
320 crate::constants::SCHEMA_USER_VERSION
321 ))?;
322
323 let schema_version = latest_schema_version(conn)?;
324 conn.execute(
325 "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('schema_version', ?1)",
326 rusqlite::params![schema_version],
327 )?;
328
329 let v013_applied = schema_version >= 13;
332
333 Ok(ToLlmOnlyReport {
334 db_path: db_path.display().to_string(),
335 schema_version,
336 rehashed,
337 vec_tables_were_present,
338 v013_applied,
339 status: "ok".to_string(),
340 elapsed_ms: start.elapsed().as_millis() as u64,
341 })
342}
343
344fn history_table_exists(conn: &rusqlite::Connection) -> bool {
345 conn.query_row(
346 "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'",
347 [],
348 |r| r.get::<_, String>(0),
349 )
350 .optional()
351 .ok()
352 .flatten()
353 .is_some()
354}
355
356fn list_applied_migrations(conn: &rusqlite::Connection) -> Result<Vec<MigrationEntry>, AppError> {
357 let table_exists: Option<String> = conn
358 .query_row(
359 "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'",
360 [],
361 |r| r.get(0),
362 )
363 .optional()?;
364 if table_exists.is_none() {
365 return Ok(vec![]);
366 }
367 let mut stmt = conn.prepare_cached(
368 "SELECT version, name, applied_on, checksum FROM refinery_schema_history ORDER BY version ASC",
369 )?;
370 let entries = stmt
371 .query_map([], |r| {
372 let checksum: Option<String> = r.get(3)?;
373 Ok(MigrationEntry {
374 version: r.get(0)?,
375 name: r.get(1)?,
376 applied_on: r.get(2)?,
377 checksum: checksum
378 .map(|s| s.trim().to_string())
379 .filter(|s| !s.is_empty()),
380 })
381 })?
382 .collect::<Result<Vec<_>, _>>()?;
383 Ok(entries)
384}
385
386fn latest_schema_version(conn: &rusqlite::Connection) -> Result<u32, AppError> {
387 match conn.query_row(
388 "SELECT version FROM refinery_schema_history ORDER BY version DESC LIMIT 1",
389 [],
390 |row| row.get::<_, i64>(0),
391 ) {
392 Ok(version) => Ok(version.max(0) as u32),
393 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
394 Err(err) => Err(AppError::Database(err)),
395 }
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory database that has no history table at all.
    fn create_db_without_history() -> Connection {
        let opened = Connection::open_in_memory();
        opened.expect("failed to open in-memory db")
    }

    /// Opens an in-memory database whose history table holds one row with the
    /// given version (and NULL `applied_on` / `checksum`).
    fn create_db_with_history(version: i64) -> Connection {
        let db = Connection::open_in_memory().expect("failed to open in-memory db");
        let ddl = "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT,
                applied_on TEXT,
                checksum TEXT
            );";
        db.execute_batch(ddl)
            .expect("failed to create history table");
        let seed = "INSERT INTO refinery_schema_history (version, name) VALUES (?1, 'V001__init')";
        db.execute(seed, rusqlite::params![version])
            .expect("failed to insert version");
        db
    }

    /// Without the history table the version lookup is an error, not 0.
    #[test]
    fn latest_schema_version_returns_error_without_table() {
        let db = create_db_without_history();
        let outcome = latest_schema_version(&db);
        assert!(outcome.is_err(), "must return Err when table does not exist");
    }

    /// The single recorded version is reported back unchanged.
    #[test]
    fn latest_schema_version_returns_max_version() {
        let db = create_db_with_history(6);
        let found = latest_schema_version(&db).unwrap();
        assert_eq!(found, 6u32);
    }

    /// Every field of `MigrateResponse` appears in its JSON form.
    #[test]
    fn migrate_response_serializes_required_fields() {
        let payload = MigrateResponse {
            db_path: "/tmp/test.sqlite".to_string(),
            schema_version: 6,
            status: "ok".to_string(),
            elapsed_ms: 12,
        };
        let json = serde_json::to_value(&payload).unwrap();
        assert_eq!(json["status"], "ok");
        assert_eq!(json["schema_version"], 6);
        assert_eq!(json["db_path"], "/tmp/test.sqlite");
        assert_eq!(json["elapsed_ms"], 12);
    }

    /// An existing but empty history table yields version 0.
    #[test]
    fn latest_schema_version_returns_zero_when_table_empty() {
        let db = Connection::open_in_memory().expect("in-memory db");
        let ddl = "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT
            );";
        db.execute_batch(ddl).expect("table creation");
        let found = latest_schema_version(&db).unwrap();
        assert_eq!(found, 0u32);
    }

    /// Same inputs hash identically; a trailing newline in the SQL does not.
    #[test]
    fn compute_checksum_is_deterministic_and_matches_refinery() {
        let first = compute_checksum("vec_tables", 2, "SELECT 1;");
        let second = compute_checksum("vec_tables", 2, "SELECT 1;");
        assert_eq!(first, second, "checksum must be deterministic");
        let with_newline = compute_checksum("vec_tables", 2, "SELECT 1;\n");
        assert_ne!(
            first, with_newline,
            "trailing newline must change the checksum (matches refinery)"
        );
    }

    /// Rehashing a database without a history table is a reported no-op.
    #[test]
    fn rehash_with_no_history_returns_empty() {
        let mut db = create_db_without_history();
        let report = run_rehash(&mut db, Path::new("/tmp/empty.sqlite")).unwrap();
        assert_eq!(report.status, "ok_no_history");
        assert!(report.rewritten.is_empty());
        assert_eq!(report.inspected, 0);
    }

    /// A wrong recorded checksum is replaced by the recomputed one.
    #[test]
    fn rehash_writes_matching_checksum() {
        let mut db = Connection::open_in_memory().expect("in-memory db");
        let ddl = "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT,
                applied_on TEXT,
                checksum TEXT
            );";
        db.execute_batch(ddl).expect("history create");

        let migration = crate::migrations::runner().get_migrations()[0].clone();
        let version = migration.version();
        let name = migration.name().to_string();
        let sql = migration.sql().unwrap_or("").to_string();
        let correct = compute_checksum(&name, version, &sql).to_string();
        let wrong = "1234567890";
        assert_ne!(correct, wrong, "test sanity: correct != wrong");

        db.execute(
            "INSERT INTO refinery_schema_history (version, name, checksum) VALUES (?1, ?2, ?3)",
            rusqlite::params![version, name, wrong],
        )
        .expect("insert");

        let report = run_rehash(&mut db, Path::new("/tmp/test.sqlite")).unwrap();
        assert_eq!(report.rewritten.len(), 1);
        let entry = &report.rewritten[0];
        assert_eq!(entry.old_checksum, wrong);
        assert_eq!(entry.new_checksum, correct);

        let stored: String = db
            .query_row(
                "SELECT checksum FROM refinery_schema_history WHERE version = ?1",
                rusqlite::params![version],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(stored, correct);
    }

    /// A row whose checksum is already correct is left alone.
    #[test]
    fn rehash_is_idempotent_when_checksums_match() {
        let mut db = Connection::open_in_memory().expect("in-memory db");
        let ddl = "CREATE TABLE refinery_schema_history (
                version INTEGER NOT NULL,
                name TEXT,
                applied_on TEXT,
                checksum TEXT
            );";
        db.execute_batch(ddl).unwrap();

        let migration = crate::migrations::runner().get_migrations()[0].clone();
        let version = migration.version();
        let name = migration.name().to_string();
        let sql = migration.sql().unwrap_or("").to_string();
        let correct = compute_checksum(&name, version, &sql).to_string();
        db.execute(
            "INSERT INTO refinery_schema_history (version, name, checksum) VALUES (?1, ?2, ?3)",
            rusqlite::params![version, name, correct.clone()],
        )
        .unwrap();

        let report = run_rehash(&mut db, Path::new("/tmp/test.sqlite")).unwrap();
        assert!(
            report.rewritten.is_empty(),
            "must not rewrite matching rows"
        );
        assert_eq!(report.status, "ok_no_changes");
    }

    /// After a real migration run, rehash changes nothing and the runner can
    /// be executed again without the stored V001 checksum moving.
    #[test]
    fn rehash_matches_refinery_embedded_checksum_for_v001() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_file = dir.path().join("test.sqlite");
        let mut db = open_rw(&db_file).expect("open_rw");
        crate::migrations::runner().run(&mut db).expect("migrate");

        let before: String = db
            .query_row(
                "SELECT checksum FROM refinery_schema_history WHERE version = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();

        let report = run_rehash(&mut db, &db_file).expect("rehash");
        assert!(
            report.rewritten.is_empty(),
            "V001 must NOT be rewritten when checksums already match: rewrote={:?}",
            report.rewritten
        );

        crate::migrations::runner()
            .run(&mut db)
            .expect("re-run migrate must succeed");

        let after: String = db
            .query_row(
                "SELECT checksum FROM refinery_schema_history WHERE version = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(
            before, after,
            "checksum must not change after rehash"
        );
    }

    /// On an already fully migrated database no vec tables are reported.
    #[test]
    fn to_llm_only_reports_no_vec_tables_on_fresh_db() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_file = dir.path().join("fresh.sqlite");
        let mut db = open_rw(&db_file).expect("open_rw");
        crate::migrations::runner().run(&mut db).expect("migrate");

        let report = run_to_llm_only(&mut db, &db_file).expect("to_llm_only");
        assert!(!report.vec_tables_were_present);
        assert!(report.v013_applied, "V013 must be marked applied");
        assert_eq!(report.status, "ok");
    }

    /// The probe distinguishes a database with the table from one without.
    #[test]
    fn history_table_exists_detects_table() {
        let with_table = create_db_with_history(1);
        assert!(history_table_exists(&with_table));
        let without_table = create_db_without_history();
        assert!(!history_table_exists(&without_table));
    }
}