sqlite_graphrag/commands/
migrate.rs1use crate::errors::AppError;
4use crate::output;
5use crate::paths::AppPaths;
6use crate::storage::connection::open_rw;
7use rusqlite::OptionalExtension;
8use serde::Serialize;
9
10#[derive(clap::Args)]
11#[command(after_long_help = "EXAMPLES:\n \
12 # Apply pending schema migrations\n \
13 sqlite-graphrag migrate\n\n \
14 # Show already-applied migrations without applying new ones\n \
15 sqlite-graphrag migrate --status\n\n \
16 # Migrate a database at a custom path\n \
17 sqlite-graphrag migrate --db /path/to/graphrag.sqlite")]
18pub struct MigrateArgs {
19 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
20 pub db: Option<String>,
21 #[arg(long, default_value_t = false)]
23 pub json: bool,
24 #[arg(long, default_value_t = false)]
26 pub status: bool,
27}
28
29#[derive(Serialize)]
30struct MigrateResponse {
31 db_path: String,
32 schema_version: u32,
35 status: String,
36 elapsed_ms: u64,
38}
39
40#[derive(Serialize)]
41struct MigrateStatusResponse {
42 db_path: String,
43 applied_migrations: Vec<MigrationEntry>,
44 schema_version: u32,
46 elapsed_ms: u64,
47}
48
49#[derive(Serialize)]
50struct MigrationEntry {
51 version: i64,
52 name: String,
53 applied_on: Option<String>,
54}
55
56pub fn run(args: MigrateArgs) -> Result<(), AppError> {
57 let start = std::time::Instant::now();
58 let _ = args.json; let paths = AppPaths::resolve(args.db.as_deref())?;
60 paths.ensure_dirs()?;
61
62 let mut conn = open_rw(&paths.db)?;
63
64 if args.status {
65 let schema_version = latest_schema_version(&conn).unwrap_or(0);
66 let applied = list_applied_migrations(&conn)?;
67 output::emit_json(&MigrateStatusResponse {
68 db_path: paths.db.display().to_string(),
69 applied_migrations: applied,
70 schema_version,
71 elapsed_ms: start.elapsed().as_millis() as u64,
72 })?;
73 return Ok(());
74 }
75
76 crate::migrations::runner()
77 .run(&mut conn)
78 .map_err(|e| AppError::Internal(anyhow::anyhow!("migration failed: {e}")))?;
79
80 conn.execute_batch(&format!(
81 "PRAGMA user_version = {};",
82 crate::constants::SCHEMA_USER_VERSION
83 ))?;
84
85 let schema_version = latest_schema_version(&conn)?;
86 conn.execute(
87 "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('schema_version', ?1)",
88 rusqlite::params![schema_version],
89 )?;
90
91 output::emit_json(&MigrateResponse {
92 db_path: paths.db.display().to_string(),
93 schema_version,
94 status: "ok".to_string(),
95 elapsed_ms: start.elapsed().as_millis() as u64,
96 })?;
97
98 Ok(())
99}
100
101fn list_applied_migrations(conn: &rusqlite::Connection) -> Result<Vec<MigrationEntry>, AppError> {
102 let table_exists: Option<String> = conn
103 .query_row(
104 "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'",
105 [],
106 |r| r.get(0),
107 )
108 .optional()?;
109 if table_exists.is_none() {
110 return Ok(vec![]);
111 }
112 let mut stmt = conn.prepare(
113 "SELECT version, name, applied_on FROM refinery_schema_history ORDER BY version ASC",
114 )?;
115 let entries = stmt
116 .query_map([], |r| {
117 Ok(MigrationEntry {
118 version: r.get(0)?,
119 name: r.get(1)?,
120 applied_on: r.get(2)?,
121 })
122 })?
123 .collect::<Result<Vec<_>, _>>()?;
124 Ok(entries)
125}
126
127fn latest_schema_version(conn: &rusqlite::Connection) -> Result<u32, AppError> {
128 match conn.query_row(
129 "SELECT version FROM refinery_schema_history ORDER BY version DESC LIMIT 1",
130 [],
131 |row| row.get::<_, i64>(0),
132 ) {
133 Ok(version) => Ok(version.max(0) as u32),
134 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
135 Err(err) => Err(AppError::Database(err)),
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use rusqlite::Connection;
143
144 fn create_db_without_history() -> Connection {
145 Connection::open_in_memory().expect("failed to open in-memory db")
146 }
147
148 fn create_db_with_history(version: i64) -> Connection {
149 let conn = Connection::open_in_memory().expect("failed to open in-memory db");
150 conn.execute_batch(
151 "CREATE TABLE refinery_schema_history (
152 version INTEGER NOT NULL,
153 name TEXT,
154 applied_on TEXT,
155 checksum TEXT
156 );",
157 )
158 .expect("failed to create history table");
159 conn.execute(
160 "INSERT INTO refinery_schema_history (version, name) VALUES (?1, 'V001__init')",
161 rusqlite::params![version],
162 )
163 .expect("failed to insert version");
164 conn
165 }
166
167 #[test]
168 fn latest_schema_version_returns_error_without_table() {
169 let conn = create_db_without_history();
170 let result = latest_schema_version(&conn);
172 assert!(result.is_err(), "must return Err when table does not exist");
173 }
174
175 #[test]
176 fn latest_schema_version_returns_max_version() {
177 let conn = create_db_with_history(6);
178 let version = latest_schema_version(&conn).unwrap();
179 assert_eq!(version, 6u32);
180 }
181
182 #[test]
183 fn migrate_response_serializes_required_fields() {
184 let resp = MigrateResponse {
185 db_path: "/tmp/test.sqlite".to_string(),
186 schema_version: 6,
187 status: "ok".to_string(),
188 elapsed_ms: 12,
189 };
190 let json = serde_json::to_value(&resp).unwrap();
191 assert_eq!(json["status"], "ok");
192 assert_eq!(json["schema_version"], 6);
193 assert_eq!(json["db_path"], "/tmp/test.sqlite");
194 assert_eq!(json["elapsed_ms"], 12);
195 }
196
197 #[test]
198 fn latest_schema_version_returns_zero_when_table_empty() {
199 let conn = Connection::open_in_memory().expect("in-memory db");
200 conn.execute_batch(
201 "CREATE TABLE refinery_schema_history (
202 version INTEGER NOT NULL,
203 name TEXT
204 );",
205 )
206 .expect("table creation");
207 let version = latest_schema_version(&conn).unwrap();
209 assert_eq!(version, 0u32);
210 }
211}