1use std::sync::{Arc, Mutex, RwLock};
14use std::time::Duration;
15
16use tracing::{debug, info};
17
18use crate::conf_change::{ConfChange, ConfChangeType};
19use crate::decommission::MetadataProposer;
20use crate::error::{ClusterError, Result};
21use crate::ghost::{GhostStub, GhostTable};
22use crate::metadata_group::{MetadataEntry, RoutingChange};
23use crate::migration::{MigrationPhase, MigrationState};
24use crate::multi_raft::MultiRaft;
25use crate::routing::RoutingTable;
26use crate::topology::ClusterTopology;
27use crate::transport::NexarTransport;
28
/// Parameters for a single vShard migration from one node to another.
#[derive(Debug, Clone)]
pub struct MigrationRequest {
    /// Identifier of the vShard being migrated.
    pub vshard_id: u16,
    /// Node currently hosting the vShard.
    pub source_node: u64,
    /// Node that should host the vShard after the migration completes.
    pub target_node: u64,
    /// Maximum tolerated write pause during cut-over, in microseconds.
    pub write_pause_budget_us: u64,
}
38
39impl Default for MigrationRequest {
40 fn default() -> Self {
41 Self {
42 vshard_id: 0,
43 source_node: 0,
44 target_node: 0,
45 write_pause_budget_us: 500_000, }
47 }
48}
49
/// Outcome of a completed (or attempted) vShard migration.
#[derive(Debug)]
pub struct MigrationResult {
    /// vShard that was migrated.
    pub vshard_id: u16,
    /// Node the vShard was migrated from.
    pub source_node: u64,
    /// Node the vShard was migrated to.
    pub target_node: u64,
    /// Final phase the migration state machine ended in.
    pub phase: MigrationPhase,
    /// Total wall-clock duration of the migration, if measured.
    pub elapsed: Option<Duration>,
}
59
/// Drives the three-phase vShard migration protocol (base copy, WAL
/// catch-up, atomic cut-over) against the cluster's raft groups.
pub struct MigrationExecutor {
    // Multi-raft instance used to read group status and propose conf changes.
    multi_raft: Arc<Mutex<MultiRaft>>,
    // Cluster routing table; updated directly at cut-over when no metadata
    // proposer is configured.
    routing: Arc<RwLock<RoutingTable>>,
    // Node topology, used to resolve the target node's address.
    topology: Arc<RwLock<ClusterTopology>>,
    // Transport layer for peer registration and connection identity checks.
    transport: Arc<NexarTransport>,
    // Ghost stubs created at cut-over (one per migrated vShard).
    ghost_table: Arc<Mutex<GhostTable>>,
    // Optional replicated-metadata path for routing changes; when `None`,
    // routing is mutated locally instead.
    metadata_proposer: Option<Arc<dyn MetadataProposer>>,
}
78
impl MigrationExecutor {
    /// Creates an executor with an empty ghost table and no metadata
    /// proposer (routing changes will be applied locally at cut-over).
    pub fn new(
        multi_raft: Arc<Mutex<MultiRaft>>,
        routing: Arc<RwLock<RoutingTable>>,
        topology: Arc<RwLock<ClusterTopology>>,
        transport: Arc<NexarTransport>,
    ) -> Self {
        Self {
            multi_raft,
            routing,
            topology,
            transport,
            ghost_table: Arc::new(Mutex::new(GhostTable::new())),
            metadata_proposer: None,
        }
    }

    /// Builder-style setter: route cut-over changes through the replicated
    /// metadata group instead of mutating the local routing table.
    pub fn with_metadata_proposer(mut self, proposer: Arc<dyn MetadataProposer>) -> Self {
        self.metadata_proposer = Some(proposer);
        self
    }

    /// Shared handle to the ghost-stub table populated during cut-over.
    pub fn ghost_table(&self) -> &Arc<Mutex<GhostTable>> {
        &self.ghost_table
    }

