Skip to main content

arcly_http/data/
migrate.rs

1//! Database migration lifecycle — versioned, checksummed, lock-guarded.
2//!
3//! Boot-time only (runs in `ArclyPlugin::on_init`, before the container
4//! freezes) — the request hot path never sees this module.
5//!
6//! ## Guarantees
7//!
8//! - **Ledger** — every applied migration is recorded in `arcly_migrations`
9//!   (version, name, checksum, applied_at) per datasource; re-runs skip.
10//! - **Drift detection** — if the code's checksum for an applied version no
11//!   longer matches the ledger, the boot **fails loudly**: someone edited a
12//!   shipped migration, which silently forks schemas across environments.
//! - **Fleet safety** — `run` takes a cluster lock named `migrate` through
//!   the `DLockBackend`; replicas booting simultaneously serialize on it
//!   instead of racing DDL. The lock name does not include the pool, so a
//!   single fleet-wide lock covers every datasource.
16//! - **Multi-tenant** — `run_all` walks every datasource in the registry
17//!   (deterministic name order) and reports per pool.
18//!
19//! ## Contract
20//!
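//! One SQL statement per `Migration::up` (portable across the Any driver's
//! prepared-statement path). Each statement runs inside its own transaction
//! together with its ledger insert, so a failed step leaves no half-applied
//! version behind — provided the backend supports transactional DDL. Engines
//! that implicitly commit DDL (e.g. MySQL) can leave a statement applied
//! without its ledger row if the ledger insert then fails.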
25
26use futures::future::BoxFuture;
27
28use crate::data::db::ArclyDbPool;
29use crate::data::{DataError, DataSourceRegistry};
30use crate::resilience::{DLockBackend, DistributedLock};
31
32// ─── Migration contract ───────────────────────────────────────────────────────
33
/// One versioned, checksummed schema change.
pub trait Migration: Send + Sync + 'static {
    /// Monotonic version, e.g. `2026_0611_0001`. Registering two migrations
    /// with the same version is rejected.
    fn version(&self) -> u64;

    /// Label stored in the ledger's `name` column and used in logs.
    fn name(&self) -> &'static str;

    /// Exactly one SQL statement.
    fn up(&self) -> &'static str;

    /// 64-bit FNV-1a hash of `up()`.
    ///
    /// This value is written to the ledger. If a later boot computes a
    /// different value for an already-applied version, that is treated as
    /// schema drift and the boot fails.
    fn checksum(&self) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

        let mut hash = FNV_OFFSET_BASIS;
        for byte in self.up().bytes() {
            // FNV-1a: xor the byte in first, then multiply.
            hash ^= u64::from(byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
        hash
    }
}
50
51// ─── Runner ───────────────────────────────────────────────────────────────────
52
/// Outcome of [`MigrationRunner::run`] for a single datasource.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationReport {
    /// Name of the pool the migrations ran against.
    pub pool: &'static str,
    /// Names of the migrations applied during this run, in version order.
    pub applied: Vec<&'static str>,
    /// Number of migrations already recorded in the ledger (and verified
    /// against their checksum) that were therefore not re-run.
    pub skipped: usize,
}
58
59pub struct MigrationRunner {
60    migrations: Vec<Box<dyn Migration>>,
61    lock_ttl_ms: u64,
62}
63
64impl MigrationRunner {
65    pub fn new() -> Self {
66        Self {
67            migrations: Vec::new(),
68            lock_ttl_ms: 60_000,
69        }
70    }
71
72    /// Register a migration. Panics on duplicate versions — a boot-time
73    /// programming error that must never reach production silently.
74    #[allow(clippy::should_implement_trait)] // builder-style, not arithmetic
75    pub fn add(mut self, m: impl Migration) -> Self {
76        assert!(
77            !self.migrations.iter().any(|e| e.version() == m.version()),
78            "duplicate migration version {}",
79            m.version(),
80        );
81        self.migrations.push(Box::new(m));
82        self.migrations.sort_by_key(|m| m.version());
83        self
84    }
85
86    /// Apply pending migrations to one pool under the cluster migration lock.
87    pub fn run<'a>(
88        &'a self,
89        pool: &'a ArclyDbPool,
90        lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
91    ) -> BoxFuture<'a, Result<MigrationReport, DataError>> {
92        Box::pin(async move {
93            // Serialize DDL across the fleet: poll the lock briefly — another
94            // replica applying the same migrations is success for us too,
95            // but we re-check the ledger ourselves once we get the lock.
96            let lock = DistributedLock {
97                name: "migrate",
98                ttl_ms: self.lock_ttl_ms,
99            };
100            let mut guard = None;
101            for _ in 0..150 {
102                // ≤ 30 s wait
103                if let Some(g) = lock.try_lock(lock_backend).await {
104                    guard = Some(g);
105                    break;
106                }
107                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
108            }
109            let _guard = guard
110                .ok_or_else(|| DataError::timeout("could not obtain cluster migration lock"))?;
111
112            use crate::data::{AccessIntent, DataSource};
113            let mut conn = pool.acquire(AccessIntent::Write).await?;
114            conn.execute(
115                "CREATE TABLE IF NOT EXISTS arcly_migrations (
116                    version    BIGINT PRIMARY KEY,
117                    name       TEXT   NOT NULL,
118                    checksum   TEXT   NOT NULL,
119                    applied_at TEXT   NOT NULL
120                )",
121            )
122            .await?;
123
124            let mut report = MigrationReport {
125                pool: pool.name(),
126                applied: Vec::new(),
127                skipped: 0,
128            };
129
130            for m in &self.migrations {
131                let v = m.version();
132                // Bind the version like the insert does — never interpolate
133                // into ledger SQL, even for trusted values.
134                let count = conn
135                    .fetch_one_i64_bind(
136                        "SELECT COUNT(*) FROM arcly_migrations WHERE version = ?",
137                        &[&v.to_string()],
138                    )
139                    .await?;
140
141                if count > 0 {
142                    // Applied before — verify the shipped SQL hasn't changed.
143                    let recorded = conn
144                        .fetch_one_string(
145                            "SELECT checksum FROM arcly_migrations WHERE version = ?",
146                            &[&v.to_string()],
147                        )
148                        .await?;
149                    let current = format!("{:016x}", m.checksum());
150                    if recorded != current {
151                        return Err(DataError::config(format!(
152                            "schema drift on migration {v} ({}): ledger {recorded} ≠ code {current} \
153                             — a shipped migration was edited; write a new version instead",
154                            m.name(),
155                        )));
156                    }
157                    report.skipped += 1;
158                    continue;
159                }
160
161                // Statement + ledger row in one transaction.
162                let mut tx = pool.begin().await?;
163                tx.execute_bind(m.up(), &[]).await?;
164                tx.execute_bind(
165                    "INSERT INTO arcly_migrations (version, name, checksum, applied_at) \
166                     VALUES (?, ?, ?, ?)",
167                    &[
168                        &v.to_string(),
169                        m.name(),
170                        &format!("{:016x}", m.checksum()),
171                        &chrono_now(),
172                    ],
173                )
174                .await?;
175                tx.commit().await?;
176
177                tracing::info!(
178                    pool = pool.name(),
179                    version = v,
180                    name = m.name(),
181                    "migration applied"
182                );
183                report.applied.push(m.name());
184            }
185
186            Ok(report)
187        })
188    }
189
190    /// Apply to every datasource in the registry, deterministic name order.
191    pub fn run_all<'a>(
192        &'a self,
193        registry: &'a DataSourceRegistry<ArclyDbPool>,
194        lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
195    ) -> BoxFuture<'a, Result<Vec<MigrationReport>, DataError>> {
196        Box::pin(async move {
197            let mut reports = Vec::new();
198            for (_, pool) in registry.iter() {
199                reports.push(self.run(pool, lock_backend).await?);
200            }
201            Ok(reports)
202        })
203    }
204}
205
206impl Default for MigrationRunner {
207    fn default() -> Self {
208        Self::new()
209    }
210}
211
/// Timestamp written to the ledger's `applied_at` column.
///
/// The format is `unix:<whole seconds since the Unix epoch>`. It is not
/// ISO 8601: producing a calendar date would mean pulling `chrono` into the
/// core. If the system clock reads earlier than the epoch, the result is
/// `unix:0` rather than an error, so a bad clock never fails a migration.
fn chrono_now() -> String {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format!("unix:{secs}")
}