arcly-http 0.2.2

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Database migration lifecycle — versioned, checksummed, lock-guarded.
//!
//! Boot-time only (runs in `ArclyPlugin::on_init`, before the container
//! freezes) — the request hot path never sees this module.
//!
//! ## Guarantees
//!
//! - **Ledger** — every applied migration is recorded in `arcly_migrations`
//!   (version, name, checksum, applied_at) per datasource; re-runs skip.
//! - **Drift detection** — if the code's checksum for an applied version no
//!   longer matches the ledger, the boot **fails loudly**: someone edited a
//!   shipped migration, which silently forks schemas across environments.
//! - **Fleet safety** — `run` takes the cluster `DLockBackend`; replicas
//!   booting simultaneously serialize on `arcly:lock:migrate:{pool}` instead
//!   of racing DDL.
//! - **Multi-tenant** — `run_all` walks every datasource in the registry
//!   (deterministic name order) and reports per pool.
//!
//! ## Contract
//!
//! One SQL statement per `Migration::up` (portable across the Any driver's
//! prepared-statement path). Each statement runs inside its own transaction
//! together with its ledger insert, so a failed step leaves no half-applied
//! version behind.

use futures::future::BoxFuture;

use crate::data::db::ArclyDbPool;
use crate::data::{DataError, DataSourceRegistry};
use crate::resilience::{DLockBackend, DistributedLock};

// ─── Migration contract ───────────────────────────────────────────────────────

pub trait Migration: Send + Sync + 'static {
    /// Monotonic version, e.g. `2026_0611_0001`. Duplicates are rejected at
    /// registration.
    fn version(&self) -> u64;
    fn name(&self) -> &'static str;
    /// One SQL statement.
    fn up(&self) -> &'static str;

    /// FNV-1a of `up()` — stored in the ledger; mismatch on a later boot is
    /// schema drift and fails the boot.
    fn checksum(&self) -> u64 {
        self.up().bytes().fold(0xcbf2_9ce4_8422_2325u64, |h, b| {
            (h ^ b as u64).wrapping_mul(0x0000_0100_0000_01b3)
        })
    }
}

// ─── Runner ───────────────────────────────────────────────────────────────────

pub struct MigrationReport {
    pub pool: &'static str,
    pub applied: Vec<&'static str>,
    pub skipped: usize,
}

pub struct MigrationRunner {
    migrations: Vec<Box<dyn Migration>>,
    lock_ttl_ms: u64,
}

impl MigrationRunner {
    pub fn new() -> Self {
        Self {
            migrations: Vec::new(),
            lock_ttl_ms: 60_000,
        }
    }

    /// Register a migration. Panics on duplicate versions — a boot-time
    /// programming error that must never reach production silently.
    #[allow(clippy::should_implement_trait)] // builder-style, not arithmetic
    pub fn add(mut self, m: impl Migration) -> Self {
        assert!(
            !self.migrations.iter().any(|e| e.version() == m.version()),
            "duplicate migration version {}",
            m.version(),
        );
        self.migrations.push(Box::new(m));
        self.migrations.sort_by_key(|m| m.version());
        self
    }

    /// Apply pending migrations to one pool under the cluster migration lock.
    pub fn run<'a>(
        &'a self,
        pool: &'a ArclyDbPool,
        lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
    ) -> BoxFuture<'a, Result<MigrationReport, DataError>> {
        Box::pin(async move {
            // Serialize DDL across the fleet: poll the lock briefly — another
            // replica applying the same migrations is success for us too,
            // but we re-check the ledger ourselves once we get the lock.
            let lock = DistributedLock {
                name: "migrate",
                ttl_ms: self.lock_ttl_ms,
            };
            let mut guard = None;
            for _ in 0..150 {
                // ≤ 30 s wait
                if let Some(g) = lock.try_lock(lock_backend).await {
                    guard = Some(g);
                    break;
                }
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            }
            let _guard = guard
                .ok_or_else(|| DataError::timeout("could not obtain cluster migration lock"))?;

            use crate::data::{AccessIntent, DataSource};
            let mut conn = pool.acquire(AccessIntent::Write).await?;
            conn.execute(
                "CREATE TABLE IF NOT EXISTS arcly_migrations (
                    version    BIGINT PRIMARY KEY,
                    name       TEXT   NOT NULL,
                    checksum   TEXT   NOT NULL,
                    applied_at TEXT   NOT NULL
                )",
            )
            .await?;

            let mut report = MigrationReport {
                pool: pool.name(),
                applied: Vec::new(),
                skipped: 0,
            };

            for m in &self.migrations {
                let v = m.version();
                // Bind the version like the insert does — never interpolate
                // into ledger SQL, even for trusted values.
                let count = conn
                    .fetch_one_i64_bind(
                        "SELECT COUNT(*) FROM arcly_migrations WHERE version = ?",
                        &[&v.to_string()],
                    )
                    .await?;

                if count > 0 {
                    // Applied before — verify the shipped SQL hasn't changed.
                    let recorded = conn
                        .fetch_one_string(
                            "SELECT checksum FROM arcly_migrations WHERE version = ?",
                            &[&v.to_string()],
                        )
                        .await?;
                    let current = format!("{:016x}", m.checksum());
                    if recorded != current {
                        return Err(DataError::config(format!(
                            "schema drift on migration {v} ({}): ledger {recorded} ≠ code {current} \
                             — a shipped migration was edited; write a new version instead",
                            m.name(),
                        )));
                    }
                    report.skipped += 1;
                    continue;
                }

                // Statement + ledger row in one transaction.
                let mut tx = pool.begin().await?;
                tx.execute_bind(m.up(), &[]).await?;
                tx.execute_bind(
                    "INSERT INTO arcly_migrations (version, name, checksum, applied_at) \
                     VALUES (?, ?, ?, ?)",
                    &[
                        &v.to_string(),
                        m.name(),
                        &format!("{:016x}", m.checksum()),
                        &chrono_now(),
                    ],
                )
                .await?;
                tx.commit().await?;

                tracing::info!(
                    pool = pool.name(),
                    version = v,
                    name = m.name(),
                    "migration applied"
                );
                report.applied.push(m.name());
            }

            Ok(report)
        })
    }

    /// Apply to every datasource in the registry, deterministic name order.
    pub fn run_all<'a>(
        &'a self,
        registry: &'a DataSourceRegistry<ArclyDbPool>,
        lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
    ) -> BoxFuture<'a, Result<Vec<MigrationReport>, DataError>> {
        Box::pin(async move {
            let mut reports = Vec::new();
            for (_, pool) in registry.iter() {
                reports.push(self.run(pool, lock_backend).await?);
            }
            Ok(reports)
        })
    }
}

impl Default for MigrationRunner {
    fn default() -> Self {
        Self::new()
    }
}

fn chrono_now() -> String {
    // ISO-ish UTC seconds without pulling chrono into the core.
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    format!("unix:{secs}")
}