nodedb_cluster/metadata_group/
migration_state.rs1use std::collections::HashMap;
14use std::sync::{Arc, Mutex};
15
16use serde::{Deserialize, Serialize};
17use uuid::Uuid;
18
19use crate::error::{ClusterError, MigrationCheckpointError, Result};
20use nodedb_types::Hlc;
21
22pub type MigrationId = Uuid;
29
30#[derive(
37 Debug,
38 Clone,
39 Copy,
40 PartialEq,
41 Eq,
42 PartialOrd,
43 Ord,
44 Serialize,
45 Deserialize,
46 zerompk::ToMessagePack,
47 zerompk::FromMessagePack,
48)]
49pub enum MigrationPhaseTag {
50 AddLearner,
51 CatchUp,
52 PromoteLearner,
53 LeadershipTransfer,
54 Cutover,
55 Complete,
56}
57
58#[derive(
65 Debug,
66 Clone,
67 PartialEq,
68 Eq,
69 Serialize,
70 Deserialize,
71 zerompk::ToMessagePack,
72 zerompk::FromMessagePack,
73)]
74pub enum MigrationCheckpointPayload {
75 AddLearner {
77 vshard_id: u32,
78 source_node: u64,
79 target_node: u64,
80 source_group: u64,
81 write_pause_budget_us: u64,
82 started_at_hlc: Hlc,
83 },
84 CatchUp {
86 vshard_id: u32,
87 learner_log_index_at_add: u64,
91 },
92 PromoteLearner {
94 vshard_id: u32,
95 target_node: u64,
96 source_group: u64,
97 },
98 LeadershipTransfer {
100 vshard_id: u32,
101 target_is_voter: bool,
103 new_leader_node_id: u64,
104 source_group: u64,
105 },
106 Cutover {
109 vshard_id: u32,
110 new_leader_node_id: u64,
111 source_group: u64,
112 },
113 Complete {
115 vshard_id: u32,
116 actual_pause_us: u64,
117 ghost_stub_installed: bool,
118 },
119}
120
121impl MigrationCheckpointPayload {
122 pub fn phase_tag(&self) -> MigrationPhaseTag {
124 match self {
125 Self::AddLearner { .. } => MigrationPhaseTag::AddLearner,
126 Self::CatchUp { .. } => MigrationPhaseTag::CatchUp,
127 Self::PromoteLearner { .. } => MigrationPhaseTag::PromoteLearner,
128 Self::LeadershipTransfer { .. } => MigrationPhaseTag::LeadershipTransfer,
129 Self::Cutover { .. } => MigrationPhaseTag::Cutover,
130 Self::Complete { .. } => MigrationPhaseTag::Complete,
131 }
132 }
133
134 pub fn to_bytes(&self) -> Result<Vec<u8>> {
136 zerompk::to_msgpack_vec(self).map_err(|e| {
137 ClusterError::MigrationCheckpoint(MigrationCheckpointError::Codec {
138 detail: format!("payload encode: {e}"),
139 })
140 })
141 }
142
143 pub fn crc32c(&self) -> Result<u32> {
145 let bytes = self.to_bytes()?;
146 Ok(crc32c::crc32c(&bytes))
147 }
148}
149
150#[derive(
157 Debug,
158 Clone,
159 PartialEq,
160 Eq,
161 Serialize,
162 Deserialize,
163 zerompk::ToMessagePack,
164 zerompk::FromMessagePack,
165)]
166pub struct PersistedMigrationCheckpoint {
167 pub migration_id: String, pub attempt: u32,
169 pub payload: MigrationCheckpointPayload,
170 pub crc32c: u32,
171 pub ts_ms: u64,
174}
175
176impl PersistedMigrationCheckpoint {
177 pub fn migration_uuid(&self) -> Option<MigrationId> {
178 self.migration_id.parse().ok()
179 }
180}
181
182pub struct MigrationStateTable {
193 mem: HashMap<String, PersistedMigrationCheckpoint>,
195 db: Arc<redb::Database>,
196}
197
198impl MigrationStateTable {
199 pub const TABLE: redb::TableDefinition<'static, &'static str, &'static [u8]> =
200 redb::TableDefinition::new("_cluster.migration_state");
201
202 pub fn new(db: Arc<redb::Database>) -> Self {
204 Self {
205 mem: HashMap::new(),
206 db,
207 }
208 }
209
210 pub fn load_all(&mut self) -> Result<()> {
214 let txn = self.db.begin_read().map_err(|e| ClusterError::Storage {
215 detail: format!("migration_state begin_read: {e}"),
216 })?;
217 let table = txn
218 .open_table(Self::TABLE)
219 .map_err(|e| ClusterError::Storage {
220 detail: format!("migration_state open_table: {e}"),
221 })?;
222 let range = table.range::<&str>(..).map_err(|e| ClusterError::Storage {
223 detail: format!("migration_state range: {e}"),
224 })?;
225 for entry in range {
226 let (key, value) = entry.map_err(|e| ClusterError::Storage {
227 detail: format!("migration_state iter: {e}"),
228 })?;
229 let key_str = key.value().to_owned();
230 match zerompk::from_msgpack::<PersistedMigrationCheckpoint>(value.value()) {
231 Ok(row) => {
232 self.mem.insert(key_str, row);
233 }
234 Err(e) => {
235 tracing::warn!(key = %key_str, error = %e, "migration_state: corrupt row skipped");
236 }
237 }
238 }
239 Ok(())
240 }
241
242 pub fn upsert(&mut self, row: PersistedMigrationCheckpoint) -> Result<()> {
247 let key = row.migration_id.clone();
248 if let Some(existing) = self.mem.get(&key)
250 && existing.payload.phase_tag() == row.payload.phase_tag()
251 && existing.attempt == row.attempt
252 {
253 return Ok(());
254 }
255 let bytes = zerompk::to_msgpack_vec(&row).map_err(|e| ClusterError::Codec {
257 detail: format!("migration_state encode: {e}"),
258 })?;
259 let txn = self.db.begin_write().map_err(|e| ClusterError::Storage {
260 detail: format!("migration_state begin_write: {e}"),
261 })?;
262 {
263 let mut table = txn
264 .open_table(Self::TABLE)
265 .map_err(|e| ClusterError::Storage {
266 detail: format!("migration_state open_table: {e}"),
267 })?;
268 table
269 .insert(key.as_str(), bytes.as_slice())
270 .map_err(|e| ClusterError::Storage {
271 detail: format!("migration_state insert: {e}"),
272 })?;
273 }
274 txn.commit().map_err(|e| ClusterError::Storage {
275 detail: format!("migration_state commit: {e}"),
276 })?;
277 self.mem.insert(key, row);
278 Ok(())
279 }
280
281 pub fn remove(&mut self, migration_id: &MigrationId) -> Result<()> {
283 let key = migration_id.hyphenated().to_string();
284 self.mem.remove(&key);
285 let txn = self.db.begin_write().map_err(|e| ClusterError::Storage {
286 detail: format!("migration_state begin_write: {e}"),
287 })?;
288 {
289 let mut table = txn
290 .open_table(Self::TABLE)
291 .map_err(|e| ClusterError::Storage {
292 detail: format!("migration_state open_table: {e}"),
293 })?;
294 let _ = table
295 .remove(key.as_str())
296 .map_err(|e| ClusterError::Storage {
297 detail: format!("migration_state remove: {e}"),
298 })?;
299 }
300 txn.commit().map_err(|e| ClusterError::Storage {
301 detail: format!("migration_state commit: {e}"),
302 })?;
303 Ok(())
304 }
305
306 pub fn all_checkpoints(&self) -> Vec<PersistedMigrationCheckpoint> {
308 self.mem.values().cloned().collect()
309 }
310
311 pub fn get(&self, migration_id: &MigrationId) -> Option<&PersistedMigrationCheckpoint> {
313 self.mem.get(&migration_id.hyphenated().to_string())
314 }
315}
316
317pub type SharedMigrationStateTable = Arc<Mutex<MigrationStateTable>>;
322
323pub fn new_shared(db: Arc<redb::Database>) -> SharedMigrationStateTable {
325 Arc::new(Mutex::new(MigrationStateTable::new(db)))
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh redb database in a temporary directory with the
    /// migration table already opened once in a committed write transaction.
    fn temp_db() -> Arc<redb::Database> {
        let dir = tempfile::tempdir().unwrap();
        let db = redb::Database::create(dir.path().join("test.redb")).unwrap();
        let setup = db.begin_write().unwrap();
        // The table handle must be dropped before the transaction commits.
        drop(setup.open_table(MigrationStateTable::TABLE).unwrap());
        setup.commit().unwrap();
        // Leak the directory guard so its destructor never runs and the
        // directory is not removed while the database is still in use.
        std::mem::forget(dir);
        Arc::new(db)
    }

    /// Builds a checkpoint row whose `crc32c` is computed from `phase`.
    fn make_row(
        id: MigrationId,
        phase: MigrationCheckpointPayload,
        attempt: u32,
    ) -> PersistedMigrationCheckpoint {
        PersistedMigrationCheckpoint {
            migration_id: id.hyphenated().to_string(),
            attempt,
            // Evaluated before `phase` is moved into `payload` below.
            crc32c: phase.crc32c().unwrap(),
            payload: phase,
            ts_ms: 0,
        }
    }

    #[test]
    fn upsert_and_load_roundtrip() {
        let db = temp_db();
        let id = Uuid::new_v4();
        let payload = MigrationCheckpointPayload::AddLearner {
            vshard_id: 5,
            source_node: 1,
            target_node: 2,
            source_group: 0,
            write_pause_budget_us: 500_000,
            started_at_hlc: Hlc::default(),
        };

        let mut writer = MigrationStateTable::new(Arc::clone(&db));
        writer.upsert(make_row(id, payload.clone(), 0)).unwrap();

        // A second handle over the same database sees the row after loading.
        let mut reader = MigrationStateTable::new(Arc::clone(&db));
        reader.load_all().unwrap();
        let loaded = reader.get(&id).unwrap();
        assert_eq!(loaded.payload, payload);
        assert_eq!(loaded.attempt, 0);
    }

    #[test]
    fn idempotent_upsert_same_phase_attempt() {
        let db = temp_db();
        let mut table = MigrationStateTable::new(Arc::clone(&db));

        let id = Uuid::new_v4();
        let row = make_row(
            id,
            MigrationCheckpointPayload::CatchUp {
                vshard_id: 3,
                learner_log_index_at_add: 10,
            },
            0,
        );
        table.upsert(row.clone()).unwrap();
        // Same phase and attempt: the second call must not add another entry.
        table.upsert(row).unwrap();
        assert_eq!(table.all_checkpoints().len(), 1);
    }

    #[test]
    fn remove_deletes_from_redb() {
        let db = temp_db();
        let id = Uuid::new_v4();
        let payload = MigrationCheckpointPayload::Complete {
            vshard_id: 7,
            actual_pause_us: 100,
            ghost_stub_installed: true,
        };

        let mut writer = MigrationStateTable::new(Arc::clone(&db));
        writer.upsert(make_row(id, payload, 1)).unwrap();
        writer.remove(&id).unwrap();

        // Reload from disk to prove the row is gone from redb, not just the cache.
        let mut reader = MigrationStateTable::new(Arc::clone(&db));
        reader.load_all().unwrap();
        assert!(reader.get(&id).is_none());
    }

    #[test]
    fn payload_crc32c_detects_corruption() {
        let payload = MigrationCheckpointPayload::CatchUp {
            vshard_id: 9,
            learner_log_index_at_add: 42,
        };
        let mut tampered = payload.to_bytes().unwrap();
        tampered[0] ^= 0xFF;

        let expected = payload.crc32c().unwrap();
        let actual = crc32c::crc32c(&tampered);
        assert_ne!(expected, actual);
    }

    #[test]
    fn zerompk_payload_roundtrip() {
        let original = MigrationCheckpointPayload::LeadershipTransfer {
            vshard_id: 11,
            target_is_voter: true,
            new_leader_node_id: 7,
            source_group: 2,
        };
        let encoded = zerompk::to_msgpack_vec(&original).unwrap();
        let decoded: MigrationCheckpointPayload = zerompk::from_msgpack(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}