arcly_http/data/migrate.rs
1//! Database migration lifecycle — versioned, checksummed, lock-guarded.
2//!
3//! Boot-time only (runs in `ArclyPlugin::on_init`, before the container
4//! freezes) — the request hot path never sees this module.
5//!
6//! ## Guarantees
7//!
8//! - **Ledger** — every applied migration is recorded in `arcly_migrations`
9//! (version, name, checksum, applied_at) per datasource; re-runs skip.
10//! - **Drift detection** — if the code's checksum for an applied version no
11//! longer matches the ledger, the boot **fails loudly**: someone edited a
12//! shipped migration, which silently forks schemas across environments.
13//! - **Fleet safety** — `run` takes the cluster `DLockBackend`; replicas
14//! booting simultaneously serialize on `arcly:lock:migrate:{pool}` instead
15//! of racing DDL.
16//! - **Multi-tenant** — `run_all` walks every datasource in the registry
17//! (deterministic name order) and reports per pool.
18//!
19//! ## Contract
20//!
21//! One SQL statement per `Migration::up` (portable across the Any driver's
22//! prepared-statement path). Each statement runs inside its own transaction
23//! together with its ledger insert, so a failed step leaves no half-applied
24//! version behind.
25
26use futures::future::BoxFuture;
27
28use crate::data::db::ArclyDbPool;
29use crate::data::{DataError, DataSourceRegistry};
30use crate::resilience::{DLockBackend, DistributedLock};
31
32// ─── Migration contract ───────────────────────────────────────────────────────
33
/// A single versioned schema change that the [`MigrationRunner`] can apply.
pub trait Migration: Send + Sync + 'static {
    /// Monotonic version, e.g. `2026_0611_0001`. The runner refuses to
    /// register two migrations that report the same version.
    fn version(&self) -> u64;

    /// Human-readable name, written to the ledger and to the run report.
    fn name(&self) -> &'static str;

    /// The SQL to execute — exactly one statement.
    fn up(&self) -> &'static str;

    /// 64-bit FNV-1a hash over the bytes of [`Migration::up`].
    ///
    /// The runner stores this value in the ledger; if a later boot computes
    /// a different value for an already-applied version, that is schema
    /// drift and the boot fails.
    fn checksum(&self) -> u64 {
        const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const PRIME: u64 = 0x0000_0100_0000_01b3;

        let mut hash = OFFSET_BASIS;
        for byte in self.up().as_bytes() {
            // FNV-1a order: xor the byte in first, then multiply.
            hash ^= u64::from(*byte);
            hash = hash.wrapping_mul(PRIME);
        }
        hash
    }
}
50
51// ─── Runner ───────────────────────────────────────────────────────────────────
52
/// Outcome of one `MigrationRunner::run` call against a single pool.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log a report
/// with `{:?}` and compare reports in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationReport {
    /// Name of the pool the run targeted (the value of `pool.name()`).
    pub pool: &'static str,
    /// Names of the migrations applied during this run, in application order.
    pub applied: Vec<&'static str>,
    /// Number of migrations that were already in the ledger with a matching
    /// checksum and were therefore skipped.
    pub skipped: usize,
}
58
/// Holds the registered migrations and applies the pending ones to a pool
/// (`run`) or to every pool in a registry (`run_all`).
pub struct MigrationRunner {
    /// Registered migrations; `add` keeps them in ascending version order
    /// and rejects duplicate versions.
    migrations: Vec<Box<dyn Migration>>,
    /// TTL, in milliseconds, handed to the `DistributedLock` that `run`
    /// builds. `new` initialises it to 60 000.
    lock_ttl_ms: u64,
}
63
64impl MigrationRunner {
65 pub fn new() -> Self {
66 Self {
67 migrations: Vec::new(),
68 lock_ttl_ms: 60_000,
69 }
70 }
71
72 /// Register a migration. Panics on duplicate versions — a boot-time
73 /// programming error that must never reach production silently.
74 #[allow(clippy::should_implement_trait)] // builder-style, not arithmetic
75 pub fn add(mut self, m: impl Migration) -> Self {
76 assert!(
77 !self.migrations.iter().any(|e| e.version() == m.version()),
78 "duplicate migration version {}",
79 m.version(),
80 );
81 self.migrations.push(Box::new(m));
82 self.migrations.sort_by_key(|m| m.version());
83 self
84 }
85
86 /// Apply pending migrations to one pool under the cluster migration lock.
87 pub fn run<'a>(
88 &'a self,
89 pool: &'a ArclyDbPool,
90 lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
91 ) -> BoxFuture<'a, Result<MigrationReport, DataError>> {
92 Box::pin(async move {
93 // Serialize DDL across the fleet: poll the lock briefly — another
94 // replica applying the same migrations is success for us too,
95 // but we re-check the ledger ourselves once we get the lock.
96 let lock = DistributedLock {
97 name: "migrate",
98 ttl_ms: self.lock_ttl_ms,
99 };
100 let mut guard = None;
101 for _ in 0..150 {
102 // ≤ 30 s wait
103 if let Some(g) = lock.try_lock(lock_backend).await {
104 guard = Some(g);
105 break;
106 }
107 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
108 }
109 let _guard = guard
110 .ok_or_else(|| DataError::timeout("could not obtain cluster migration lock"))?;
111
112 use crate::data::{AccessIntent, DataSource};
113 let mut conn = pool.acquire(AccessIntent::Write).await?;
114 conn.execute(
115 "CREATE TABLE IF NOT EXISTS arcly_migrations (
116 version BIGINT PRIMARY KEY,
117 name TEXT NOT NULL,
118 checksum TEXT NOT NULL,
119 applied_at TEXT NOT NULL
120 )",
121 )
122 .await?;
123
124 let mut report = MigrationReport {
125 pool: pool.name(),
126 applied: Vec::new(),
127 skipped: 0,
128 };
129
130 for m in &self.migrations {
131 let v = m.version();
132 // Bind the version like the insert does — never interpolate
133 // into ledger SQL, even for trusted values.
134 let count = conn
135 .fetch_one_i64_bind(
136 "SELECT COUNT(*) FROM arcly_migrations WHERE version = ?",
137 &[&v.to_string()],
138 )
139 .await?;
140
141 if count > 0 {
142 // Applied before — verify the shipped SQL hasn't changed.
143 let recorded = conn
144 .fetch_one_string(
145 "SELECT checksum FROM arcly_migrations WHERE version = ?",
146 &[&v.to_string()],
147 )
148 .await?;
149 let current = format!("{:016x}", m.checksum());
150 if recorded != current {
151 return Err(DataError::config(format!(
152 "schema drift on migration {v} ({}): ledger {recorded} ≠ code {current} \
153 — a shipped migration was edited; write a new version instead",
154 m.name(),
155 )));
156 }
157 report.skipped += 1;
158 continue;
159 }
160
161 // Statement + ledger row in one transaction.
162 let mut tx = pool.begin().await?;
163 tx.execute_bind(m.up(), &[]).await?;
164 tx.execute_bind(
165 "INSERT INTO arcly_migrations (version, name, checksum, applied_at) \
166 VALUES (?, ?, ?, ?)",
167 &[
168 &v.to_string(),
169 m.name(),
170 &format!("{:016x}", m.checksum()),
171 &chrono_now(),
172 ],
173 )
174 .await?;
175 tx.commit().await?;
176
177 tracing::info!(
178 pool = pool.name(),
179 version = v,
180 name = m.name(),
181 "migration applied"
182 );
183 report.applied.push(m.name());
184 }
185
186 Ok(report)
187 })
188 }
189
190 /// Apply to every datasource in the registry, deterministic name order.
191 pub fn run_all<'a>(
192 &'a self,
193 registry: &'a DataSourceRegistry<ArclyDbPool>,
194 lock_backend: &'a std::sync::Arc<dyn DLockBackend>,
195 ) -> BoxFuture<'a, Result<Vec<MigrationReport>, DataError>> {
196 Box::pin(async move {
197 let mut reports = Vec::new();
198 for (_, pool) in registry.iter() {
199 reports.push(self.run(pool, lock_backend).await?);
200 }
201 Ok(reports)
202 })
203 }
204}
205
206impl Default for MigrationRunner {
207 fn default() -> Self {
208 Self::new()
209 }
210}
211
/// Current wall-clock time as `unix:<seconds since the Unix epoch>`.
///
/// Built on `std::time` so the core does not depend on chrono. A system
/// clock set before the epoch yields `unix:0` rather than an error.
fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}