    /// Runs the full three-phase migration for `req` and returns the final
    /// phase and elapsed time.
    ///
    /// # Errors
    /// Propagates routing lookup failures, conf-change proposal failures,
    /// and the transport/timeout errors raised by the individual phases.
    pub async fn execute(&self, req: MigrationRequest) -> Result<MigrationResult> {
        // Resolve which raft group currently owns the vShard. Read lock is
        // scoped so it is released before any phase runs; a poisoned lock is
        // recovered with `into_inner` rather than panicking.
        let source_group = {
            let routing = self.routing.read().unwrap_or_else(|p| p.into_inner());
            routing.group_for_vshard(req.vshard_id)?
        };

        // NOTE(review): `source_group` is passed for both the source and
        // target group arguments — presumably an in-place migration within
        // one group; confirm against MigrationState::new's signature.
        let mut state = MigrationState::new(
            req.vshard_id,
            source_group,
            source_group, req.source_node,
            req.target_node,
            req.write_pause_budget_us,
        );

        info!(
            vshard = req.vshard_id,
            source = req.source_node,
            target = req.target_node,
            group = source_group,
            "starting vShard migration"
        );

        // Phase 1: add the target as a raft learner so it starts receiving
        // the group's log.
        self.phase1_base_copy(&mut state, source_group, &req)
            .await?;

        // Phase 2: wait for the learner to catch up, then promote to voter.
        self.phase2_wal_catchup(&mut state, source_group, &req)
            .await?;

        // Phase 3: atomically switch routing/leadership to the target.
        self.phase3_cutover(&mut state, source_group, &req).await?;

        let elapsed = state.elapsed();
        let phase = state.phase().clone();

        info!(
            vshard = req.vshard_id,
            source = req.source_node,
            target = req.target_node,
            elapsed_ms = elapsed.map(|d| d.as_millis() as u64).unwrap_or(0),
            "vShard migration completed"
        );

        Ok(MigrationResult {
            vshard_id: req.vshard_id,
            source_node: req.source_node,
            target_node: req.target_node,
            phase,
            elapsed,
        })
    }

    /// Phase 1: record the group's commit index, propose adding the target
    /// as a learner, and register the target with the transport layer.
    async fn phase1_base_copy(
        &self,
        state: &mut MigrationState,
        group_id: u64,
        req: &MigrationRequest,
    ) -> Result<()> {
        // Snapshot the group's commit index; 0 if the group is unknown.
        let committed = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            let statuses = mr.group_statuses();
            statuses
                .iter()
                .find(|s| s.group_id == group_id)
                .map(|s| s.commit_index)
                .unwrap_or(0)
        };
        state.start_base_copy(committed);

        info!(
            vshard = req.vshard_id,
            group = group_id,
            target = req.target_node,
            entries = committed,
            "phase 1: adding target to raft group"
        );

        let change = ConfChange {
            change_type: ConfChangeType::AddLearner,
            node_id: req.target_node,
        };

        // Lock scope kept minimal: propose the conf change and release.
        {
            let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            mr.propose_conf_change(group_id, &change)?;
        }

        // Best-effort peer registration: silently skipped if the target is
        // not in the topology or its address fails to parse.
        if let Some(node_info) = {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            topo.get_node(req.target_node).map(|n| n.addr.clone())
        } && let Ok(addr) = node_info.parse()
        {
            self.transport.register_peer(req.target_node, addr);
        }

        // NOTE(review): reuses the commit index captured before the conf
        // change was proposed — confirm whether update_base_copy should see
        // a fresh value here.
        state.update_base_copy(committed);

        debug!(
            vshard = req.vshard_id,
            "phase 1 complete: target added as learner to raft group"
        );

