nodedb_cluster/migration_executor.rs

//! vShard migration executor — drives the 3-phase migration state machine.
//!
//! **Phase 1 (Base Copy):** Add the target node to the source Raft group as a
//! learner. Raft replication handles the data transfer (AppendEntries with
//! committed log entries).
//!
//! **Phase 2 (WAL Catch-Up):** Monitor the target's replication lag using the
//! leader's match_index for the target. When that index is within threshold of
//! the leader's commit_index, catch-up is ready and the learner is promoted to voter.
//!
//! **Phase 3 (Atomic Cut-Over):** Propose a routing table update through Raft.
//! Once the entry commits, every replica applies it and the vShard is atomically
//! owned by the target group. Ghost stubs on the source keep in-flight
//! scatter-gather queries transparent.
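//!
//! A minimal usage sketch; the `multi_raft`, `routing`, `topology`, and
//! `transport` handles are assumed to be wired up elsewhere:
//!
//! ```ignore
//! let executor = MigrationExecutor::new(multi_raft, routing, topology, transport);
//! let result = executor
//!     .execute(MigrationRequest {
//!         vshard_id: 7,
//!         source_node: 1,
//!         target_node: 2,
//!         ..Default::default()
//!     })
//!     .await?;
//! ```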

use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use tracing::{debug, info};

use crate::conf_change::{ConfChange, ConfChangeType};
use crate::decommission::MetadataProposer;
use crate::error::{ClusterError, Result};
use crate::ghost::{GhostStub, GhostTable};
use crate::metadata_group::{MetadataEntry, RoutingChange};
use crate::migration::{MigrationPhase, MigrationState};
use crate::multi_raft::MultiRaft;
use crate::routing::RoutingTable;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;

/// Configuration for a vShard migration.
#[derive(Debug, Clone)]
pub struct MigrationRequest {
    pub vshard_id: u16,
    pub source_node: u64,
    pub target_node: u64,
    /// Maximum allowed write pause during Phase 3 (microseconds).
    pub write_pause_budget_us: u64,
}

impl Default for MigrationRequest {
    fn default() -> Self {
        Self {
            vshard_id: 0,
            source_node: 0,
            target_node: 0,
            write_pause_budget_us: 500_000, // 500ms default budget.
        }
    }
}

/// Result of a completed migration.
#[derive(Debug)]
pub struct MigrationResult {
    pub vshard_id: u16,
    pub source_node: u64,
    pub target_node: u64,
    pub phase: MigrationPhase,
    pub elapsed: Option<Duration>,
}

/// Executes a vShard migration through the 3-phase protocol.
///
/// Coordinates between MultiRaft (for Raft membership + proposal), RoutingTable
/// (for vShard ownership), and the transport layer (for data transfer).
pub struct MigrationExecutor {
    multi_raft: Arc<Mutex<MultiRaft>>,
    routing: Arc<RwLock<RoutingTable>>,
    topology: Arc<RwLock<ClusterTopology>>,
    transport: Arc<NexarTransport>,
    ghost_table: Arc<Mutex<GhostTable>>,
    /// Optional metadata proposer for replicated routing updates.
    /// When set, Phase 3 cut-over proposes a `RoutingChange` through
    /// the metadata Raft group so every node applies the routing
    /// update atomically on commit. When `None`, falls back to
    /// local-only routing mutation (used by tests that don't stand
    /// up a metadata group).
    metadata_proposer: Option<Arc<dyn MetadataProposer>>,
}

impl MigrationExecutor {
    pub fn new(
        multi_raft: Arc<Mutex<MultiRaft>>,
        routing: Arc<RwLock<RoutingTable>>,
        topology: Arc<RwLock<ClusterTopology>>,
        transport: Arc<NexarTransport>,
    ) -> Self {
        Self {
            multi_raft,
            routing,
            topology,
            transport,
            ghost_table: Arc::new(Mutex::new(GhostTable::new())),
            metadata_proposer: None,
        }
    }

    /// Attach a metadata proposer for replicated Phase 3 cut-over.
    /// Production wiring calls this; tests may omit it for simplicity.
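    ///
    /// A sketch of the intended production wiring (construction of `proposer`
    /// is assumed to happen wherever the metadata group is stood up):
    ///
    /// ```ignore
    /// let executor = MigrationExecutor::new(multi_raft, routing, topology, transport)
    ///     .with_metadata_proposer(proposer);
    /// ```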
    pub fn with_metadata_proposer(mut self, proposer: Arc<dyn MetadataProposer>) -> Self {
        self.metadata_proposer = Some(proposer);
        self
    }

    /// Access the ghost table (for scatter-gather resolution).
    pub fn ghost_table(&self) -> &Arc<Mutex<GhostTable>> {
        &self.ghost_table
    }

    /// Execute a full 3-phase migration.
    ///
    /// This must be called on the source node (the current leader for the vShard's group).
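    ///
    /// A sketch of result handling; retry and error policy are left to the caller:
    ///
    /// ```ignore
    /// let result = executor.execute(req).await?;
    /// info!(vshard = result.vshard_id, elapsed = ?result.elapsed, "migration done");
    /// ```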
    pub async fn execute(&self, req: MigrationRequest) -> Result<MigrationResult> {
        // Resolve the source group from routing.
        let source_group = {
            let routing = self.routing.read().unwrap_or_else(|p| p.into_inner());
            routing.group_for_vshard(req.vshard_id)?
        };

        let mut state = MigrationState::new(
            req.vshard_id,
            source_group,
            source_group, // Target group is same group with new member.
            req.source_node,
            req.target_node,
            req.write_pause_budget_us,
        );

        info!(
            vshard = req.vshard_id,
            source = req.source_node,
            target = req.target_node,
            group = source_group,
            "starting vShard migration"
        );

        // ── Phase 1: Add target to Raft group (base copy via replication) ──

        self.phase1_base_copy(&mut state, source_group, &req)
            .await?;

        // ── Phase 2: WAL catch-up (monitor replication lag) ──

        self.phase2_wal_catchup(&mut state, source_group, &req)
            .await?;

        // ── Phase 3: Atomic cut-over (routing update via Raft) ──

        self.phase3_cutover(&mut state, source_group, &req).await?;

        let elapsed = state.elapsed();
        let phase = state.phase().clone();

        info!(
            vshard = req.vshard_id,
            source = req.source_node,
            target = req.target_node,
            elapsed_ms = elapsed.map(|d| d.as_millis() as u64).unwrap_or(0),
            "vShard migration completed"
        );

        Ok(MigrationResult {
            vshard_id: req.vshard_id,
            source_node: req.source_node,
            target_node: req.target_node,
            phase,
            elapsed,
        })
    }

    /// Phase 1: Add target node to the Raft group.
    ///
    /// Raft replication automatically transfers committed log entries to the new
    /// member. This is the "base copy" — the new node receives all historical
    /// state through Raft's AppendEntries mechanism.
    async fn phase1_base_copy(
        &self,
        state: &mut MigrationState,
        group_id: u64,
        req: &MigrationRequest,
    ) -> Result<()> {
        // Estimate base copy size (approximation: number of committed entries).
        let committed = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            let statuses = mr.group_statuses();
            statuses
                .iter()
                .find(|s| s.group_id == group_id)
                .map(|s| s.commit_index)
                .unwrap_or(0)
        };
        state.start_base_copy(committed);

        info!(
            vshard = req.vshard_id,
            group = group_id,
            target = req.target_node,
            entries = committed,
            "phase 1: adding target to raft group"
        );

        // Add target node as a LEARNER so it can catch up via Raft
        // replication without participating in elections or voting.
        // Promotion to voter happens after Phase 2 confirms catch-up.
        let change = ConfChange {
            change_type: ConfChangeType::AddLearner,
            node_id: req.target_node,
        };

        {
            let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            mr.propose_conf_change(group_id, &change)?;
        }

        // Register the target peer in the transport so AppendEntries can reach it.
        if let Some(addr_str) = {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            topo.get_node(req.target_node).map(|n| n.addr.clone())
        } && let Ok(addr) = addr_str.parse()
        {
            self.transport.register_peer(req.target_node, addr);
        }

        // The ConfChange will be replicated and applied. The target node
        // receives the full log through Raft's normal replication.
        // Mark base copy as complete — Raft replication is now in
        // progress; the real progress signal is match_index in Phase 2.
        state.update_base_copy(committed);

        debug!(
            vshard = req.vshard_id,
            "phase 1 complete: target added as learner to raft group"
        );

        Ok(())
    }

    /// Phase 2: Monitor target's replication lag until catch-up is ready.
    async fn phase2_wal_catchup(
        &self,
        state: &mut MigrationState,
        group_id: u64,
        req: &MigrationRequest,
    ) -> Result<()> {
        let leader_commit = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            let statuses = mr.group_statuses();
            statuses
                .iter()
                .find(|s| s.group_id == group_id)
                .map(|s| s.commit_index)
                .unwrap_or(0)
        };

        state.start_wal_catchup(leader_commit, leader_commit);

        info!(
            vshard = req.vshard_id,
            leader_commit, "phase 2: monitoring replication lag"
        );

        // Capture the initial connection stable_id to the target.
        // If this changes during catch-up, it means the connection was replaced
        // (possible node crash + restart at same address), and we must abort.
        let initial_stable_id = self.transport.peer_connection_stable_id(req.target_node);

        // Also capture the target's address from topology for change detection.
        let initial_target_addr = {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            topo.get_node(req.target_node).map(|n| n.addr.clone())
        };

        // Poll until the target has caught up by checking the leader's
        // match_index for the target node. This confirms the target has
        // actually replicated the data, not just the leader's commit index.
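        //
        // Worked example (illustrative numbers, not from a real run): with
        // leader commit_index = 10_000 and target match_index = 9_950, the lag
        // is 50 entries; once that lag falls within MigrationState's readiness
        // threshold, is_catchup_ready() returns true and we promote the learner.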
        let poll_interval = Duration::from_millis(100);
        let timeout = Duration::from_secs(60);
        let deadline = std::time::Instant::now() + timeout;

        loop {
            tokio::time::sleep(poll_interval).await;

            // Verify peer identity hasn't changed mid-transfer.
            // Check 1: connection stable_id — detects QUIC connection replacement.
            if let Some(initial_id) = initial_stable_id {
                match self.transport.peer_connection_stable_id(req.target_node) {
                    Some(current_id) if current_id != initial_id => {
                        let reason = format!(
                            "peer identity changed mid-migration: connection stable_id {} -> {} for node {}",
                            initial_id, current_id, req.target_node
                        );
                        state.fail(reason.clone());
                        return Err(ClusterError::Transport { detail: reason });
                    }
                    None => {
                        // Connection dropped — peer may have crashed.
                        let reason = format!(
                            "connection to target node {} lost during migration",
                            req.target_node
                        );
                        state.fail(reason.clone());
                        return Err(ClusterError::Transport { detail: reason });
                    }
                    _ => {}
                }
            }

            // Check 2: topology address — detects node replacement at different address.
            {
                let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
                let current_addr = topo.get_node(req.target_node).map(|n| n.addr.clone());
                if current_addr != initial_target_addr {
                    let reason = format!(
                        "target node {} address changed during migration: {:?} -> {:?}",
                        req.target_node, initial_target_addr, current_addr
                    );
                    state.fail(reason.clone());
                    return Err(ClusterError::Transport { detail: reason });
                }
            }

            let (leader_commit, target_match) = {
                let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                let statuses = mr.group_statuses();
                let commit = statuses
                    .iter()
                    .find(|s| s.group_id == group_id)
                    .map(|s| s.commit_index)
                    .unwrap_or(0);
                // Query the target's match_index from the leader's replication state.
                let target_match = mr.match_index_for(group_id, req.target_node).unwrap_or(0);
                (commit, target_match)
            };

            state.update_wal_catchup(leader_commit, target_match);

            if state.is_catchup_ready() {
                // Learner has caught up — promote to voter so the
                // group has enough replicas for a safe cut-over.
                let promote = ConfChange {
                    change_type: ConfChangeType::PromoteLearner,
                    node_id: req.target_node,
                };
                {
                    let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                    mr.propose_conf_change(group_id, &promote)?;
                }
                debug!(
                    vshard = req.vshard_id,
                    leader_commit,
                    target_match,
                    "phase 2 complete: target caught up and promoted to voter"
                );
                return Ok(());
            }

            if std::time::Instant::now() >= deadline {
                let reason = format!(
                    "WAL catch-up timed out after {}s (leader={leader_commit}, target={target_match})",
                    timeout.as_secs()
                );
                state.fail(reason.clone());
                return Err(ClusterError::Transport { detail: reason });
            }
        }
    }

    /// Phase 3: Atomic routing table update.
    ///
    /// When a [`MetadataProposer`] is attached, the cut-over proposes
    /// a `RoutingChange::LeadershipTransfer` through the metadata Raft
    /// group so every node applies the routing update atomically on
    /// commit. Without a proposer (tests), falls back to a local-only
    /// mutation.
    async fn phase3_cutover(
        &self,
        state: &mut MigrationState,
        group_id: u64,
        req: &MigrationRequest,
    ) -> Result<()> {
        // Estimated write pause for the cut-over: 10ms (start_cutover may
        // reject it against the request's write-pause budget).
        let estimated_pause_us = 10_000;

        state.start_cutover(estimated_pause_us).map_err(|e| {
            state.fail(format!("cutover rejected: {e}"));
            e
        })?;

        let cutover_start = std::time::Instant::now();

        info!(
            vshard = req.vshard_id,
            estimated_pause_us, "phase 3: atomic cut-over"
        );

        // Propose the routing change. With a metadata proposer the
        // `CacheApplier::with_live_state` on every node handles the
        // actual routing mutation when the entry commits; without a
        // proposer we mutate locally for backward-compat.
        if let Some(proposer) = &self.metadata_proposer {
            let entry = MetadataEntry::RoutingChange(RoutingChange::LeadershipTransfer {
                group_id,
                new_leader_node_id: req.target_node,
            });
            proposer.propose_and_wait(entry).await?;
        } else {
            let mut routing = self.routing.write().unwrap_or_else(|p| p.into_inner());
            routing.set_leader(group_id, req.target_node);
        }

        // Ghost stub so in-flight scatter-gather queries that still
        // target the old leader are transparently forwarded.
        {
            let mut ghosts = self.ghost_table.lock().unwrap_or_else(|p| p.into_inner());
            ghosts.insert(GhostStub {
                node_id: format!("vshard-{}", req.vshard_id),
                target_shard: req.vshard_id,
                refcount: 1,
                created_at_ms: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64,
            });
        }

        let actual_pause_us = cutover_start.elapsed().as_micros() as u64;
        state.complete(actual_pause_us);

        debug!(
            vshard = req.vshard_id,
            actual_pause_us, "phase 3 complete: routing updated"
        );

        Ok(())
    }
}

/// Track active migrations across the cluster.
pub struct MigrationTracker {
    active: Mutex<Vec<MigrationState>>,
}

impl MigrationTracker {
    pub fn new() -> Self {
        Self {
            active: Mutex::new(Vec::new()),
        }
    }

    pub fn add(&self, state: MigrationState) {
        let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
        active.push(state);
    }

    pub fn active_count(&self) -> usize {
        let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
        active.iter().filter(|s| s.is_active()).count()
    }

    /// Snapshot of all migration states for observability.
    pub fn snapshot(&self) -> Vec<MigrationSnapshot> {
        let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
        active
            .iter()
            .map(|s| MigrationSnapshot {
                vshard_id: s.vshard_id(),
                phase: format!("{:?}", s.phase()),
                elapsed_ms: s.elapsed().map(|d| d.as_millis() as u64).unwrap_or(0),
                is_active: s.is_active(),
            })
            .collect()
    }

    /// Remove completed/failed migrations older than the given age.
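    ///
    /// A hypothetical call site, from a periodic housekeeping task:
    ///
    /// ```ignore
    /// tracker.gc(Duration::from_secs(3600)); // drop finished migrations older than an hour
    /// ```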
    pub fn gc(&self, max_age: Duration) {
        let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
        active.retain(|s| s.is_active() || s.elapsed().map(|d| d < max_age).unwrap_or(true));
    }
}

impl Default for MigrationTracker {
    fn default() -> Self {
        Self::new()
    }
}

/// Observability snapshot of a migration.
#[derive(Debug, Clone)]
pub struct MigrationSnapshot {
    pub vshard_id: u16,
    pub phase: String,
    pub elapsed_ms: u64,
    pub is_active: bool,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use crate::topology::ClusterTopology;

    #[test]
    fn migration_tracker_lifecycle() {
        let tracker = MigrationTracker::new();
        assert_eq!(tracker.active_count(), 0);

        let mut state = MigrationState::new(0, 0, 1, 1, 2, 500_000);
        state.start_base_copy(100);
        tracker.add(state);

        assert_eq!(tracker.active_count(), 1);
        assert_eq!(tracker.snapshot().len(), 1);
        assert!(tracker.snapshot()[0].is_active);
    }

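    // A minimal GC behavior check: an in-flight migration must survive gc()
    // regardless of age, since retain() always keeps active states.
    #[test]
    fn migration_tracker_gc_retains_active() {
        let tracker = MigrationTracker::new();

        let mut state = MigrationState::new(2, 0, 1, 1, 2, 500_000);
        state.start_base_copy(10);
        tracker.add(state);

        // Zero max-age would evict any finished migration, but not an active one.
        tracker.gc(Duration::from_millis(0));
        assert_eq!(tracker.active_count(), 1);
    }
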
    #[tokio::test]
    async fn migration_executor_phase1() {
        // Test that phase 1 adds the target node to the Raft group.
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = crate::multi_raft::MultiRaft::new(1, rt.clone(), dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Make node 1 the leader (single-node → auto-elected).
        use std::time::Instant;
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }
        // Tick to trigger election.
        let _ = mr.tick();
        // Drain ready to consume the no-op.
        for (gid, ready) in mr.tick().groups {
            if let Some(last) = ready.committed_entries.last() {
                mr.advance_applied(gid, last.index).unwrap();
            }
        }

        let multi_raft = Arc::new(Mutex::new(mr));
        let routing = Arc::new(RwLock::new(rt));
        let topology = Arc::new(RwLock::new(ClusterTopology::new()));
        let transport = Arc::new(
            NexarTransport::new(
                1,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        );

        let executor = MigrationExecutor::new(multi_raft.clone(), routing, topology, transport);

        let mut state = MigrationState::new(0, 0, 0, 1, 2, 500_000);

        let req = MigrationRequest {
            vshard_id: 0,
            source_node: 1,
            target_node: 2,
            write_pause_budget_us: 500_000,
        };

        // Phase 1 should succeed (adds node 2 as learner to group 0).
        executor
            .phase1_base_copy(&mut state, 0, &req)
            .await
            .unwrap();

        // Verify: the ConfChange (AddLearner) was proposed in the Raft log.
        // Application happens on next tick/commit cycle.
    }

    #[test]
    fn migration_request_default() {
        let req = MigrationRequest::default();
        assert_eq!(req.write_pause_budget_us, 500_000);
    }
}