use chrono::{DateTime, Utc};
/// A migration step described by a version number and a static description.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type; confirm against callers how it relates to the SQL runner's
/// `(version, description, sql)` tuples.
#[derive(Debug, Clone, Copy)]
pub struct MigrationStep {
/// Version number identifying this step.
pub version: i64,
/// Short text describing the step.
pub description: &'static str,
}
/// Record of a migration together with the UTC time at which it was applied.
///
/// NOTE(review): presumably mirrors one row of the migration registry table
/// (version, description, applied_at); nothing in the visible part of this
/// file constructs it, so confirm against callers.
#[derive(Debug, Clone)]
pub struct AppliedMigration {
/// Version number of the applied migration.
pub version: i64,
/// Short text describing the applied migration.
pub description: String,
/// Timestamp (UTC) recorded for the application of this migration.
pub applied_at: DateTime<Utc>,
}
/// Failure of a single migration step.
///
/// The `Display` output has the shape
/// `migration v<version> (<description>) failed: <cause>`.
#[derive(Debug)]
pub struct MigrationError {
    /// Version of the migration that failed.
    pub version: i64,
    /// Description of the migration that failed.
    pub description: String,
    /// Text describing the underlying failure.
    pub cause: String,
}

impl std::fmt::Display for MigrationError {
    /// Renders the error as a single line naming the version, the description
    /// and the cause, in that order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            version,
            description,
            cause,
        } = self;
        f.write_fmt(format_args!(
            "migration v{} ({}) failed: {}",
            version, description, cause
        ))
    }
}

