//! vShard migration state machine (`nodedb_cluster/migration.rs`).
1use std::time::Instant;
2
3use tracing::{info, warn};
4
/// 3-phase shard migration state machine.
///
/// 1. **Base Copy**: Target node pulls the vShard's L1 segments via RDMA/QUIC.
/// 2. **WAL Catch-up**: Target subscribes to source WAL, replays live mutations.
/// 3. **Atomic Cut-over**: Raft leader updates routing table atomically.
///
/// Write-pause disclosure: during Phase 3, writes to the migrating vShard are
/// paused. `migration_write_pause_budget_us` controls the maximum acceptable
/// pause.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationPhase {
    /// Not migrating.
    Idle,
    /// Phase 1: Bulk data transfer from source to target.
    BaseCopy {
        /// Bytes copied to the target so far.
        bytes_transferred: u64,
        /// Total bytes to copy.
        bytes_total: u64,
    },
    /// Phase 2: Target replaying WAL entries from source.
    WalCatchUp {
        /// Source LSN when catch-up started.
        start_lsn: u64,
        /// Current LSN on target.
        current_lsn: u64,
        /// Latest LSN on source.
        source_lsn: u64,
    },
    /// Phase 3: Writes paused, routing table update in progress.
    AtomicCutOver {
        /// When the pause started.
        /// NOTE(review): `start_cutover` currently stores the *estimated pause
        /// duration* here, not a clock timestamp — confirm which is intended.
        pause_start_us: u64,
    },
    /// Migration completed successfully.
    Completed {
        /// Actual pause duration in microseconds.
        pause_duration_us: u64,
    },
    /// Migration failed; the source node remains authoritative.
    Failed {
        // Human-readable failure reason (logged, not machine-parsed).
        reason: String,
    },
}
45
/// Full state of a vShard migration.
///
/// Pure bookkeeping: phase transitions are driven externally by calling the
/// `start_*` / `update_*` / `complete` / `fail` methods; the type itself only
/// records state and emits `tracing` events.
#[derive(Debug, Clone)]
pub struct MigrationState {
    /// vShard being migrated.
    pub vshard_id: u16,
    /// Source Raft group.
    pub source_group: u64,
    /// Target Raft group.
    pub target_group: u64,
    /// Source node (current owner).
    pub source_node: u64,
    /// Target node.
    pub target_node: u64,
    /// Current phase.
    pub phase: MigrationPhase,
    /// Maximum write-pause budget in microseconds; `start_cutover` refuses
    /// estimates above this value.
    pub write_pause_budget_us: u64,
    /// When migration was initiated (set by `start_base_copy`).
    pub started_at: Option<Instant>,
}
66
67impl MigrationState {
68    pub fn new(
69        vshard_id: u16,
70        source_group: u64,
71        target_group: u64,
72        source_node: u64,
73        target_node: u64,
74        write_pause_budget_us: u64,
75    ) -> Self {
76        Self {
77            vshard_id,
78            source_group,
79            target_group,
80            source_node,
81            target_node,
82            phase: MigrationPhase::Idle,
83            write_pause_budget_us,
84            started_at: None,
85        }
86    }
87
88    /// Start Phase 1: Base Copy.
89    pub fn start_base_copy(&mut self, bytes_total: u64) {
90        self.started_at = Some(Instant::now());
91        self.phase = MigrationPhase::BaseCopy {
92            bytes_transferred: 0,
93            bytes_total,
94        };
95        info!(
96            vshard = self.vshard_id,
97            source = self.source_node,
98            target = self.target_node,
99            bytes_total,
100            "starting base copy"
101        );
102    }
103
104    /// Update base copy progress.
105    pub fn update_base_copy(&mut self, bytes_transferred: u64) {
106        if let MigrationPhase::BaseCopy { bytes_total, .. } = self.phase {
107            self.phase = MigrationPhase::BaseCopy {
108                bytes_transferred,
109                bytes_total,
110            };
111        }
112    }
113
114    /// Transition to Phase 2: WAL Catch-up.
115    pub fn start_wal_catchup(&mut self, start_lsn: u64, source_lsn: u64) {
116        self.phase = MigrationPhase::WalCatchUp {
117            start_lsn,
118            current_lsn: start_lsn,
119            source_lsn,
120        };
121        info!(
122            vshard = self.vshard_id,
123            start_lsn,
124            source_lsn,
125            lag = source_lsn - start_lsn,
126            "starting wal catch-up"
127        );
128    }
129
130    /// Update WAL catch-up progress.
131    pub fn update_wal_catchup(&mut self, current_lsn: u64, source_lsn: u64) {
132        if let MigrationPhase::WalCatchUp { start_lsn, .. } = self.phase {
133            self.phase = MigrationPhase::WalCatchUp {
134                start_lsn,
135                current_lsn,
136                source_lsn,
137            };
138        }
139    }
140
141    /// Check if WAL lag is low enough to proceed to cut-over.
142    pub fn is_catchup_ready(&self) -> bool {
143        if let MigrationPhase::WalCatchUp {
144            current_lsn,
145            source_lsn,
146            ..
147        } = self.phase
148        {
149            // Sub-millisecond lag = within 10 LSN entries.
150            source_lsn.saturating_sub(current_lsn) <= 10
151        } else {
152            false
153        }
154    }
155
156    /// Attempt to start Phase 3: Atomic Cut-over.
157    ///
158    /// Returns `Err` if the estimated pause would exceed the budget.
159    pub fn start_cutover(&mut self, estimated_pause_us: u64) -> crate::Result<()> {
160        if estimated_pause_us > self.write_pause_budget_us {
161            warn!(
162                vshard = self.vshard_id,
163                estimated_us = estimated_pause_us,
164                budget_us = self.write_pause_budget_us,
165                "refusing cut-over: pause exceeds budget"
166            );
167            return Err(crate::ClusterError::MigrationPauseBudgetExceeded {
168                estimated_us: estimated_pause_us,
169                budget_us: self.write_pause_budget_us,
170            });
171        }
172
173        self.phase = MigrationPhase::AtomicCutOver {
174            pause_start_us: estimated_pause_us,
175        };
176        info!(
177            vshard = self.vshard_id,
178            estimated_pause_us, "starting atomic cut-over"
179        );
180        Ok(())
181    }
182
183    /// Complete the migration.
184    pub fn complete(&mut self, actual_pause_us: u64) {
185        self.phase = MigrationPhase::Completed {
186            pause_duration_us: actual_pause_us,
187        };
188        info!(
189            vshard = self.vshard_id,
190            pause_us = actual_pause_us,
191            "migration completed"
192        );
193    }
194
195    /// Mark migration as failed. Source remains authoritative.
196    pub fn fail(&mut self, reason: String) {
197        warn!(
198            vshard = self.vshard_id,
199            reason = %reason,
200            "migration failed, source remains authoritative"
201        );
202        self.phase = MigrationPhase::Failed { reason };
203    }
204
205    pub fn is_active(&self) -> bool {
206        !matches!(
207            self.phase,
208            MigrationPhase::Idle | MigrationPhase::Completed { .. } | MigrationPhase::Failed { .. }
209        )
210    }
211
212    /// Elapsed time since migration started.
213    pub fn elapsed(&self) -> Option<std::time::Duration> {
214        self.started_at.map(|s| s.elapsed())
215    }
216
217    pub fn vshard_id(&self) -> u16 {
218        self.vshard_id
219    }
220
221    pub fn phase(&self) -> &MigrationPhase {
222        &self.phase
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh state: vShard 42, group 0 -> 1, node 10 -> 20, 1000µs budget.
    fn fresh_state() -> MigrationState {
        MigrationState::new(42, 0, 1, 10, 20, 1000)
    }

    #[test]
    fn full_lifecycle() {
        let mut mig = fresh_state();
        assert!(!mig.is_active());

        // Phase 1: bulk transfer with progress update.
        mig.start_base_copy(1_000_000);
        assert!(mig.is_active());
        mig.update_base_copy(500_000);
        assert_eq!(
            mig.phase,
            MigrationPhase::BaseCopy {
                bytes_transferred: 500_000,
                bytes_total: 1_000_000,
            }
        );

        // Phase 2: WAL replay until lag is small enough.
        mig.start_wal_catchup(100, 200);
        assert!(!mig.is_catchup_ready());
        mig.update_wal_catchup(195, 200);
        assert!(mig.is_catchup_ready());

        // Phase 3: cut-over fits the budget.
        assert!(mig.start_cutover(500).is_ok());
        assert!(matches!(mig.phase, MigrationPhase::AtomicCutOver { .. }));

        // Completion records the measured pause.
        mig.complete(450);
        assert!(!mig.is_active());
        assert_eq!(
            mig.phase,
            MigrationPhase::Completed {
                pause_duration_us: 450,
            }
        );
    }

    #[test]
    fn pause_budget_exceeded() {
        let mut mig = fresh_state();
        mig.start_base_copy(100);
        mig.start_wal_catchup(0, 5);
        mig.update_wal_catchup(5, 5);

        // Budget is 1000µs; a 2000µs estimate must be refused.
        assert!(matches!(
            mig.start_cutover(2000),
            Err(crate::ClusterError::MigrationPauseBudgetExceeded { .. })
        ));
    }

    #[test]
    fn failure_recovery() {
        let mut mig = fresh_state();
        mig.start_base_copy(100);
        mig.fail(String::from("network partition"));
        assert!(matches!(mig.phase, MigrationPhase::Failed { .. }));
        assert!(!mig.is_active());
    }

    #[test]
    fn catchup_threshold() {
        let mut mig = fresh_state();
        mig.start_base_copy(100);
        mig.start_wal_catchup(0, 100);

        // Lag of 90 is well outside the threshold.
        mig.update_wal_catchup(10, 100);
        assert!(!mig.is_catchup_ready());

        // Lag of 5 is within the 10-entry threshold.
        mig.update_wal_catchup(95, 100);
        assert!(mig.is_catchup_ready());
    }
}