use std::path::Path;
use serde_json::{json, Value};
use crate::connection::DatabaseClient;
use crate::error::{Result, SurqlError};
use crate::migration::hooks::is_auto_snapshot_enabled;
use crate::migration::models::MigrationHistory;
use crate::migration::versioning::{create_snapshot, store_snapshot};
use crate::schema::registry::SchemaRegistry;
/// Name of the SurrealDB table that stores one row per applied migration.
pub const MIGRATION_TABLE_NAME: &str = "_migration_history";
pub async fn create_migration_table(client: &DatabaseClient) -> Result<()> {
let statements: [&str; 7] = [
"DEFINE TABLE IF NOT EXISTS _migration_history SCHEMAFULL;",
"DEFINE FIELD IF NOT EXISTS version ON TABLE _migration_history TYPE string;",
"DEFINE FIELD IF NOT EXISTS description ON TABLE _migration_history TYPE string;",
"DEFINE FIELD IF NOT EXISTS applied_at ON TABLE _migration_history TYPE datetime;",
"DEFINE FIELD IF NOT EXISTS checksum ON TABLE _migration_history TYPE string;",
"DEFINE FIELD IF NOT EXISTS execution_time_ms ON TABLE _migration_history TYPE option<int>;",
"DEFINE INDEX IF NOT EXISTS version_idx ON TABLE _migration_history COLUMNS version UNIQUE;",
];
let mut surql = String::new();
for stmt in statements {
surql.push_str(stmt);
surql.push('\n');
}
client
.query(&surql)
.await
.map_err(|e| SurqlError::MigrationHistory {
reason: format!("failed to create migration history table: {e}"),
})?;
Ok(())
}
/// Make sure the migration history table exists before reading or writing it.
///
/// Delegates to [`create_migration_table`], whose statements are all
/// `IF NOT EXISTS` and therefore safe to repeat.
pub async fn ensure_migration_table(client: &DatabaseClient) -> Result<()> {
create_migration_table(client).await
}
/// Persist a history row describing an applied migration.
///
/// The record id is derived from the version via [`record_id_for`];
/// `CREATE` (rather than upsert) plus the unique `version_idx` index means a
/// second attempt to record the same version surfaces as an error.
/// `execution_time_ms` is bound and written only when present.
pub async fn record_migration(client: &DatabaseClient, entry: &MigrationHistory) -> Result<()> {
    ensure_migration_table(client).await?;

    let mut bindings: std::collections::BTreeMap<String, Value> =
        std::collections::BTreeMap::new();
    bindings.insert("id".into(), Value::String(record_id_for(&entry.version)));
    bindings.insert("version".into(), Value::String(entry.version.clone()));
    bindings.insert(
        "description".into(),
        Value::String(entry.description.clone()),
    );
    bindings.insert(
        "applied_at".into(),
        Value::String(entry.applied_at.to_rfc3339()),
    );
    bindings.insert("checksum".into(), Value::String(entry.checksum.clone()));

    // Assemble the SET clause; the optional timing column is appended last.
    let mut assignments = String::from(
        "version = $version, description = $description, \
         applied_at = <datetime> $applied_at, checksum = $checksum",
    );
    if let Some(ms) = entry.execution_time_ms {
        bindings.insert("execution_time_ms".into(), json!(ms));
        assignments.push_str(", execution_time_ms = $execution_time_ms");
    }

    let surql = format!(
        "CREATE type::record('{table}', $id) SET {set};",
        table = MIGRATION_TABLE_NAME,
        set = assignments,
    );
    client
        .query_with_vars(&surql, bindings)
        .await
        .map_err(|e| SurqlError::MigrationHistory {
            reason: format!("failed to record migration {}: {e}", entry.version),
        })?;
    Ok(())
}
/// Delete the history row(s) whose `version` field matches `version`.
///
/// Used when a migration is rolled back so the history reflects only what is
/// currently applied. Deleting a version that was never recorded is not an
/// error.
pub async fn remove_migration_record(client: &DatabaseClient, version: &str) -> Result<()> {
    ensure_migration_table(client).await?;

    let mut bindings: std::collections::BTreeMap<String, Value> =
        std::collections::BTreeMap::new();
    bindings.insert("version".into(), Value::String(version.to_string()));

    let surql = format!(
        "DELETE FROM {table} WHERE version = $version;",
        table = MIGRATION_TABLE_NAME,
    );
    client
        .query_with_vars(&surql, bindings)
        .await
        .map_err(|e| SurqlError::MigrationHistory {
            reason: format!("failed to remove migration record {version}: {e}"),
        })?;
    Ok(())
}
/// Fetch every recorded migration, oldest first (ordered by `applied_at`).
///
/// Rows that cannot be parsed into a [`MigrationHistory`] (e.g. missing
/// `version` or `applied_at`) are silently skipped by [`parse_history_rows`].
pub async fn get_applied_migrations(client: &DatabaseClient) -> Result<Vec<MigrationHistory>> {
    ensure_migration_table(client).await?;

    let surql = format!(
        "SELECT * FROM {table} ORDER BY applied_at ASC;",
        table = MIGRATION_TABLE_NAME,
    );
    let response = client
        .query(&surql)
        .await
        .map_err(|e| SurqlError::MigrationHistory {
            reason: format!("failed to fetch applied migrations: {e}"),
        })?;
    Ok(parse_history_rows(&response))
}
/// Return `true` when a history row exists for `version`.
///
/// Queries with `LIMIT 1` so the database stops at the first match; the
/// answer is simply whether any row parsed out of the response.
pub async fn is_migration_applied(client: &DatabaseClient, version: &str) -> Result<bool> {
    ensure_migration_table(client).await?;

    let surql = format!(
        "SELECT * FROM {table} WHERE version = $version LIMIT 1;",
        table = MIGRATION_TABLE_NAME,
    );
    let mut bindings: std::collections::BTreeMap<String, Value> =
        std::collections::BTreeMap::new();
    bindings.insert("version".into(), Value::String(version.to_string()));

    let response = client
        .query_with_vars(&surql, bindings)
        .await
        .map_err(|e| SurqlError::MigrationHistory {
            reason: format!("failed to query migration {version}: {e}"),
        })?;
    let rows = parse_history_rows(&response);
    Ok(!rows.is_empty())
}
/// Convenience alias that delegates to [`get_applied_migrations`].
pub async fn get_migration_history(client: &DatabaseClient) -> Result<Vec<MigrationHistory>> {
get_applied_migrations(client).await
}
/// Take a best-effort schema snapshot after a migration is applied.
///
/// Does nothing when auto-snapshots are disabled. Failures to create or store
/// the snapshot are logged as warnings and never propagated — snapshotting
/// must not fail an otherwise-successful migration.
pub fn auto_snapshot_after_apply(registry: &SchemaRegistry, snapshots_dir: &Path, version: &str) {
    if !is_auto_snapshot_enabled() {
        return;
    }
    let snapshot = match create_snapshot(registry, version, format!("auto: {version}")) {
        Ok(snapshot) => snapshot,
        Err(err) => {
            tracing::warn!(target: "surql::migration::history", %err, %version, "auto_snapshot_create_failed");
            return;
        }
    };
    if let Err(err) = store_snapshot(&snapshot, snapshots_dir) {
        tracing::warn!(target: "surql::migration::history", %err, %version, "auto_snapshot_store_failed");
    }
}
/// Build a SurrealDB-safe record id from a version string by replacing every
/// character that is not ASCII alphanumeric with `_`.
///
/// Note: versions that differ only in separators (e.g. `v1.2.3` / `v1-2-3`)
/// map to the same id; the unique `version_idx` index still distinguishes
/// them by the stored `version` field.
fn record_id_for(version: &str) -> String {
    let mut id = String::with_capacity(version.len());
    for ch in version.chars() {
        id.push(if ch.is_ascii_alphanumeric() { ch } else { '_' });
    }
    id
}
/// Flatten a raw query response into history entries.
///
/// Handles both SurrealDB's `[{"result": [...]}]` envelope and flat arrays of
/// row objects; the recursion lives in [`collect_rows`].
fn parse_history_rows(raw: &Value) -> Vec<MigrationHistory> {
    let mut rows = Vec::new();
    collect_rows(raw, &mut rows);
    rows
}
/// Recursively walk a JSON value, appending every parseable row to `out`.
///
/// Arrays are descended element by element. An object with a `result` key is
/// treated as a response envelope and its payload is descended instead of the
/// object itself; any other object is parsed as a candidate row. Scalars are
/// ignored.
fn collect_rows(value: &Value, out: &mut Vec<MigrationHistory>) {
    match value {
        Value::Array(items) => {
            for item in items {
                collect_rows(item, out);
            }
        }
        Value::Object(obj) => match obj.get("result") {
            Some(inner) => collect_rows(inner, out),
            None => {
                if let Some(entry) = history_from_object(obj) {
                    out.push(entry);
                }
            }
        },
        _ => {}
    }
}
fn history_from_object(obj: &serde_json::Map<String, Value>) -> Option<MigrationHistory> {
let version = obj.get("version").and_then(Value::as_str)?.to_string();
let description = obj
.get("description")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let checksum = obj
.get("checksum")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let applied_at = obj.get("applied_at").and_then(parse_datetime)?;
let execution_time_ms = obj.get("execution_time_ms").and_then(|v| match v {
Value::Number(n) => n.as_u64(),
_ => None,
});
Some(MigrationHistory {
version,
description,
applied_at,
checksum,
execution_time_ms,
})
}
/// Parse a JSON datetime value into a UTC timestamp.
///
/// Accepts RFC 3339 strings with any offset (including `Z`). As a fallback,
/// accepts `YYYY-MM-DDTHH:MM:SS[.fff]Z` parsed as a naive datetime and
/// interpreted as UTC. Returns `None` for non-string values or unparseable
/// text.
fn parse_datetime(value: &Value) -> Option<chrono::DateTime<chrono::Utc>> {
    let s = value.as_str()?;
    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
        return Some(dt.with_timezone(&chrono::Utc));
    }
    // Fallback for a literal trailing `Z`. The previous code used
    // `DateTime::parse_from_str` here, which errors whenever the format string
    // lacks an offset specifier (%z / %:z) — a literal `Z` is not one — so the
    // fallback branch could never succeed. Parse as a naive datetime and
    // treat it as UTC instead.
    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ") {
        return Some(naive.and_utc());
    }
    None
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use serde_json::json;

    #[test]
    fn record_id_sanitises_separators() {
        // Non-alphanumeric characters collapse to `_`.
        let cases = [
            ("20260102_120000", "20260102_120000"),
            ("20260102-120000", "20260102_120000"),
            ("v1.2.3", "v1_2_3"),
        ];
        for (input, expected) in cases {
            assert_eq!(record_id_for(input), expected);
        }
    }

    #[test]
    fn parse_history_rows_extracts_nested_result() {
        // SurrealDB envelope shape: [{"result": [row, ...]}].
        let response = json!([{
            "result": [{
                "version": "v1",
                "description": "initial",
                "applied_at": "2026-01-02T12:00:00Z",
                "checksum": "abc",
                "execution_time_ms": 42,
            }],
        }]);
        let rows = parse_history_rows(&response);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].version, "v1");
        assert_eq!(rows[0].execution_time_ms, Some(42));
    }

    #[test]
    fn parse_history_rows_accepts_flat_array() {
        // Rows may also arrive without the "result" envelope.
        let response = json!([{
            "version": "v1",
            "description": "d",
            "applied_at": "2026-01-02T12:00:00Z",
            "checksum": "abc",
        }]);
        let rows = parse_history_rows(&response);
        assert_eq!(rows.len(), 1);
        assert!(rows[0].execution_time_ms.is_none());
    }

    #[test]
    fn parse_history_rows_skips_rows_without_timestamp() {
        // `applied_at` is mandatory; rows lacking it are dropped.
        let response =
            json!([{ "result": [{ "version": "v1", "description": "d", "checksum": "c" }] }]);
        assert!(parse_history_rows(&response).is_empty());
    }

    #[test]
    fn parse_datetime_handles_rfc3339() {
        let parsed = parse_datetime(&json!("2026-01-02T12:00:00Z")).unwrap();
        assert_eq!(parsed, Utc.with_ymd_and_hms(2026, 1, 2, 12, 0, 0).unwrap());
    }

    #[test]
    fn migration_table_name_constant_matches_python() {
        assert_eq!(MIGRATION_TABLE_NAME, "_migration_history");
    }
}