        Ok(())
    }

    /// Phase 2: poll replication progress until the target's match index
    /// catches up with the leader's commit index, then promote the learner
    /// to voter. Fails if the target's connection identity or address
    /// changes mid-migration, or after a fixed 60s deadline.
    async fn phase2_wal_catchup(
        &self,
        state: &mut MigrationState,
        group_id: u64,
        req: &MigrationRequest,
    ) -> Result<()> {
        let leader_commit = {
            let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
            let statuses = mr.group_statuses();
            statuses
                .iter()
                .find(|s| s.group_id == group_id)
                .map(|s| s.commit_index)
                .unwrap_or(0)
        };

        // NOTE(review): leader_commit is passed as both the leader and
        // target positions — confirm start_wal_catchup's intended arguments;
        // the loop below overwrites them via update_wal_catchup anyway.
        state.start_wal_catchup(leader_commit, leader_commit);

        info!(
            vshard = req.vshard_id,
            leader_commit, "phase 2: monitoring replication lag"
        );

        // Baselines for the mid-migration stability checks below. If the
        // connection stable_id is None now, identity checks are skipped for
        // the whole loop.
        let initial_stable_id = self.transport.peer_connection_stable_id(req.target_node);

        let initial_target_addr = {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            topo.get_node(req.target_node).map(|n| n.addr.clone())
        };

        let poll_interval = Duration::from_millis(100);
        let timeout = Duration::from_secs(60);
        let deadline = std::time::Instant::now() + timeout;

        loop {
            tokio::time::sleep(poll_interval).await;

            // Abort if the target's underlying connection was replaced
            // (stable_id changed) or dropped entirely.
            if let Some(initial_id) = initial_stable_id {
                match self.transport.peer_connection_stable_id(req.target_node) {
                    Some(current_id) if current_id != initial_id => {
                        let reason = format!(
                            "peer identity changed mid-migration: connection stable_id {} -> {} for node {}",
                            initial_id, current_id, req.target_node
                        );
                        state.fail(reason.clone());
                        return Err(ClusterError::Transport { detail: reason });
                    }
                    None => {
                        let reason = format!(
                            "connection to target node {} lost during migration",
                            req.target_node
                        );
                        state.fail(reason.clone());
                        return Err(ClusterError::Transport { detail: reason });
                    }
                    _ => {}
                }
            }

            // Abort if the target's advertised address changed in topology.
            {
                let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
                let current_addr = topo.get_node(req.target_node).map(|n| n.addr.clone());
                if current_addr != initial_target_addr {
                    let reason = format!(
                        "target node {} address changed during migration: {:?} -> {:?}",
                        req.target_node, initial_target_addr, current_addr
                    );
                    state.fail(reason.clone());
                    return Err(ClusterError::Transport { detail: reason });
                }
            }

            // Sample leader commit and the target's match index under one
            // lock acquisition. Shadows the pre-loop `leader_commit`.
            let (leader_commit, target_match) = {
                let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                let statuses = mr.group_statuses();
                let commit = statuses
                    .iter()
                    .find(|s| s.group_id == group_id)
                    .map(|s| s.commit_index)
                    .unwrap_or(0);
                let target_match = mr.match_index_for(group_id, req.target_node).unwrap_or(0);
                (commit, target_match)
            };

            state.update_wal_catchup(leader_commit, target_match);

            // Caught up: promote the learner to voter and finish phase 2.
            if state.is_catchup_ready() {
                let promote = ConfChange {
                    change_type: ConfChangeType::PromoteLearner,
                    node_id: req.target_node,
                };
                {
                    let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
                    mr.propose_conf_change(group_id, &promote)?;
                }
                debug!(
                    vshard = req.vshard_id,
                    leader_commit,
                    target_match,
                    "phase 2 complete: target caught up and promoted to voter"
                );
                return Ok(());
            }

            // NOTE(review): a catch-up timeout is reported as a Transport
            // error — confirm callers are fine with that classification.
            if std::time::Instant::now() >= deadline {
                let reason = format!(
                    "WAL catch-up timed out after {}s (leader={leader_commit}, target={target_match})",
                    timeout.as_secs()
                );
                state.fail(reason.clone());
                return Err(ClusterError::Transport { detail: reason });
            }
        }
    }

    /// Phase 3: atomic cut-over. Switches group leadership/routing to the
    /// target (via the metadata group when configured, otherwise locally),
    /// records a ghost stub for the migrated vShard, and completes the
    /// state machine with the measured pause duration.
    async fn phase3_cutover(
        &self,
        state: &mut MigrationState,
        group_id: u64,
        req: &MigrationRequest,
    ) -> Result<()> {
        // Fixed a-priori estimate (10ms) checked against the request's
        // write-pause budget by start_cutover.
        let estimated_pause_us = 10_000;

        state.start_cutover(estimated_pause_us).map_err(|e| {
            state.fail(format!("cutover rejected: {e}"));
            e
        })?;

        let cutover_start = std::time::Instant::now();

        info!(
            vshard = req.vshard_id,
            estimated_pause_us, "phase 3: atomic cut-over"
        );

        // Preferred path: replicate the leadership transfer through the
        // metadata group and wait for it to commit. Fallback: mutate the
        // local routing table directly.
        if let Some(proposer) = &self.metadata_proposer {
            let entry = MetadataEntry::RoutingChange(RoutingChange::LeadershipTransfer {
                group_id,
                new_leader_node_id: req.target_node,
            });
            proposer.propose_and_wait(entry).await?;
        } else {
            let mut routing = self.routing.write().unwrap_or_else(|p| p.into_inner());
            routing.set_leader(group_id, req.target_node);
        }

        // Record a ghost stub for the migrated vShard, stamped with the
        // current wall-clock time in ms (0 if the clock is before epoch).
        {
            let mut ghosts = self.ghost_table.lock().unwrap_or_else(|p| p.into_inner());
            ghosts.insert(GhostStub {
                node_id: format!("vshard-{}", req.vshard_id),
                target_shard: req.vshard_id,
                refcount: 1,
                created_at_ms: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64,
            });
        }

        let actual_pause_us = cutover_start.elapsed().as_micros() as u64;
        state.complete(actual_pause_us);

        debug!(
            vshard = req.vshard_id,
            actual_pause_us, "phase 3 complete: routing updated"
        );

        Ok(())
    }
}
434
/// Thread-safe registry of in-flight and recently finished migrations.
pub struct MigrationTracker {
    // All tracked migration states; finished entries are pruned by `gc`.
    active: Mutex<Vec<MigrationState>>,
}
439
440impl MigrationTracker {
441 pub fn new() -> Self {
442 Self {
443 active: Mutex::new(Vec::new()),
444 }
445 }
446
447 pub fn add(&self, state: MigrationState) {
448 let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
449 active.push(state);
450 }
451
452 pub fn active_count(&self) -> usize {
453 let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
454 active.iter().filter(|s| s.is_active()).count()
455 }
456
457 pub fn snapshot(&self) -> Vec<MigrationSnapshot> {
459 let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
460 active
461 .iter()
462 .map(|s| MigrationSnapshot {
463 vshard_id: s.vshard_id(),
464 phase: format!("{:?}", s.phase()),
465 elapsed_ms: s.elapsed().map(|d| d.as_millis() as u64).unwrap_or(0),
466 is_active: s.is_active(),
467 })
468 .collect()
469 }
470
471 pub fn gc(&self, max_age: Duration) {
473 let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
474 active.retain(|s| s.is_active() || s.elapsed().map(|d| d < max_age).unwrap_or(true));
475 }
476}
477
impl Default for MigrationTracker {
    /// Equivalent to [`MigrationTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}
483
/// Point-in-time view of one tracked migration, for reporting/diagnostics.
#[derive(Debug, Clone)]
pub struct MigrationSnapshot {
    /// vShard the migration concerns.
    pub vshard_id: u16,
    /// Debug-formatted name of the current migration phase.
    pub phase: String,
    /// Elapsed wall-clock time in milliseconds (0 if not yet measured).
    pub elapsed_ms: u64,
    /// Whether the migration is still in progress.
    pub is_active: bool,
}
492
#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use crate::topology::ClusterTopology;

