// nodedb_cluster/migration.rs

// SPDX-License-Identifier: BUSL-1.1

use std::time::Instant;

use tracing::{info, warn};
/// 3-phase shard migration state machine.
///
/// 1. **Base Copy**: Target node pulls the vShard's L1 segments via RDMA/QUIC.
/// 2. **WAL Catch-up**: Target subscribes to source WAL, replays live mutations.
/// 3. **Atomic Cut-over**: Raft leader updates routing table atomically.
///
/// Write Pause Disclosure: During Phase 3, writes to the
/// migrating vShard are paused. `migration_write_pause_budget_us` controls the
/// maximum acceptable pause.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationPhase {
    /// Not migrating.
    Idle,
    /// Phase 1: Bulk data transfer from source to target.
    BaseCopy {
        /// Bytes copied to the target so far.
        bytes_transferred: u64,
        /// Total bytes expected for the base copy.
        bytes_total: u64,
    },
    /// Phase 2: Target replaying WAL entries from source.
    WalCatchUp {
        /// Source LSN when catch-up started.
        start_lsn: u64,
        /// Current LSN on target.
        current_lsn: u64,
        /// Latest LSN on source.
        source_lsn: u64,
    },
    /// Phase 3: Writes paused, routing table update in progress.
    AtomicCutOver {
        /// When the pause started.
        // NOTE(review): `MigrationState::start_cutover` currently stores the
        // pause *estimate* here, not a timestamp — confirm intended semantics.
        pause_start_us: u64,
    },
    /// Migration completed successfully.
    Completed {
        /// Actual pause duration in microseconds.
        pause_duration_us: u64,
    },
    /// Migration failed. The source node remains authoritative (see
    /// `MigrationState::fail`).
    Failed {
        /// Human-readable failure cause, logged at failure time.
        reason: String,
    },
}
47
/// Full state of a vShard migration.
///
/// Created in the `Idle` phase via `MigrationState::new` and driven through
/// the phases by the `start_*` / `update_*` / `complete` / `fail` methods.
#[derive(Debug, Clone)]
pub struct MigrationState {
    /// vShard being migrated.
    pub vshard_id: u32,
    /// Source Raft group.
    pub source_group: u64,
    /// Target Raft group.
    pub target_group: u64,
    /// Source node (current owner).
    pub source_node: u64,
    /// Target node.
    pub target_node: u64,
    /// Current phase.
    pub phase: MigrationPhase,
    /// Maximum write-pause budget in microseconds; `start_cutover` refuses
    /// any estimate above this.
    pub write_pause_budget_us: u64,
    /// When migration was initiated; `None` until `start_base_copy` runs.
    pub started_at: Option<Instant>,
}
68
69impl MigrationState {
70    pub fn new(
71        vshard_id: u32,
72        source_group: u64,
73        target_group: u64,
74        source_node: u64,
75        target_node: u64,
76        write_pause_budget_us: u64,
77    ) -> Self {
78        Self {
79            vshard_id,
80            source_group,
81            target_group,
82            source_node,
83            target_node,
84            phase: MigrationPhase::Idle,
85            write_pause_budget_us,
86            started_at: None,
87        }
88    }
89
90    /// Start Phase 1: Base Copy.
91    pub fn start_base_copy(&mut self, bytes_total: u64) {
92        self.started_at = Some(Instant::now());
93        self.phase = MigrationPhase::BaseCopy {
94            bytes_transferred: 0,
95            bytes_total,
96        };
97        info!(
98            vshard = self.vshard_id,
99            source = self.source_node,
100            target = self.target_node,
101            bytes_total,
102            "starting base copy"
103        );
104    }
105
106    /// Update base copy progress.
107    pub fn update_base_copy(&mut self, bytes_transferred: u64) {
108        if let MigrationPhase::BaseCopy { bytes_total, .. } = self.phase {
109            self.phase = MigrationPhase::BaseCopy {
110                bytes_transferred,
111                bytes_total,
112            };
113        }
114    }
115
116    /// Transition to Phase 2: WAL Catch-up.
117    pub fn start_wal_catchup(&mut self, start_lsn: u64, source_lsn: u64) {
118        self.phase = MigrationPhase::WalCatchUp {
119            start_lsn,
120            current_lsn: start_lsn,
121            source_lsn,
122        };
123        info!(
124            vshard = self.vshard_id,
125            start_lsn,
126            source_lsn,
127            lag = source_lsn - start_lsn,
128            "starting wal catch-up"
129        );
130    }
131
132    /// Update WAL catch-up progress.
133    pub fn update_wal_catchup(&mut self, current_lsn: u64, source_lsn: u64) {
134        if let MigrationPhase::WalCatchUp { start_lsn, .. } = self.phase {
135            self.phase = MigrationPhase::WalCatchUp {
136                start_lsn,
137                current_lsn,
138                source_lsn,
139            };
140        }
141    }
142
143    /// Check if WAL lag is low enough to proceed to cut-over.
144    pub fn is_catchup_ready(&self) -> bool {
145        if let MigrationPhase::WalCatchUp {
146            current_lsn,
147            source_lsn,
148            ..
149        } = self.phase
150        {
151            // Sub-millisecond lag = within 10 LSN entries.
152            source_lsn.saturating_sub(current_lsn) <= 10
153        } else {
154            false
155        }
156    }
157
158    /// Attempt to start Phase 3: Atomic Cut-over.
159    ///
160    /// Returns `Err` if the estimated pause would exceed the budget.
161    pub fn start_cutover(&mut self, estimated_pause_us: u64) -> crate::Result<()> {
162        if estimated_pause_us > self.write_pause_budget_us {
163            warn!(
164                vshard = self.vshard_id,
165                estimated_us = estimated_pause_us,
166                budget_us = self.write_pause_budget_us,
167                "refusing cut-over: pause exceeds budget"
168            );
169            return Err(crate::ClusterError::MigrationPauseBudgetExceeded {
170                estimated_us: estimated_pause_us,
171                budget_us: self.write_pause_budget_us,
172            });
173        }
174
175        self.phase = MigrationPhase::AtomicCutOver {
176            pause_start_us: estimated_pause_us,
177        };
178        info!(
179            vshard = self.vshard_id,
180            estimated_pause_us, "starting atomic cut-over"
181        );
182        Ok(())
183    }
184
185    /// Complete the migration.
186    pub fn complete(&mut self, actual_pause_us: u64) {
187        self.phase = MigrationPhase::Completed {
188            pause_duration_us: actual_pause_us,
189        };
190        info!(
191            vshard = self.vshard_id,
192            pause_us = actual_pause_us,
193            "migration completed"
194        );
195    }
196
197    /// Mark migration as failed. Source remains authoritative.
198    pub fn fail(&mut self, reason: String) {
199        warn!(
200            vshard = self.vshard_id,
201            reason = %reason,
202            "migration failed, source remains authoritative"
203        );
204        self.phase = MigrationPhase::Failed { reason };
205    }
206
207    pub fn is_active(&self) -> bool {
208        !matches!(
209            self.phase,
210            MigrationPhase::Idle | MigrationPhase::Completed { .. } | MigrationPhase::Failed { .. }
211        )
212    }
213
214    /// Elapsed time since migration started.
215    pub fn elapsed(&self) -> Option<std::time::Duration> {
216        self.started_at.map(|s| s.elapsed())
217    }
218
219    pub fn vshard_id(&self) -> u32 {
220        self.vshard_id
221    }
222
223    pub fn phase(&self) -> &MigrationPhase {
224        &self.phase
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh migration: vShard 42, group 0 → 1, node 10 → 20, 1000µs budget.
    fn test_migration() -> MigrationState {
        MigrationState::new(42, 0, 1, 10, 20, 1000)
    }

    #[test]
    fn full_lifecycle() {
        let mut mig = test_migration();
        assert!(!mig.is_active());

        // Phase 1: base copy of 1 MB, half transferred.
        mig.start_base_copy(1_000_000);
        assert!(mig.is_active());
        mig.update_base_copy(500_000);
        assert!(matches!(
            mig.phase,
            MigrationPhase::BaseCopy {
                bytes_transferred: 500_000,
                ..
            }
        ));

        // Phase 2: a lag of 100 entries is too far behind; 5 is ready.
        mig.start_wal_catchup(100, 200);
        assert!(!mig.is_catchup_ready());
        mig.update_wal_catchup(195, 200);
        assert!(mig.is_catchup_ready());

        // Phase 3: a 500µs estimate fits the 1000µs budget.
        mig.start_cutover(500).expect("estimate within budget");
        assert!(matches!(mig.phase, MigrationPhase::AtomicCutOver { .. }));

        // Completion records the actual pause duration.
        mig.complete(450);
        assert!(!mig.is_active());
        assert!(matches!(
            mig.phase,
            MigrationPhase::Completed {
                pause_duration_us: 450
            }
        ));
    }

    #[test]
    fn pause_budget_exceeded() {
        let mut mig = test_migration();
        mig.start_base_copy(100);
        mig.start_wal_catchup(0, 5);
        mig.update_wal_catchup(5, 5);

        // A 2000µs estimate against a 1000µs budget must be refused.
        let err = mig.start_cutover(2000).expect_err("budget exceeded");
        assert!(matches!(
            err,
            crate::ClusterError::MigrationPauseBudgetExceeded { .. }
        ));
    }

    #[test]
    fn failure_recovery() {
        let mut mig = test_migration();
        mig.start_base_copy(100);
        mig.fail(String::from("network partition"));
        assert!(!mig.is_active());
        assert!(matches!(mig.phase, MigrationPhase::Failed { .. }));
    }

    #[test]
    fn catchup_threshold() {
        let mut mig = test_migration();
        mig.start_base_copy(100);
        mig.start_wal_catchup(0, 100);

        // 90 entries behind: not ready.
        mig.update_wal_catchup(10, 100);
        assert!(!mig.is_catchup_ready());

        // 5 entries behind: within the 10-entry threshold.
        mig.update_wal_catchup(95, 100);
        assert!(mig.is_catchup_ready());
    }
}
313}