use futures::future::BoxFuture;
use crate::data::db::ArclyDbPool;
use crate::data::{DataError, DataSourceRegistry};
use crate::resilience::{DLockBackend, DistributedLock};
/// One versioned schema change that a `MigrationRunner` can apply.
///
/// Implementors supply a version, a name and the SQL text; the checksum is
/// derived from the SQL text unless overridden.
pub trait Migration: Send + Sync + 'static {
    /// Version number of this migration. The runner orders migrations by it
    /// and stores it as the key of the `arcly_migrations` ledger row.
    fn version(&self) -> u64;

    /// Name recorded in the ledger and listed in the run report.
    fn name(&self) -> &'static str;

    /// Statement text the runner executes to apply this migration.
    fn up(&self) -> &'static str;

    /// 64-bit FNV-1a hash over the bytes of [`Migration::up`].
    ///
    /// The runner stores this value (as 16 hex digits) next to the applied
    /// version and compares it on later runs to detect edited migrations.
    fn checksum(&self) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

        let mut hash = FNV_OFFSET_BASIS;
        for byte in self.up().bytes() {
            // FNV-1a: xor the byte in first, then multiply by the prime.
            hash ^= u64::from(byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
        hash
    }
}
/// Outcome of one `MigrationRunner::run` call against a single pool.
pub struct MigrationReport {
/// Name of the pool the run targeted, taken from `pool.name()`.
pub pool: &'static str,
/// Names of the migrations applied during this run, in the order they were applied.
pub applied: Vec<&'static str>,
/// Number of migrations found in the ledger with a matching checksum and therefore not re-run.
pub skipped: usize,
}
/// Holds a set of registered migrations and applies them to database pools
/// while holding a distributed lock.
pub struct MigrationRunner {
/// Registered migrations; `add` re-sorts this list by ascending version after every insert.
migrations: Vec<Box<dyn Migration>>,
/// TTL in milliseconds passed to the `DistributedLock` taken by `run`; `new` sets it to 60_000.
lock_ttl_ms: u64,
}
impl MigrationRunner {
pub fn new() -> Self {
Self {
migrations: Vec::new(),
lock_ttl_ms: 60_000,
}
}
#[allow(clippy::should_implement_trait)] pub fn add(mut self, m: impl Migration) -> Self {
assert!(
!self.migrations.iter().any(|e| e.version() == m.version()),
"duplicate migration version {}",
m.version(),
);
self.migrations.push(Box::new(m));
self.migrations.sort_by_key(|m| m.version());
self
}
pub fn run<'a>(
&'a self,
pool: &'a ArclyDbPool,
lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
) -> BoxFuture<'a, Result<MigrationReport, DataError>> {
Box::pin(async move {
let lock = DistributedLock {
name: "migrate",
ttl_ms: self.lock_ttl_ms,
};
let mut guard = None;
for _ in 0..150 {
if let Some(g) = lock.try_lock(lock_backend).await {
guard = Some(g);
break;
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
let _guard =
guard.ok_or_else(|| DataError("could not obtain cluster migration lock".into()))?;
use crate::data::{AccessIntent, DataSource};
let mut conn = pool.acquire(AccessIntent::Write).await?;
conn.execute(
"CREATE TABLE IF NOT EXISTS arcly_migrations (
version BIGINT PRIMARY KEY,
name TEXT NOT NULL,
checksum TEXT NOT NULL,
applied_at TEXT NOT NULL
)",
)
.await?;
let mut report = MigrationReport {
pool: pool.name(),
applied: Vec::new(),
skipped: 0,
};
for m in &self.migrations {
let v = m.version();
let count = conn
.fetch_one_i64(&format!(
"SELECT COUNT(*) FROM arcly_migrations WHERE version = {v}",
))
.await?;
if count > 0 {
let recorded = conn
.fetch_one_string(
&format!("SELECT checksum FROM arcly_migrations WHERE version = {v}"),
&[],
)
.await?;
let current = format!("{:016x}", m.checksum());
if recorded != current {
return Err(DataError(format!(
"schema drift on migration {v} ({}): ledger {recorded} ≠ code {current} \
— a shipped migration was edited; write a new version instead",
m.name(),
)));
}
report.skipped += 1;
continue;
}
let mut tx = pool.begin().await?;
tx.execute_bind(m.up(), &[]).await?;
tx.execute_bind(
"INSERT INTO arcly_migrations (version, name, checksum, applied_at) \
VALUES (?, ?, ?, ?)",
&[
&v.to_string(),
m.name(),
&format!("{:016x}", m.checksum()),
&chrono_now(),
],
)
.await?;
tx.commit().await?;
tracing::info!(
pool = pool.name(),
version = v,
name = m.name(),
"migration applied"
);
report.applied.push(m.name());
}
Ok(report)
})
}
pub fn run_all<'a>(
&'a self,
registry: &'a DataSourceRegistry<ArclyDbPool>,
lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
) -> BoxFuture<'a, Result<Vec<MigrationReport>, DataError>> {
Box::pin(async move {
let mut reports = Vec::new();
for (_, pool) in registry.iter() {
reports.push(self.run(pool, lock_backend).await?);
}
Ok(reports)
})
}
}
impl Default for MigrationRunner {
fn default() -> Self {
Self::new()
}
}
/// Returns the current wall-clock time as `unix:<seconds since the Unix epoch>`.
///
/// If the system clock reports a time before the epoch, the seconds value
/// falls back to `0` instead of failing.
fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}