use crate::errors::AppError;
use crate::output;
use crate::paths::AppPaths;
use crate::storage::connection::open_rw;
use rusqlite::OptionalExtension;
use serde::Serialize;
// Command-line arguments accepted by the `migrate` subcommand.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
# Apply pending schema migrations\n \
sqlite-graphrag migrate\n\n \
# Show already-applied migrations without applying new ones\n \
sqlite-graphrag migrate --status\n\n \
# Migrate a database at a custom path\n \
sqlite-graphrag migrate --db /path/to/graphrag.sqlite")]
pub struct MigrateArgs {
// Database location, given as `--db` or through the `SQLITE_GRAPHRAG_DB_PATH`
// environment variable. `run` hands it to `AppPaths::resolve`.
#[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
pub db: Option<String>,
// `--json` switch, off by default. `run` reads and discards it: its output
// always goes through `output::emit_json` regardless of this flag.
#[arg(long, default_value_t = false)]
pub json: bool,
// `--status` switch, off by default. When set, `run` reports the migrations
// already recorded and returns without invoking the migration runner.
#[arg(long, default_value_t = false)]
pub status: bool,
}
/// Payload emitted by `run` after the migration runner has completed.
#[derive(Serialize)]
struct MigrateResponse {
/// Display form of the resolved database path.
db_path: String,
/// Version returned by `latest_schema_version` once migrations have run.
schema_version: u32,
/// Outcome marker; `run` always sets it to `"ok"`.
status: String,
/// Milliseconds elapsed since `run` started, measured when the payload is built.
elapsed_ms: u64,
}
/// Payload emitted by `run` when `--status` is set.
#[derive(Serialize)]
struct MigrateStatusResponse {
/// Display form of the resolved database path.
db_path: String,
/// Rows returned by `list_applied_migrations`, ordered by ascending version.
applied_migrations: Vec<MigrationEntry>,
/// Version returned by `latest_schema_version`, or `0` when that lookup fails.
schema_version: u32,
/// Milliseconds elapsed since `run` started, measured when the payload is built.
elapsed_ms: u64,
}
/// One row read from the `refinery_schema_history` table.
#[derive(Serialize)]
struct MigrationEntry {
/// Value of the `version` column.
version: i64,
/// Value of the `name` column.
name: String,
/// Value of the `applied_on` column; `None` when the column is NULL.
applied_on: Option<String>,
}
/// Entry point for the `migrate` subcommand.
///
/// Resolves the database location, ensures its directories exist and opens a
/// read-write connection. With `--status` it only reports the migrations
/// already recorded in the history table. Otherwise it runs the migration
/// runner, stamps `PRAGMA user_version`, stores the resulting version under
/// the `schema_version` key of `schema_meta`, and reports the outcome. Both
/// paths emit their report through `output::emit_json`.
///
/// # Errors
///
/// Returns an `AppError` when path resolution, directory creation, opening the
/// database, a SQL statement, the migration run, or emitting the report fails.
pub fn run(args: MigrateArgs) -> Result<(), AppError> {
    let started_at = std::time::Instant::now();
    // The flag is accepted but has no effect here: the report is always JSON.
    let _ = args.json;
    // Called while each report is being built, so the figure covers the work done up to then.
    let elapsed_millis = || started_at.elapsed().as_millis() as u64;

    let paths = AppPaths::resolve(args.db.as_deref())?;
    paths.ensure_dirs()?;
    let mut conn = open_rw(&paths.db)?;

    if args.status {
        // Any failed lookup (for instance a missing history table) is reported as version 0.
        let schema_version = latest_schema_version(&conn).unwrap_or(0);
        let applied_migrations = list_applied_migrations(&conn)?;
        let report = MigrateStatusResponse {
            db_path: paths.db.display().to_string(),
            applied_migrations,
            schema_version,
            elapsed_ms: elapsed_millis(),
        };
        output::emit_json(&report)?;
        return Ok(());
    }

    crate::migrations::runner()
        .run(&mut conn)
        .map_err(|e| AppError::Internal(anyhow::anyhow!("migration failed: {e}")))?;

    // PRAGMA statements cannot take bound parameters, hence the formatted SQL.
    let stamp_user_version = format!(
        "PRAGMA user_version = {};",
        crate::constants::SCHEMA_USER_VERSION
    );
    conn.execute_batch(&stamp_user_version)?;

    // Read the version back from the history table and mirror it into `schema_meta`.
    let schema_version = latest_schema_version(&conn)?;
    conn.execute(
        "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('schema_version', ?1)",
        rusqlite::params![schema_version],
    )?;

    let report = MigrateResponse {
        db_path: paths.db.display().to_string(),
        schema_version,
        status: String::from("ok"),
        elapsed_ms: elapsed_millis(),
    };
    output::emit_json(&report)?;
    Ok(())
}
/// Lists every migration recorded in `refinery_schema_history`, lowest version first.
///
/// Returns an empty list when the history table does not exist.
///
/// # Errors
///
/// Propagates any failure from the table lookup, from preparing or running the
/// history query, or from reading a row's columns.
fn list_applied_migrations(conn: &rusqlite::Connection) -> Result<Vec<MigrationEntry>, AppError> {
    // Check `sqlite_master` first so a missing table yields an empty list instead of an error.
    let history_table = conn
        .query_row(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='refinery_schema_history'",
            [],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    if history_table.is_none() {
        return Ok(Vec::new());
    }

    let mut history_query = conn.prepare(
        "SELECT version, name, applied_on FROM refinery_schema_history ORDER BY version ASC",
    )?;
    let rows = history_query.query_map([], |row| {
        let version = row.get::<_, i64>(0)?;
        let name = row.get::<_, String>(1)?;
        let applied_on = row.get::<_, Option<String>>(2)?;
        Ok(MigrationEntry {
            version,
            name,
            applied_on,
        })
    })?;

    // Stop at the first row that fails to decode, as a fallible `collect` would.
    let mut entries = Vec::new();
    for row in rows {
        entries.push(row?);
    }
    Ok(entries)
}
/// Returns the highest migration version recorded in `refinery_schema_history`.
///
/// An empty history table yields `0`. Negative versions are raised to `0` and
/// versions above `u32::MAX` are capped at `u32::MAX`, so an out-of-range value
/// can never wrap around into an unrelated, plausible-looking number.
///
/// # Errors
///
/// Returns `AppError::Database` for every query failure other than "no rows",
/// which includes the history table not existing.
fn latest_schema_version(conn: &rusqlite::Connection) -> Result<u32, AppError> {
    match conn.query_row(
        "SELECT version FROM refinery_schema_history ORDER BY version DESC LIMIT 1",
        [],
        |row| row.get::<_, i64>(0),
    ) {
        // Clamping to the `u32` range first makes the narrowing cast lossless.
        Ok(version) => Ok(version.clamp(0, i64::from(u32::MAX)) as u32),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
        Err(err) => Err(AppError::Database(err)),
    }
}
// Unit tests for the schema-version lookup and the migrate response payload.
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
// Opens an in-memory database that has no `refinery_schema_history` table.
fn create_db_without_history() -> Connection {
Connection::open_in_memory().expect("failed to open in-memory db")
}
// Opens an in-memory database whose history table holds a single row with the
// given `version` and the name 'V001__init'; `applied_on` and `checksum` stay NULL.
fn create_db_with_history(version: i64) -> Connection {
let conn = Connection::open_in_memory().expect("failed to open in-memory db");
conn.execute_batch(
"CREATE TABLE refinery_schema_history (
version INTEGER NOT NULL,
name TEXT,
applied_on TEXT,
checksum TEXT
);",
)
.expect("failed to create history table");
conn.execute(
"INSERT INTO refinery_schema_history (version, name) VALUES (?1, 'V001__init')",
rusqlite::params![version],
)
.expect("failed to insert version");
conn
}
// Without the history table the lookup must fail rather than report version 0.
#[test]
fn latest_schema_version_returns_error_without_table() {
let conn = create_db_without_history();
let result = latest_schema_version(&conn);
assert!(result.is_err(), "must return Err when table does not exist");
}
// A single recorded version is returned unchanged as a `u32`.
#[test]
fn latest_schema_version_returns_max_version() {
let conn = create_db_with_history(6);
let version = latest_schema_version(&conn).unwrap();
assert_eq!(version, 6u32);
}
// Every field of `MigrateResponse` is serialized under its own name.
#[test]
fn migrate_response_serializes_required_fields() {
let resp = MigrateResponse {
db_path: "/tmp/test.sqlite".to_string(),
schema_version: 6,
status: "ok".to_string(),
elapsed_ms: 12,
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["status"], "ok");
assert_eq!(json["schema_version"], 6);
assert_eq!(json["db_path"], "/tmp/test.sqlite");
assert_eq!(json["elapsed_ms"], 12);
}
// An existing but empty history table is reported as version 0.
#[test]
fn latest_schema_version_returns_zero_when_table_empty() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(
"CREATE TABLE refinery_schema_history (
version INTEGER NOT NULL,
name TEXT
);",
)
.expect("table creation");
let version = latest_schema_version(&conn).unwrap();
assert_eq!(version, 0u32);
}
}