// nodedb_cluster/metadata_group/migration_state.rs

// SPDX-License-Identifier: BUSL-1.1

//! Durable migration state: `MigrationStateTable`, `MigrationId`,
//! `MigrationCheckpointPayload`, and `MigrationPhaseTag`.
//!
//! `MigrationStateTable` is a redb-backed table, shared behind
//! `Arc<Mutex<_>>` and keyed by `MigrationId` as a hyphenated UUID
//! string, that stores the latest `PersistedMigrationCheckpoint` for
//! each in-flight migration.  Rows are upserted on every committed
//! `MigrationCheckpoint` entry and deleted when a committed
//! `MigrationAbort` has applied all of its compensations.
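//!
//! A minimal startup sketch (error handling elided; the call sites are
//! illustrative, not a doc-test):
//!
//! ```ignore
//! let shared = new_shared(db);
//! let mut table = shared.lock().unwrap();
//! table.load_all()?; // hydrate the in-memory map from redb
//! for row in table.all_checkpoints() {
//!     // recovery inspects each row and resumes or aborts the migration
//! }
//! ```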

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::error::{ClusterError, MigrationCheckpointError, Result};
use nodedb_types::Hlc;

// ── Public type aliases ──────────────────────────────────────────────────────

/// Unique identifier for a single migration run.  A new UUID is
/// generated for every `MigrationExecutor::execute` call so that a
/// resumed migration can be distinguished from a completely new one
/// targeting the same vShard.
pub type MigrationId = Uuid;

// ── Phase tag ────────────────────────────────────────────────────────────────

/// Discriminant-only label for the current phase of a migration.
///
/// Stored inside every `MigrationCheckpointPayload` so the recovery
/// path can route without pattern-matching the full payload.
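///
/// A minimal recovery-routing sketch (the handler functions are
/// hypothetical):
///
/// ```ignore
/// match checkpoint.payload.phase_tag() {
///     MigrationPhaseTag::Complete => finalize(checkpoint),
///     tag => resume_from(tag, checkpoint),
/// }
/// ```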
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub enum MigrationPhaseTag {
    AddLearner,
    CatchUp,
    PromoteLearner,
    LeadershipTransfer,
    Cutover,
    Complete,
}

// ── Per-phase checkpoint payloads ────────────────────────────────────────────

/// Payload written to the metadata Raft group at each phase boundary.
///
/// Each variant carries enough information to resume the migration from
/// scratch if the coordinator crashes between two phases.
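///
/// A construction sketch using the `AddLearner` variant (field values are
/// illustrative):
///
/// ```ignore
/// let payload = MigrationCheckpointPayload::AddLearner {
///     vshard_id: 5,
///     source_node: 1,
///     target_node: 2,
///     source_group: 0,
///     write_pause_budget_us: 500_000,
///     started_at_hlc: Hlc::default(),
/// };
/// assert_eq!(payload.phase_tag(), MigrationPhaseTag::AddLearner);
/// let crc = payload.crc32c()?; // stored next to the row for integrity checks
/// ```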
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub enum MigrationCheckpointPayload {
    /// Written just before `propose_conf_change(AddLearner)`.
    AddLearner {
        vshard_id: u32,
        source_node: u64,
        target_node: u64,
        source_group: u64,
        write_pause_budget_us: u64,
        started_at_hlc: Hlc,
    },
    /// Written just after `AddLearner` conf-change commits.
    CatchUp {
        vshard_id: u32,
        /// Log index at which `AddLearner` committed.  The resuming
        /// coordinator uses this to skip re-proposing `AddLearner` if
        /// the learner is already present in the group.
        learner_log_index_at_add: u64,
    },
    /// Written just after `PromoteLearner` conf-change commits.
    PromoteLearner {
        vshard_id: u32,
        target_node: u64,
        source_group: u64,
    },
    /// Written just before `propose_and_wait(LeadershipTransfer)`.
    LeadershipTransfer {
        vshard_id: u32,
        /// Whether `PromoteLearner` has already committed.
        target_is_voter: bool,
        new_leader_node_id: u64,
        source_group: u64,
    },
    /// Written after `propose_and_wait(LeadershipTransfer)` returns
    /// success (cut-over committed).
    Cutover {
        vshard_id: u32,
        new_leader_node_id: u64,
        source_group: u64,
    },
    /// Written after the ghost stub is installed and `save_ghosts` succeeds.
    Complete {
        vshard_id: u32,
        actual_pause_us: u64,
        ghost_stub_installed: bool,
    },
}

impl MigrationCheckpointPayload {
    /// Returns the phase tag matching this payload variant.
    pub fn phase_tag(&self) -> MigrationPhaseTag {
        match self {
            Self::AddLearner { .. } => MigrationPhaseTag::AddLearner,
            Self::CatchUp { .. } => MigrationPhaseTag::CatchUp,
            Self::PromoteLearner { .. } => MigrationPhaseTag::PromoteLearner,
            Self::LeadershipTransfer { .. } => MigrationPhaseTag::LeadershipTransfer,
            Self::Cutover { .. } => MigrationPhaseTag::Cutover,
            Self::Complete { .. } => MigrationPhaseTag::Complete,
        }
    }

    /// Encode as zerompk bytes for CRC32C computation.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        zerompk::to_msgpack_vec(self).map_err(|e| {
            ClusterError::MigrationCheckpoint(MigrationCheckpointError::Codec {
                detail: format!("payload encode: {e}"),
            })
        })
    }

    /// Compute the CRC32C of the encoded payload bytes.
    pub fn crc32c(&self) -> Result<u32> {
        let bytes = self.to_bytes()?;
        Ok(crc32c::crc32c(&bytes))
    }
}

// ── Persisted checkpoint row ─────────────────────────────────────────────────

/// The value stored in the `_cluster.migration_state` redb table
/// (`MigrationStateTable::TABLE`).
///
/// One row per `MigrationId`; upserted on every committed
/// `MigrationCheckpoint` and deleted when `MigrationAbort` applies.
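///
/// An integrity-check sketch for the recovery path (the skip behaviour
/// shown is illustrative):
///
/// ```ignore
/// if row.payload.crc32c()? != row.crc32c {
///     // encoded payload no longer matches the stored checksum;
///     // treat the row as corrupt instead of resuming from it
/// }
/// ```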
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct PersistedMigrationCheckpoint {
    pub migration_id: String, // UUID as hyphenated string
    pub attempt: u32,
    pub payload: MigrationCheckpointPayload,
    pub crc32c: u32,
    /// Wall-clock milliseconds at proposal time (non-authoritative —
    /// used only for timeout-based abort decisions during recovery).
    pub ts_ms: u64,
}

impl PersistedMigrationCheckpoint {
    /// Parse the stored UUID string back into a `MigrationId`.
    /// Returns `None` if the key is malformed.
    pub fn migration_uuid(&self) -> Option<MigrationId> {
        self.migration_id.parse().ok()
    }
}

// ── MigrationStateTable ──────────────────────────────────────────────────────

/// In-memory mirror of `_cluster.migration_state`.
///
/// Mutations go through `MigrationStateTable` methods, which commit the
/// change to the backing redb table first and then update the in-memory
/// `HashMap`.  The `Arc<Mutex<_>>` wrapper allows the table to be shared
/// across the `CacheApplier` and `MigrationExecutor` without a lifetime
/// tie-in.
///
/// The `redb::Database` is held as an `Arc` to survive actor hand-offs.
pub struct MigrationStateTable {
    /// In-memory view keyed by UUID string (matches redb key format).
    mem: HashMap<String, PersistedMigrationCheckpoint>,
    db: Arc<redb::Database>,
}

impl MigrationStateTable {
    pub const TABLE: redb::TableDefinition<'static, &'static str, &'static [u8]> =
        redb::TableDefinition::new("_cluster.migration_state");

    /// Create a new, empty in-memory table backed by `db`.
    pub fn new(db: Arc<redb::Database>) -> Self {
        Self {
            mem: HashMap::new(),
            db,
        }
    }

    /// Load all persisted checkpoints from redb into the in-memory map.
    ///
    /// Called once at startup before recovery runs.
    pub fn load_all(&mut self) -> Result<()> {
        let txn = self.db.begin_read().map_err(|e| ClusterError::Storage {
            detail: format!("migration_state begin_read: {e}"),
        })?;
        let table = txn
            .open_table(Self::TABLE)
            .map_err(|e| ClusterError::Storage {
                detail: format!("migration_state open_table: {e}"),
            })?;
        let range = table.range::<&str>(..).map_err(|e| ClusterError::Storage {
            detail: format!("migration_state range: {e}"),
        })?;
        for entry in range {
            let (key, value) = entry.map_err(|e| ClusterError::Storage {
                detail: format!("migration_state iter: {e}"),
            })?;
            let key_str = key.value().to_owned();
            match zerompk::from_msgpack::<PersistedMigrationCheckpoint>(value.value()) {
                Ok(row) => {
                    self.mem.insert(key_str, row);
                }
                Err(e) => {
                    tracing::warn!(key = %key_str, error = %e, "migration_state: corrupt row skipped");
                }
            }
        }
        Ok(())
    }

    /// Upsert a checkpoint row (in-memory + redb).
    ///
    /// Idempotent: if the same `(migration_id, phase_tag, attempt)` tuple
    /// is already stored, this is a no-op.
    pub fn upsert(&mut self, row: PersistedMigrationCheckpoint) -> Result<()> {
        let key = row.migration_id.clone();
        // Idempotency: skip if an identical (migration_id, phase_tag, attempt) is already stored.
        if let Some(existing) = self.mem.get(&key)
            && existing.payload.phase_tag() == row.payload.phase_tag()
            && existing.attempt == row.attempt
        {
            return Ok(());
        }
        // Encode and persist.
        let bytes = zerompk::to_msgpack_vec(&row).map_err(|e| ClusterError::Codec {
            detail: format!("migration_state encode: {e}"),
        })?;
        let txn = self.db.begin_write().map_err(|e| ClusterError::Storage {
            detail: format!("migration_state begin_write: {e}"),
        })?;
        {
            let mut table = txn
                .open_table(Self::TABLE)
                .map_err(|e| ClusterError::Storage {
                    detail: format!("migration_state open_table: {e}"),
                })?;
            table
                .insert(key.as_str(), bytes.as_slice())
                .map_err(|e| ClusterError::Storage {
                    detail: format!("migration_state insert: {e}"),
                })?;
        }
        txn.commit().map_err(|e| ClusterError::Storage {
            detail: format!("migration_state commit: {e}"),
        })?;
        self.mem.insert(key, row);
        Ok(())
    }

    /// Remove a checkpoint row (in-memory + redb).  No-op if not present.
    pub fn remove(&mut self, migration_id: &MigrationId) -> Result<()> {
        let key = migration_id.hyphenated().to_string();
        self.mem.remove(&key);
        let txn = self.db.begin_write().map_err(|e| ClusterError::Storage {
            detail: format!("migration_state begin_write: {e}"),
        })?;
        {
            let mut table = txn
                .open_table(Self::TABLE)
                .map_err(|e| ClusterError::Storage {
                    detail: format!("migration_state open_table: {e}"),
                })?;
            let _ = table
                .remove(key.as_str())
                .map_err(|e| ClusterError::Storage {
                    detail: format!("migration_state remove: {e}"),
                })?;
        }
        txn.commit().map_err(|e| ClusterError::Storage {
            detail: format!("migration_state commit: {e}"),
        })?;
        Ok(())
    }

    /// Get a snapshot of all in-flight checkpoints (in-memory view).
    pub fn all_checkpoints(&self) -> Vec<PersistedMigrationCheckpoint> {
        self.mem.values().cloned().collect()
    }

    /// Look up the latest checkpoint for a migration ID.
    pub fn get(&self, migration_id: &MigrationId) -> Option<&PersistedMigrationCheckpoint> {
        self.mem.get(&migration_id.hyphenated().to_string())
    }
}

// ── Shared handle ────────────────────────────────────────────────────────────

/// `Arc<Mutex<MigrationStateTable>>` — the type threaded through
/// `MetadataCache`, `CacheApplier`, and `MigrationExecutor`.
pub type SharedMigrationStateTable = Arc<Mutex<MigrationStateTable>>;

/// Construct a shared migration state table backed by `db`.
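///
/// A sharing sketch (the lock discipline shown is illustrative):
///
/// ```ignore
/// let shared = new_shared(db);
/// let for_applier = Arc::clone(&shared);
/// // Each component holds the lock only for a single mutation.
/// for_applier.lock().unwrap().upsert(row)?;
/// ```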
pub fn new_shared(db: Arc<redb::Database>) -> SharedMigrationStateTable {
    Arc::new(Mutex::new(MigrationStateTable::new(db)))
}

#[cfg(test)]
mod tests {
    use super::*;

    fn temp_db() -> Arc<redb::Database> {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let db = redb::Database::create(&path).unwrap();
        // Create the table up front so load_all() can open it on a fresh database.
        let txn = db.begin_write().unwrap();
        {
            let _ = txn.open_table(MigrationStateTable::TABLE).unwrap();
        }
        txn.commit().unwrap();
        // Leak the TempDir intentionally so the directory (and the database
        // file) outlives this helper for the rest of the test.
        std::mem::forget(dir);
        Arc::new(db)
    }

    fn make_row(
        id: MigrationId,
        phase: MigrationCheckpointPayload,
        attempt: u32,
    ) -> PersistedMigrationCheckpoint {
        let crc = phase.crc32c().unwrap();
        PersistedMigrationCheckpoint {
            migration_id: id.hyphenated().to_string(),
            attempt,
            payload: phase,
            crc32c: crc,
            ts_ms: 0,
        }
    }

    #[test]
    fn upsert_and_load_roundtrip() {
        let db = temp_db();
        let mut table = MigrationStateTable::new(Arc::clone(&db));

        let id = Uuid::new_v4();
        let payload = MigrationCheckpointPayload::AddLearner {
            vshard_id: 5,
            source_node: 1,
            target_node: 2,
            source_group: 0,
            write_pause_budget_us: 500_000,
            started_at_hlc: Hlc::default(),
        };
        let row = make_row(id, payload.clone(), 0);
        table.upsert(row).unwrap();

        // Reload from redb.
        let mut table2 = MigrationStateTable::new(Arc::clone(&db));
        table2.load_all().unwrap();
        let loaded = table2.get(&id).unwrap();
        assert_eq!(loaded.payload, payload);
        assert_eq!(loaded.attempt, 0);
    }

    #[test]
    fn idempotent_upsert_same_phase_attempt() {
        let db = temp_db();
        let mut table = MigrationStateTable::new(Arc::clone(&db));

        let id = Uuid::new_v4();
        let payload = MigrationCheckpointPayload::CatchUp {
            vshard_id: 3,
            learner_log_index_at_add: 10,
        };
        let row = make_row(id, payload, 0);
        table.upsert(row.clone()).unwrap();
        table.upsert(row).unwrap(); // second upsert — must be no-op
        assert_eq!(table.all_checkpoints().len(), 1);
    }

    #[test]
    fn remove_deletes_from_redb() {
        let db = temp_db();
        let mut table = MigrationStateTable::new(Arc::clone(&db));

        let id = Uuid::new_v4();
        let payload = MigrationCheckpointPayload::Complete {
            vshard_id: 7,
            actual_pause_us: 100,
            ghost_stub_installed: true,
        };
        table.upsert(make_row(id, payload, 1)).unwrap();
        table.remove(&id).unwrap();

        let mut table2 = MigrationStateTable::new(Arc::clone(&db));
        table2.load_all().unwrap();
        assert!(table2.get(&id).is_none());
    }

    #[test]
    fn payload_crc32c_detects_corruption() {
        let payload = MigrationCheckpointPayload::CatchUp {
            vshard_id: 9,
            learner_log_index_at_add: 42,
        };
        let mut bytes = payload.to_bytes().unwrap();
        // Flip a byte.
        bytes[0] ^= 0xFF;
        // The CRC of the original should differ from the CRC of the mutated bytes.
        let original_crc = payload.crc32c().unwrap();
        let corrupted_crc = crc32c::crc32c(&bytes);
        assert_ne!(original_crc, corrupted_crc);
    }

    #[test]
    fn zerompk_payload_roundtrip() {
        let payload = MigrationCheckpointPayload::LeadershipTransfer {
            vshard_id: 11,
            target_is_voter: true,
            new_leader_node_id: 7,
            source_group: 2,
        };
        let bytes = zerompk::to_msgpack_vec(&payload).unwrap();
        let decoded: MigrationCheckpointPayload = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(payload, decoded);
    }
}