use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tracing::{debug, info};
use crate::conf_change::{ConfChange, ConfChangeType};
use crate::error::{ClusterError, Result};
use crate::ghost::{GhostStub, GhostTable};
use crate::migration::{MigrationPhase, MigrationState};
use crate::multi_raft::MultiRaft;
use crate::routing::RoutingTable;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;
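// vShard migration runs in three phases: phase 1 proposes `AddNode` so raft
// replicates the existing log to the target (base copy); phase 2 polls until
// the target's match index reaches the leader's commit (WAL catch-up); phase 3
// flips routing via a raft proposal and registers a ghost stub so requests
// hitting the old location are forwarded transparently.

/// Parameters for moving one vShard's replica from `source_node` to
/// `target_node`, with a budget (in microseconds) for the write pause
/// tolerated during cut-over.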
#[derive(Debug, Clone)]
pub struct MigrationRequest {
pub vshard_id: u16,
pub source_node: u64,
pub target_node: u64,
pub write_pause_budget_us: u64,
}
impl Default for MigrationRequest {
fn default() -> Self {
Self {
vshard_id: 0,
source_node: 0,
target_node: 0,
            write_pause_budget_us: 500_000,
        }
}
}
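/// Outcome of a migration attempt: the final phase reached and, when the
/// state machine recorded timing, the total elapsed duration.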
#[derive(Debug)]
pub struct MigrationResult {
pub vshard_id: u16,
pub source_node: u64,
pub target_node: u64,
pub phase: MigrationPhase,
pub elapsed: Option<Duration>,
}
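/// Drives the three-phase migration against the local multi-raft instance,
/// updating the routing table and the ghost table at cut-over.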
pub struct MigrationExecutor {
multi_raft: Arc<Mutex<MultiRaft>>,
routing: Arc<RwLock<RoutingTable>>,
topology: Arc<RwLock<ClusterTopology>>,
transport: Arc<NexarTransport>,
ghost_table: Arc<Mutex<GhostTable>>,
}
impl MigrationExecutor {
pub fn new(
multi_raft: Arc<Mutex<MultiRaft>>,
routing: Arc<RwLock<RoutingTable>>,
topology: Arc<RwLock<ClusterTopology>>,
transport: Arc<NexarTransport>,
) -> Self {
Self {
multi_raft,
routing,
topology,
transport,
ghost_table: Arc::new(Mutex::new(GhostTable::new())),
}
}
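    /// Shared handle to the ghost table used for post-cut-over forwarding.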
pub fn ghost_table(&self) -> &Arc<Mutex<GhostTable>> {
&self.ghost_table
}
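    /// Runs the full three-phase migration described by `req`: base copy,
    /// WAL catch-up, then the atomic cut-over.
    ///
    /// A minimal usage sketch with hypothetical values (`ignore`d because it
    /// needs a wired-up executor and a live cluster):
    ///
    /// ```ignore
    /// let result = executor
    ///     .execute(MigrationRequest {
    ///         vshard_id: 7,
    ///         source_node: 1,
    ///         target_node: 2,
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// assert_eq!(result.vshard_id, 7);
    /// ```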
pub async fn execute(&self, req: MigrationRequest) -> Result<MigrationResult> {
let source_group = {
let routing = self.routing.read().unwrap_or_else(|p| p.into_inner());
routing.group_for_vshard(req.vshard_id)?
};
        let mut state = MigrationState::new(
            req.vshard_id,
            source_group,
            // The vShard stays in the same raft group; the cut-over below
            // reassigns it within that group.
            source_group,
            req.source_node,
            req.target_node,
            req.write_pause_budget_us,
        );
info!(
vshard = req.vshard_id,
source = req.source_node,
target = req.target_node,
group = source_group,
"starting vShard migration"
);
self.phase1_base_copy(&mut state, source_group, &req)
.await?;
self.phase2_wal_catchup(&mut state, source_group, &req)
.await?;
self.phase3_cutover(&mut state, source_group, &req).await?;
let elapsed = state.elapsed();
let phase = state.phase().clone();
info!(
vshard = req.vshard_id,
source = req.source_node,
target = req.target_node,
elapsed_ms = elapsed.map(|d| d.as_millis() as u64).unwrap_or(0),
"vShard migration completed"
);
Ok(MigrationResult {
vshard_id: req.vshard_id,
source_node: req.source_node,
target_node: req.target_node,
phase,
elapsed,
})
}
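    /// Phase 1: snapshot the leader's commit index and propose `AddNode` for
    /// the target so raft ships it the existing log (the base copy), then
    /// register the target's transport address if the topology knows it.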
async fn phase1_base_copy(
&self,
state: &mut MigrationState,
group_id: u64,
req: &MigrationRequest,
) -> Result<()> {
let committed = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let statuses = mr.group_statuses();
statuses
.iter()
.find(|s| s.group_id == group_id)
.map(|s| s.commit_index)
.unwrap_or(0)
};
state.start_base_copy(committed);
info!(
vshard = req.vshard_id,
group = group_id,
target = req.target_node,
entries = committed,
"phase 1: adding target to raft group"
);
let change = ConfChange {
change_type: ConfChangeType::AddNode,
node_id: req.target_node,
};
{
let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.propose_conf_change(group_id, &change)?;
}
        // Register the target's transport address if the topology knows it.
        let target_addr = {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            topo.get_node(req.target_node).map(|n| n.addr.clone())
        };
        if let Some(addr_str) = target_addr
            && let Ok(addr) = addr_str.parse()
        {
            self.transport.register_peer(req.target_node, addr);
        }
state.update_base_copy(committed);
debug!(
vshard = req.vshard_id,
"phase 1 complete: target added to raft group"
);
Ok(())
}
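    /// Phase 2: poll the target's match index until it reaches the leader's
    /// commit, failing fast if the target's connection identity or address
    /// changes mid-migration.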
async fn phase2_wal_catchup(
&self,
state: &mut MigrationState,
group_id: u64,
req: &MigrationRequest,
) -> Result<()> {
let leader_commit = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let statuses = mr.group_statuses();
statuses
.iter()
.find(|s| s.group_id == group_id)
.map(|s| s.commit_index)
.unwrap_or(0)
};
        // Seed catch-up tracking with the leader's commit; the loop below
        // refreshes the target's real match index on each poll.
        state.start_wal_catchup(leader_commit, leader_commit);
info!(
vshard = req.vshard_id,
leader_commit, "phase 2: monitoring replication lag"
);
let initial_stable_id = self.transport.peer_connection_stable_id(req.target_node);
let initial_target_addr = {
let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
topo.get_node(req.target_node).map(|n| n.addr.clone())
};
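        // Poll replication progress every 100 ms; give up after 60 s.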
let poll_interval = Duration::from_millis(100);
let timeout = Duration::from_secs(60);
let deadline = std::time::Instant::now() + timeout;
loop {
tokio::time::sleep(poll_interval).await;
if let Some(initial_id) = initial_stable_id {
match self.transport.peer_connection_stable_id(req.target_node) {
Some(current_id) if current_id != initial_id => {
let reason = format!(
"peer identity changed mid-migration: connection stable_id {} -> {} for node {}",
initial_id, current_id, req.target_node
);
state.fail(reason.clone());
return Err(ClusterError::Transport { detail: reason });
}
None => {
let reason = format!(
"connection to target node {} lost during migration",
req.target_node
);
state.fail(reason.clone());
return Err(ClusterError::Transport { detail: reason });
}
_ => {}
}
}
{
let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
let current_addr = topo.get_node(req.target_node).map(|n| n.addr.clone());
if current_addr != initial_target_addr {
let reason = format!(
"target node {} address changed during migration: {:?} -> {:?}",
req.target_node, initial_target_addr, current_addr
);
state.fail(reason.clone());
return Err(ClusterError::Transport { detail: reason });
}
}
let (leader_commit, target_match) = {
let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let statuses = mr.group_statuses();
let commit = statuses
.iter()
.find(|s| s.group_id == group_id)
.map(|s| s.commit_index)
.unwrap_or(0);
let target_match = mr.match_index_for(group_id, req.target_node).unwrap_or(0);
(commit, target_match)
};
state.update_wal_catchup(leader_commit, target_match);
if state.is_catchup_ready() {
debug!(
vshard = req.vshard_id,
leader_commit, target_match, "phase 2 complete: target caught up"
);
return Ok(());
}
if std::time::Instant::now() >= deadline {
let reason = format!(
"WAL catch-up timed out after {}s (leader={leader_commit}, target={target_match})",
timeout.as_secs()
);
state.fail(reason.clone());
return Err(ClusterError::Transport { detail: reason });
}
}
}
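    /// Phase 3: propose the routing flip through raft, reassign the vShard in
    /// the local routing table, and register a ghost stub for transparent
    /// forwarding.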
async fn phase3_cutover(
&self,
state: &mut MigrationState,
group_id: u64,
req: &MigrationRequest,
) -> Result<()> {
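        // Fixed estimate of the write pause for the routing flip;
        // start_cutover may reject the cut-over based on it.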
let estimated_pause_us = 10_000;
state.start_cutover(estimated_pause_us).map_err(|e| {
state.fail(format!("cutover rejected: {e}"));
e
})?;
let cutover_start = std::time::Instant::now();
info!(
vshard = req.vshard_id,
estimated_pause_us, "phase 3: atomic cut-over"
);
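        // Propose the cut-over through the group's raft log, then flip the
        // local routing table and register the ghost stub below.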
let routing_change = ConfChange {
change_type: ConfChangeType::AddNode,
node_id: req.target_node,
};
{
let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
mr.propose_conf_change(group_id, &routing_change)?;
}
{
let mut routing = self.routing.write().unwrap_or_else(|p| p.into_inner());
routing.reassign_vshard(req.vshard_id, group_id);
}
{
let mut ghosts = self.ghost_table.lock().unwrap_or_else(|p| p.into_inner());
ghosts.insert(GhostStub {
node_id: format!("vshard-{}", req.vshard_id),
target_shard: req.vshard_id,
refcount: 1,
created_at_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
});
}
debug!(
vshard = req.vshard_id,
target = req.target_node,
"ghost stub registered for transparent forwarding"
);
let actual_pause_us = cutover_start.elapsed().as_micros() as u64;
state.complete(actual_pause_us);
debug!(
vshard = req.vshard_id,
actual_pause_us, "phase 3 complete: routing updated via raft"
);
Ok(())
}
}
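/// Thread-safe registry of in-flight and recently completed migrations.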
pub struct MigrationTracker {
active: Mutex<Vec<MigrationState>>,
}
impl MigrationTracker {
pub fn new() -> Self {
Self {
active: Mutex::new(Vec::new()),
}
}
pub fn add(&self, state: MigrationState) {
let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
active.push(state);
}
pub fn active_count(&self) -> usize {
let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
active.iter().filter(|s| s.is_active()).count()
}
pub fn snapshot(&self) -> Vec<MigrationSnapshot> {
let active = self.active.lock().unwrap_or_else(|p| p.into_inner());
active
.iter()
.map(|s| MigrationSnapshot {
vshard_id: s.vshard_id(),
phase: format!("{:?}", s.phase()),
elapsed_ms: s.elapsed().map(|d| d.as_millis() as u64).unwrap_or(0),
is_active: s.is_active(),
})
.collect()
}
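    /// Drops completed migrations older than `max_age`; in-flight migrations
    /// are always retained.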
pub fn gc(&self, max_age: Duration) {
let mut active = self.active.lock().unwrap_or_else(|p| p.into_inner());
active.retain(|s| s.is_active() || s.elapsed().map(|d| d < max_age).unwrap_or(true));
}
}
impl Default for MigrationTracker {
fn default() -> Self {
Self::new()
}
}
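/// Point-in-time view of one tracked migration.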
#[derive(Debug, Clone)]
pub struct MigrationSnapshot {
pub vshard_id: u16,
pub phase: String,
pub elapsed_ms: u64,
pub is_active: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::routing::RoutingTable;
use crate::topology::ClusterTopology;
#[test]
fn migration_tracker_lifecycle() {
let tracker = MigrationTracker::new();
assert_eq!(tracker.active_count(), 0);
let mut state = MigrationState::new(0, 0, 1, 1, 2, 500_000);
state.start_base_copy(100);
tracker.add(state);
assert_eq!(tracker.active_count(), 1);
assert_eq!(tracker.snapshot().len(), 1);
assert!(tracker.snapshot()[0].is_active);
}
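    // A small sketch of `gc` semantics, assuming `is_active` stays true after
    // `start_base_copy` (as the lifecycle test above relies on): an in-flight
    // migration must survive collection even with a zero max-age.
    #[test]
    fn migration_tracker_gc_keeps_active() {
        let tracker = MigrationTracker::new();
        let mut state = MigrationState::new(0, 0, 1, 1, 2, 500_000);
        state.start_base_copy(100);
        tracker.add(state);
        tracker.gc(Duration::from_millis(0));
        assert_eq!(tracker.active_count(), 1);
    }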
#[tokio::test]
async fn migration_executor_phase1() {
let dir = tempfile::tempdir().unwrap();
let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt.clone(), dir.path().to_path_buf());
mr.add_group(0, vec![]).unwrap();
use std::time::Instant;
for node in mr.groups_mut().values_mut() {
node.election_deadline_override(Instant::now() - Duration::from_millis(1));
}
let _ = mr.tick();
for (gid, ready) in mr.tick().groups {
if let Some(last) = ready.committed_entries.last() {
mr.advance_applied(gid, last.index).unwrap();
}
}
let multi_raft = Arc::new(Mutex::new(mr));
let routing = Arc::new(RwLock::new(rt));
let topology = Arc::new(RwLock::new(ClusterTopology::new()));
let transport = Arc::new(NexarTransport::new(1, "127.0.0.1:0".parse().unwrap()).unwrap());
let executor = MigrationExecutor::new(multi_raft.clone(), routing, topology, transport);
let mut state = MigrationState::new(0, 0, 0, 1, 2, 500_000);
let req = MigrationRequest {
vshard_id: 0,
source_node: 1,
target_node: 2,
write_pause_budget_us: 500_000,
};
executor
.phase1_base_copy(&mut state, 0, &req)
.await
.unwrap();
}
#[test]
fn migration_request_default() {
let req = MigrationRequest::default();
assert_eq!(req.write_pause_budget_us, 500_000);
}
}