// No source error is carried: the cause is stored as plain text, so the
// default `std::error::Error` methods are sufficient.
impl std::error::Error for MigrationError {}
/// Generates a public module containing a SQL migration runner for one sqlx
/// backend.
///
/// Macro arguments:
/// * `$mod_name` – name of the generated module.
/// * `$pool_ty`  – sqlx pool type accepted by the generated functions.
/// * `$int_type` – SQL type (as a string expression) used for the registry
///   table's `version` column.
///
/// The generated module exposes `run_sql_migrations` and `sql_schema_version`.
#[cfg(any(feature = "sqlite-memory", feature = "database-memory"))]
macro_rules! impl_sql_migration_runner {
    ($mod_name:ident, $pool_ty:ty, $int_type:expr) => {
        pub mod $mod_name {
            use super::MigrationError;
            use chrono::Utc;
            use sqlx::Row;
            use std::future::Future;

            /// Wraps a per-step failure in a `MigrationError` and converts its
            /// `Display` text into an `AdkError::memory`.
            fn step_error(version: i64, description: &str, cause: String) -> adk_core::AdkError {
                adk_core::AdkError::memory(
                    MigrationError {
                        version,
                        description: description.to_string(),
                        cause,
                    }
                    .to_string(),
                )
            }

            /// Brings the database up to the newest version listed in `steps`.
            ///
            /// Procedure:
            /// 1. Create `registry_table` if it does not exist.
            /// 2. Read the highest recorded version (0 when the table is empty).
            /// 3. If that version is 0, call `detect_existing`; when it reports
            ///    `true`, record the first step as applied *without* running its
            ///    SQL (baseline for a pre-existing schema).
            /// 4. Fail if the database is ahead of the last entry of `steps`.
            /// 5. For every step newer than the recorded version, run its SQL
            ///    and insert its registry row inside one transaction, then
            ///    commit.
            ///
            /// # Parameters
            /// * `pool` – connection pool to run against.
            /// * `registry_table` – name of the registry table. It is
            ///   interpolated into SQL as an identifier and therefore must come
            ///   from trusted code, never from user input.
            /// * `steps` – `(version, description, sql)` tuples. The first entry
            ///   is treated as the baseline and the last as the highest known
            ///   version, so the slice is expected to be sorted by ascending
            ///   version.
            /// * `detect_existing` – invoked at most once, only when the registry
            ///   holds no version above 0.
            ///
            /// # Errors
            /// Returns `AdkError::memory` when the registry cannot be created or
            /// read, when `detect_existing` fails, when the database version is
            /// newer than the code knows, or when any step fails (the message
            /// then carries the step's version and description).
            pub async fn run_sql_migrations<F, Fut>(
                pool: &$pool_ty,
                registry_table: &str,
                steps: &[(i64, &str, &str)],
                detect_existing: F,
            ) -> Result<(), adk_core::AdkError>
            where
                F: FnOnce() -> Fut,
                Fut: Future<Output = Result<bool, adk_core::AdkError>>,
            {
                let create_sql = format!(
                    "CREATE TABLE IF NOT EXISTS {registry_table} (\
                     version {} PRIMARY KEY, \
                     description TEXT NOT NULL, \
                     applied_at TEXT NOT NULL\
                     )",
                    $int_type
                );
                sqlx::query(&create_sql).execute(pool).await.map_err(|e| {
                    adk_core::AdkError::memory(format!("migration registry creation failed: {e}"))
                })?;

                let max_sql =
                    format!("SELECT COALESCE(MAX(version), 0) AS max_v FROM {registry_table}");
                let row = sqlx::query(&max_sql).fetch_one(pool).await.map_err(|e| {
                    adk_core::AdkError::memory(format!("migration registry read failed: {e}"))
                })?;
                let mut max_applied: i64 = row.try_get("max_v").map_err(|e| {
                    adk_core::AdkError::memory(format!("migration registry read failed: {e}"))
                })?;

                // Values are bound rather than spliced into the statement so a
                // description containing a quote cannot break or alter the SQL.
                // `$N` placeholders are understood by both the SQLite and the
                // Postgres sqlx drivers. The text is loop-invariant, so build it
                // once.
                let insert_sql = format!(
                    "INSERT INTO {registry_table} \
                     (version, description, applied_at) \
                     VALUES ($1, $2, $3)"
                );

                // An empty registry may still sit on top of an existing schema;
                // in that case only record the baseline step, do not run it.
                if max_applied == 0 && detect_existing().await? {
                    if let Some(&(v, desc, _)) = steps.first() {
                        sqlx::query(&insert_sql)
                            .bind(v)
                            .bind(desc)
                            .bind(Utc::now().to_rfc3339())
                            .execute(pool)
                            .await
                            .map_err(|e| step_error(v, desc, e.to_string()))?;
                        max_applied = v;
                    }
                }

                let max_compiled = steps.last().map(|s| s.0).unwrap_or(0);
                if max_applied > max_compiled {
                    return Err(adk_core::AdkError::memory(format!(
                        "schema version mismatch: database is at v{max_applied} \
                         but code only knows up to v{max_compiled}. \
                         Upgrade your ADK version."
                    )));
                }

                for &(version, description, sql) in steps {
                    if version <= max_applied {
                        continue;
                    }

                    let mut tx = pool.begin().await.map_err(|e| {
                        step_error(
                            version,
                            description,
                            format!("transaction begin failed: {e}"),
                        )
                    })?;

                    // The step's SQL may contain several statements, hence
                    // `raw_sql` instead of a prepared query.
                    sqlx::raw_sql(sql)
                        .execute(&mut *tx)
                        .await
                        .map_err(|e| step_error(version, description, e.to_string()))?;

                    // Record the step in the same transaction so schema change
                    // and registry row are committed together.
                    sqlx::query(&insert_sql)
                        .bind(version)
                        .bind(description)
                        .bind(Utc::now().to_rfc3339())
                        .execute(&mut *tx)
                        .await
                        .map_err(|e| {
                            step_error(
                                version,
                                description,
                                format!("registry record failed: {e}"),
                            )
                        })?;

                    tx.commit().await.map_err(|e| {
                        step_error(
                            version,
                            description,
                            format!("transaction commit failed: {e}"),
                        )
                    })?;
                }

                Ok(())
            }

            /// Returns the highest version recorded in `registry_table`.
            ///
            /// This is a best-effort read: any failure of the query or of the
            /// column decode (for example because the table does not exist yet)
            /// is reported as version `0` rather than as an error, so the
            /// function currently never returns `Err`.
            ///
            /// `registry_table` is interpolated into SQL as an identifier and
            /// must come from trusted code.
            pub async fn sql_schema_version(
                pool: &$pool_ty,
                registry_table: &str,
            ) -> Result<i64, adk_core::AdkError> {
                let sql =
                    format!("SELECT COALESCE(MAX(version), 0) AS max_v FROM {registry_table}");
                match sqlx::query(&sql).fetch_one(pool).await {
                    Ok(row) => {
                        let version: i64 = row.try_get("max_v").unwrap_or(0);
                        Ok(version)
                    }
                    // Deliberately swallowed: an unreadable registry counts as
                    // "no migrations applied".
                    Err(_) => Ok(0),
                }
            }
        }
    };
}
// SQLite backend: generates module `sqlite_runner` operating on
// `sqlx::SqlitePool`; the registry table's `version` column is declared as
// `INTEGER`. Only compiled with the `sqlite-memory` feature.
#[cfg(feature = "sqlite-memory")]
impl_sql_migration_runner!(sqlite_runner, sqlx::SqlitePool, "INTEGER");
// Postgres backend: generates module `pg_runner` operating on `sqlx::PgPool`;
// the registry table's `version` column is declared as `BIGINT`. Only compiled
// with the `database-memory` feature.
#[cfg(feature = "database-memory")]
impl_sql_migration_runner!(pg_runner, sqlx::PgPool, "BIGINT");