    // Tracker starts empty; an added in-progress state is counted and
    // visible in the snapshot as active.
    #[test]
    fn migration_tracker_lifecycle() {
        let tracker = MigrationTracker::new();
        assert_eq!(tracker.active_count(), 0);

        let mut state = MigrationState::new(0, 0, 1, 1, 2, 500_000);
        state.start_base_copy(100);
        tracker.add(state);

        assert_eq!(tracker.active_count(), 1);
        assert_eq!(tracker.snapshot().len(), 1);
        assert!(tracker.snapshot()[0].is_active);
    }

    // End-to-end exercise of phase 1 against a real single-node MultiRaft:
    // the group is forced through an election, ticked to apply committed
    // entries, then phase1_base_copy must succeed in adding node 2 as a
    // learner.
    #[tokio::test]
    async fn migration_executor_phase1() {
        let dir = tempfile::tempdir().unwrap();
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = crate::multi_raft::MultiRaft::new(1, rt.clone(), dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        use std::time::Instant;
        // Force an immediate election by expiring every node's deadline.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }
        // First tick runs the election; second tick yields committed
        // entries which are then marked applied.
        let _ = mr.tick();
        for (gid, ready) in mr.tick().groups {
            if let Some(last) = ready.committed_entries.last() {
                mr.advance_applied(gid, last.index).unwrap();
            }
        }

        let multi_raft = Arc::new(Mutex::new(mr));
        let routing = Arc::new(RwLock::new(rt));
        let topology = Arc::new(RwLock::new(ClusterTopology::new()));
        // Port 0: the OS picks a free port, so the test never collides.
        let transport = Arc::new(
            NexarTransport::new(
                1,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        );

        let executor = MigrationExecutor::new(multi_raft.clone(), routing, topology, transport);

        let mut state = MigrationState::new(0, 0, 0, 1, 2, 500_000);

        let req = MigrationRequest {
            vshard_id: 0,
            source_node: 1,
            target_node: 2,
            write_pause_budget_us: 500_000,
        };

        executor
            .phase1_base_copy(&mut state, 0, &req)
            .await
            .unwrap();

    }

    // Default request carries the documented 500ms pause budget.
    #[test]
    fn migration_request_default() {
        let req = MigrationRequest::default();
        assert_eq!(req.write_pause_budget_us, 500_000);
    }
}