use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Buf;
use dashmap::DashMap;
use parking_lot::Mutex;
use super::migration::{MigrationError, MigrationFailureReason, MigrationPhase, MigrationState};
use super::migration_source::MigrationSourceHandler;
use super::registry::DaemonRegistry;
use crate::adapter::net::continuity::superposition::SuperpositionState;
use crate::adapter::net::identity::EntityId;
use crate::adapter::net::state::causal::{CausalEvent, CausalLink};
use crate::adapter::net::state::snapshot::{StateSnapshot, SNAPSHOT_VERSION};
/// Magic bytes that open a serialized snapshot envelope, and therefore the
/// first chunk of every `SnapshotReady` transfer.
const SNAPSHOT_WIRE_MAGIC: &[u8; 4] = b"CDS1";

/// Sanity-checks the snapshot envelope header carried at the start of chunk 0
/// of a `SnapshotReady` transfer.
///
/// Chunk 0 must begin with `SNAPSHOT_WIRE_MAGIC` (4 bytes), then a version byte
/// equal to `SNAPSHOT_VERSION`, then a 32-byte `EntityId` whose origin hash
/// equals `expected_daemon_origin`. Chunks with any other index are accepted
/// without inspection.
///
/// # Errors
/// `MigrationError::StateFailed` when chunk 0 is shorter than 37 bytes, has the
/// wrong magic or version, or names an entity whose origin is not
/// `expected_daemon_origin`.
fn validate_chunk_header(
    chunk_index: u32,
    snapshot_bytes: &[u8],
    expected_daemon_origin: u64,
) -> Result<(), MigrationError> {
    // Only chunk 0 starts with the envelope; continuation chunks are opaque.
    if chunk_index > 0 {
        return Ok(());
    }

    // Envelope prefix: magic (4) + version (1) + entity id (32) = 37 bytes.
    let Some(header) = snapshot_bytes.get(..37) else {
        return Err(MigrationError::StateFailed(format!(
            "SnapshotReady chunk 0 is {} bytes — too short for snapshot envelope (need >= 37)",
            snapshot_bytes.len()
        )));
    };

    let (magic, after_magic) = header.split_at(4);
    if magic != SNAPSHOT_WIRE_MAGIC {
        return Err(MigrationError::StateFailed(format!(
            "SnapshotReady chunk 0 has wrong magic bytes: {:?}",
            magic
        )));
    }

    let (version, entity) = (after_magic[0], &after_magic[1..]);
    if version != SNAPSHOT_VERSION {
        return Err(MigrationError::StateFailed(format!(
            "SnapshotReady chunk 0 carries snapshot version {} (expected {})",
            version, SNAPSHOT_VERSION
        )));
    }

    // `entity` is exactly 32 bytes because `header` is exactly 37.
    let mut entity_bytes = [0u8; 32];
    entity_bytes.copy_from_slice(entity);
    let claimed_origin = EntityId::from_bytes(entity_bytes).origin_hash();

    if claimed_origin == expected_daemon_origin {
        Ok(())
    } else {
        Err(MigrationError::StateFailed(format!(
            "SnapshotReady chunk 0 entity_id origin {:#x} does not match daemon_origin {:#x}",
            claimed_origin, expected_daemon_origin
        )))
    }
}
/// Control messages exchanged between nodes while a daemon is migrated.
///
/// Every variant carries `daemon_origin`, the origin hash of the daemon being
/// migrated. The byte encoding lives in the `wire` module.
#[derive(Debug, Clone)]
pub enum MigrationMessage {
    /// Request for the source node to capture a snapshot destined for
    /// `target_node`. `start_migration` emits it when the source is not the
    /// local node.
    TakeSnapshot {
        daemon_origin: u64,
        target_node: u64,
    },
    /// One chunk of a serialized snapshot. `chunk_index` is in
    /// `0..total_chunks`, and only chunk 0 begins with the snapshot envelope
    /// header. `seq_through` must match the snapshot's `through_seq`
    /// (checked in `on_snapshot_ready` for single-chunk snapshots).
    SnapshotReady {
        daemon_origin: u64,
        snapshot_bytes: Vec<u8>,
        seq_through: u64,
        chunk_index: u32,
        total_chunks: u32,
    },
    /// Restore finished (presumably reported by the target — confirm).
    /// `on_restore_complete` currently ignores `restored_seq`.
    RestoreComplete {
        daemon_origin: u64,
        restored_seq: u64,
    },
    /// Replay finished. `target_head` is the chain head to anchor the
    /// continuity proof on. `on_replay_complete` answers with `CutoverNotify`.
    ReplayComplete {
        daemon_origin: u64,
        replayed_seq: u64,
        target_head: CausalLink,
    },
    /// Announces cutover to `target_node` (built by `on_replay_complete`).
    CutoverNotify {
        daemon_origin: u64,
        target_node: u64,
    },
    /// Cleanup finished. `on_cleanup_complete` answers with `ActivateTarget`.
    CleanupComplete {
        daemon_origin: u64,
    },
    /// The migration was aborted for `reason` (built by `abort_migration*`).
    MigrationFailed {
        daemon_origin: u64,
        reason: MigrationFailureReason,
    },
    /// A batch of causal events. The decoder bounds the batch at 1,000,000
    /// events. NOTE(review): producer/consumer are not visible here; presumably
    /// these are events captured on the source during the migration — confirm.
    BufferedEvents {
        daemon_origin: u64,
        events: Vec<CausalEvent>,
    },
    /// Returned by `on_cleanup_complete`; presumably tells the target to
    /// start running the daemon — confirm with the dispatcher.
    ActivateTarget {
        daemon_origin: u64,
    },
    /// Acknowledges `ActivateTarget`. `on_activate_ack` drops the migration
    /// record and currently ignores `replayed_seq`.
    ActivateAck {
        daemon_origin: u64,
        replayed_seq: u64,
    },
}
pub mod wire {
use super::*;
use bytes::{Buf, BufMut};
pub const MSG_TAKE_SNAPSHOT: u8 = 0;
pub const MSG_SNAPSHOT_READY: u8 = 1;
pub const MSG_RESTORE_COMPLETE: u8 = 2;
pub const MSG_REPLAY_COMPLETE: u8 = 3;
pub const MSG_CUTOVER_NOTIFY: u8 = 4;
pub const MSG_CLEANUP_COMPLETE: u8 = 5;
pub const MSG_FAILED: u8 = 6;
pub const MSG_BUFFERED_EVENTS: u8 = 7;
pub const MSG_ACTIVATE_TARGET: u8 = 8;
pub const MSG_ACTIVATE_ACK: u8 = 9;
pub fn encode(msg: &MigrationMessage) -> Result<Vec<u8>, MigrationError> {
fn len_u32(field: &str, n: usize) -> Result<u32, MigrationError> {
u32::try_from(n).map_err(|_| {
MigrationError::StateFailed(format!("{} length {} exceeds u32::MAX", field, n))
})
}
let mut buf = Vec::with_capacity(128);
match msg {
MigrationMessage::TakeSnapshot {
daemon_origin,
target_node,
} => {
buf.put_u8(MSG_TAKE_SNAPSHOT);
buf.put_u64_le(*daemon_origin);
buf.put_u64_le(*target_node);
}
MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
} => {
let payload_len = len_u32("snapshot_bytes", snapshot_bytes.len())?;
buf.put_u8(MSG_SNAPSHOT_READY);
buf.put_u64_le(*daemon_origin);
buf.put_u64_le(*seq_through);
buf.put_u32_le(*chunk_index);
buf.put_u32_le(*total_chunks);
buf.put_u32_le(payload_len);
buf.extend_from_slice(snapshot_bytes);
}
MigrationMessage::RestoreComplete {
daemon_origin,
restored_seq,
} => {
buf.put_u8(MSG_RESTORE_COMPLETE);
buf.put_u64_le(*daemon_origin);
buf.put_u64_le(*restored_seq);
}
MigrationMessage::ReplayComplete {
daemon_origin,
replayed_seq,
target_head,
} => {
buf.put_u8(MSG_REPLAY_COMPLETE);
buf.put_u64_le(*daemon_origin);
buf.put_u64_le(*replayed_seq);
buf.extend_from_slice(&target_head.to_bytes());
}
MigrationMessage::CutoverNotify {
daemon_origin,
target_node,
} => {
buf.put_u8(MSG_CUTOVER_NOTIFY);
buf.put_u64_le(*daemon_origin);
buf.put_u64_le(*target_node);
}
MigrationMessage::CleanupComplete { daemon_origin } => {
buf.put_u8(MSG_CLEANUP_COMPLETE);
buf.put_u64_le(*daemon_origin);
}
MigrationMessage::MigrationFailed {
daemon_origin,
reason,
} => {
buf.put_u8(MSG_FAILED);
buf.put_u64_le(*daemon_origin);
buf.put_u16_le(reason.code());
match reason {
MigrationFailureReason::NotReady
| MigrationFailureReason::FactoryNotFound
| MigrationFailureReason::ComputeNotSupported
| MigrationFailureReason::AlreadyMigrating => {}
MigrationFailureReason::StateFailed(msg)
| MigrationFailureReason::IdentityTransportFailed(msg) => {
let len = u16::try_from(msg.len()).map_err(|_| {
MigrationError::StateFailed(format!(
"failure reason message length {} exceeds u16::MAX",
msg.len()
))
})?;
buf.put_u16_le(len);
buf.extend_from_slice(msg.as_bytes());
}
MigrationFailureReason::NotReadyTimeout { attempts } => {
buf.put_u8(*attempts);
}
}
}
MigrationMessage::BufferedEvents {
daemon_origin,
events,
} => {
let event_count = len_u32("events", events.len())?;
buf.put_u8(MSG_BUFFERED_EVENTS);
buf.put_u64_le(*daemon_origin);
buf.put_u32_le(event_count);
for event in events {
let payload_len = len_u32("event payload", event.payload.len())?;
let link_bytes = event.link.to_bytes();
buf.extend_from_slice(&link_bytes);
buf.put_u32_le(payload_len);
buf.extend_from_slice(&event.payload);
buf.put_u64_le(event.received_at);
}
}
MigrationMessage::ActivateTarget { daemon_origin } => {
buf.put_u8(MSG_ACTIVATE_TARGET);
buf.put_u64_le(*daemon_origin);
}
MigrationMessage::ActivateAck {
daemon_origin,
replayed_seq,
} => {
buf.put_u8(MSG_ACTIVATE_ACK);
buf.put_u64_le(*daemon_origin);
buf.put_u64_le(*replayed_seq);
}
}
Ok(buf)
}
pub fn decode(data: &[u8]) -> Result<MigrationMessage, MigrationError> {
if data.is_empty() {
return Err(MigrationError::StateFailed("empty message".into()));
}
let mut cur = std::io::Cursor::new(data);
let msg_type = cur.get_u8();
match msg_type {
MSG_TAKE_SNAPSHOT => {
if cur.remaining() < 16 {
return Err(MigrationError::StateFailed("truncated TakeSnapshot".into()));
}
Ok(MigrationMessage::TakeSnapshot {
daemon_origin: cur.get_u64_le(),
target_node: cur.get_u64_le(),
})
}
MSG_SNAPSHOT_READY => {
if cur.remaining() < 28 {
return Err(MigrationError::StateFailed(
"truncated SnapshotReady".into(),
));
}
let daemon_origin = cur.get_u64_le();
let seq_through = cur.get_u64_le();
let chunk_index = cur.get_u32_le();
let total_chunks = cur.get_u32_le();
let len = cur.get_u32_le() as usize;
if total_chunks == 0 {
return Err(MigrationError::StateFailed(
"SnapshotReady: total_chunks must be >= 1".into(),
));
}
if total_chunks > MAX_TOTAL_CHUNKS {
return Err(MigrationError::StateFailed(format!(
"SnapshotReady: total_chunks {} exceeds MAX_TOTAL_CHUNKS ({})",
total_chunks, MAX_TOTAL_CHUNKS
)));
}
if chunk_index >= total_chunks {
return Err(MigrationError::StateFailed(format!(
"SnapshotReady: chunk_index {} out of range for total_chunks {}",
chunk_index, total_chunks
)));
}
if len > MAX_SNAPSHOT_CHUNK_SIZE {
return Err(MigrationError::StateFailed(format!(
"SnapshotReady: chunk len {} exceeds MAX_SNAPSHOT_CHUNK_SIZE ({})",
len, MAX_SNAPSHOT_CHUNK_SIZE
)));
}
if cur.remaining() < len {
return Err(MigrationError::StateFailed(
"truncated snapshot payload".into(),
));
}
let mut snapshot_bytes = vec![0u8; len];
cur.copy_to_slice(&mut snapshot_bytes);
Ok(MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
})
}
MSG_RESTORE_COMPLETE => {
if cur.remaining() < 16 {
return Err(MigrationError::StateFailed(
"truncated RestoreComplete".into(),
));
}
Ok(MigrationMessage::RestoreComplete {
daemon_origin: cur.get_u64_le(),
restored_seq: cur.get_u64_le(),
})
}
MSG_REPLAY_COMPLETE => {
if cur.remaining() < 48 {
return Err(MigrationError::StateFailed(
"truncated ReplayComplete".into(),
));
}
let daemon_origin = cur.get_u64_le();
let replayed_seq = cur.get_u64_le();
let mut head_bytes = [0u8; 32];
cur.copy_to_slice(&mut head_bytes);
let target_head = CausalLink::from_bytes(&head_bytes).ok_or_else(|| {
MigrationError::StateFailed(
"ReplayComplete: malformed target_head bytes".into(),
)
})?;
Ok(MigrationMessage::ReplayComplete {
daemon_origin,
replayed_seq,
target_head,
})
}
MSG_CUTOVER_NOTIFY => {
if cur.remaining() < 16 {
return Err(MigrationError::StateFailed(
"truncated CutoverNotify".into(),
));
}
Ok(MigrationMessage::CutoverNotify {
daemon_origin: cur.get_u64_le(),
target_node: cur.get_u64_le(),
})
}
MSG_CLEANUP_COMPLETE => {
if cur.remaining() < 8 {
return Err(MigrationError::StateFailed(
"truncated CleanupComplete".into(),
));
}
Ok(MigrationMessage::CleanupComplete {
daemon_origin: cur.get_u64_le(),
})
}
MSG_FAILED => {
if cur.remaining() < 8 + 2 {
return Err(MigrationError::StateFailed(
"truncated MigrationFailed header".into(),
));
}
let daemon_origin = cur.get_u64_le();
let code = cur.get_u16_le();
let reason = decode_failure_reason(&mut cur, code)?;
Ok(MigrationMessage::MigrationFailed {
daemon_origin,
reason,
})
}
MSG_BUFFERED_EVENTS => {
if cur.remaining() < 12 {
return Err(MigrationError::StateFailed(
"truncated BufferedEvents".into(),
));
}
let daemon_origin = cur.get_u64_le();
let count = cur.get_u32_le() as usize;
use crate::adapter::net::state::causal::CAUSAL_LINK_SIZE;
const MIN_EVENT_WIRE_SIZE: usize = CAUSAL_LINK_SIZE + 4 + 8;
const MAX_BUFFERED_EVENTS: usize = 1_000_000;
let max_possible = cur.remaining() / MIN_EVENT_WIRE_SIZE;
if count > max_possible || count > MAX_BUFFERED_EVENTS {
return Err(MigrationError::StateFailed(format!(
"BufferedEvents: count {} exceeds bound (remaining={}, \
min_event_size={}, max_possible={}, hard_cap={})",
count,
cur.remaining(),
MIN_EVENT_WIRE_SIZE,
max_possible,
MAX_BUFFERED_EVENTS,
)));
}
let mut events = Vec::with_capacity(count);
for _ in 0..count {
if cur.remaining() < CAUSAL_LINK_SIZE + 4 {
return Err(MigrationError::StateFailed(
"truncated buffered event".into(),
));
}
let mut link_bytes = [0u8; CAUSAL_LINK_SIZE];
cur.copy_to_slice(&mut link_bytes);
let link = CausalLink::from_bytes(&link_bytes)
.ok_or_else(|| MigrationError::StateFailed("invalid causal link".into()))?;
let payload_len = cur.get_u32_le() as usize;
if payload_len > MAX_SNAPSHOT_CHUNK_SIZE {
return Err(MigrationError::StateFailed(format!(
"buffered event payload {} exceeds per-event cap {}",
payload_len, MAX_SNAPSHOT_CHUNK_SIZE
)));
}
let need = payload_len.saturating_add(8);
if cur.remaining() < need {
return Err(MigrationError::StateFailed(
"truncated event payload".into(),
));
}
let mut payload = vec![0u8; payload_len];
cur.copy_to_slice(&mut payload);
let received_at = cur.get_u64_le();
events.push(CausalEvent {
link,
payload: bytes::Bytes::from(payload),
received_at,
});
}
Ok(MigrationMessage::BufferedEvents {
daemon_origin,
events,
})
}
MSG_ACTIVATE_TARGET => {
if cur.remaining() < 8 {
return Err(MigrationError::StateFailed(
"truncated ActivateTarget".into(),
));
}
Ok(MigrationMessage::ActivateTarget {
daemon_origin: cur.get_u64_le(),
})
}
MSG_ACTIVATE_ACK => {
if cur.remaining() < 16 {
return Err(MigrationError::StateFailed("truncated ActivateAck".into()));
}
Ok(MigrationMessage::ActivateAck {
daemon_origin: cur.get_u64_le(),
replayed_seq: cur.get_u64_le(),
})
}
_ => Err(MigrationError::StateFailed(format!(
"unknown message type: {}",
msg_type
))),
}
}
}
fn decode_failure_reason(
cur: &mut std::io::Cursor<&[u8]>,
code: u16,
) -> Result<MigrationFailureReason, MigrationError> {
match code {
0 => Ok(MigrationFailureReason::NotReady),
1 => Ok(MigrationFailureReason::FactoryNotFound),
2 => Ok(MigrationFailureReason::ComputeNotSupported),
3 => {
let msg = read_u16_string(cur, "StateFailed message")?;
Ok(MigrationFailureReason::StateFailed(msg))
}
4 => Ok(MigrationFailureReason::AlreadyMigrating),
5 => {
let msg = read_u16_string(cur, "IdentityTransportFailed message")?;
Ok(MigrationFailureReason::IdentityTransportFailed(msg))
}
6 => {
if cur.remaining() < 1 {
return Err(MigrationError::StateFailed(
"truncated NotReadyTimeout attempts field".into(),
));
}
Ok(MigrationFailureReason::NotReadyTimeout {
attempts: cur.get_u8(),
})
}
other => Err(MigrationError::StateFailed(format!(
"unknown MigrationFailureReason code {other}",
))),
}
}
/// Reads a UTF-8 string prefixed by a little-endian `u16` byte length.
///
/// `ctx` names the field in error messages.
///
/// # Errors
/// `MigrationError::StateFailed` if the prefix or body is truncated, or the
/// body is not valid UTF-8.
fn read_u16_string(cur: &mut std::io::Cursor<&[u8]>, ctx: &str) -> Result<String, MigrationError> {
    if cur.remaining() < 2 {
        return Err(MigrationError::StateFailed(format!(
            "truncated {ctx} length prefix"
        )));
    }
    let body_len = usize::from(cur.get_u16_le());
    if body_len > cur.remaining() {
        return Err(MigrationError::StateFailed(format!("truncated {ctx} body")));
    }
    let mut body = vec![0u8; body_len];
    cur.copy_to_slice(&mut body);
    String::from_utf8(body).map_err(|err| {
        MigrationError::StateFailed(format!("{ctx} is not valid UTF-8: {err}"))
    })
}
/// Largest payload carried by a single `SnapshotReady` chunk, in bytes.
/// `wire::decode` also uses it as the per-event payload cap for
/// `BufferedEvents`.
pub const MAX_SNAPSHOT_CHUNK_SIZE: usize = 7000;
/// Largest serialized snapshot the chunked wire format can carry:
/// `MAX_TOTAL_CHUNKS` chunks of `MAX_SNAPSHOT_CHUNK_SIZE` bytes each.
///
/// This used to be derived from `u32::MAX` chunks. The decoder and
/// reassembler reject any `total_chunks` above `MAX_TOTAL_CHUNKS`, so that
/// limit was never reachable. Note that `SnapshotReassembler` additionally
/// caps each pending reassembly at `MAX_PENDING_REASSEMBLY_BYTES`, which is
/// far lower.
pub const MAX_SNAPSHOT_SIZE: usize = MAX_TOTAL_CHUNKS as usize * MAX_SNAPSHOT_CHUNK_SIZE;
/// Largest `total_chunks` accepted by `wire::decode` and
/// `SnapshotReassembler::feed`.
pub const MAX_TOTAL_CHUNKS: u32 = 700_000;
/// Cap on the bytes buffered for one in-progress reassembly, keyed by
/// (daemon_origin, seq_through). It is not a global cap across daemons.
pub const MAX_PENDING_REASSEMBLY_BYTES: usize = 64 * 1024 * 1024;
/// Default idle time after which `SnapshotReassembler` evicts a partial
/// reassembly that has received no new chunk.
pub const MAX_PENDING_REASSEMBLY_AGE: Duration = Duration::from_secs(300);
pub fn chunk_snapshot(
daemon_origin: u64,
snapshot_bytes: Vec<u8>,
seq_through: u64,
) -> Result<Vec<MigrationMessage>, MigrationError> {
if snapshot_bytes.len() <= MAX_SNAPSHOT_CHUNK_SIZE {
return Ok(vec![MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index: 0,
total_chunks: 1,
}]);
}
let total_chunks = snapshot_bytes.len().div_ceil(MAX_SNAPSHOT_CHUNK_SIZE);
let total_chunks =
u32::try_from(total_chunks).map_err(|_| MigrationError::SnapshotTooLarge {
size: snapshot_bytes.len(),
max: MAX_SNAPSHOT_SIZE,
})?;
Ok(snapshot_bytes
.chunks(MAX_SNAPSHOT_CHUNK_SIZE)
.enumerate()
.map(|(i, chunk)| MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes: chunk.to_vec(),
seq_through,
chunk_index: i as u32,
total_chunks,
})
.collect())
}
/// Reasons `SnapshotReassembler::feed` rejects a chunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReassemblyError {
    /// The chunk declared `total_chunks == 0`.
    ZeroTotalChunks,
    /// `chunk_index` was not below `total_chunks`.
    ChunkIndexOutOfRange {
        chunk_index: u32,
        total_chunks: u32,
    },
    /// `total_chunks` exceeded `MAX_TOTAL_CHUNKS`.
    TotalChunksTooLarge {
        total_chunks: u32,
    },
    /// The chunk was longer than `MAX_SNAPSHOT_CHUNK_SIZE`. `feed` also
    /// returns this with `len: 0` for an empty chunk.
    ChunkTooLarge {
        len: usize,
    },
    /// `total_chunks` disagreed with the value recorded for the pending
    /// reassembly of the same (daemon_origin, seq_through).
    TotalChunksMismatch {
        got: u32,
        expected: u32,
    },
    /// `seq_through` was older than the newest one already seen for this
    /// daemon.
    StaleSeqThrough {
        got: u64,
        latest: u64,
    },
    /// Accepting the chunk would push this reassembly's buffered bytes past
    /// `cap` (`MAX_PENDING_REASSEMBLY_BYTES`).
    TooManyPendingBytes {
        buffered: usize,
        incoming: usize,
        cap: usize,
    },
}
/// Human-readable descriptions. The cap constants are interpolated from their
/// current values.
impl std::fmt::Display for ReassemblyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ZeroTotalChunks => write!(f, "total_chunks == 0"),
            Self::ChunkIndexOutOfRange {
                chunk_index,
                total_chunks,
            } => write!(
                f,
                "chunk_index {} out of range for total_chunks {}",
                chunk_index, total_chunks
            ),
            Self::TotalChunksTooLarge { total_chunks } => write!(
                f,
                "total_chunks {} exceeds MAX_TOTAL_CHUNKS ({})",
                total_chunks, MAX_TOTAL_CHUNKS
            ),
            // `SnapshotReassembler::feed` reports an empty chunk as
            // `ChunkTooLarge { len: 0 }`. Say what actually happened instead of
            // claiming that 0 bytes exceeds the cap.
            Self::ChunkTooLarge { len: 0 } => write!(f, "chunk is empty (length 0)"),
            Self::ChunkTooLarge { len } => write!(
                f,
                "chunk length {} exceeds MAX_SNAPSHOT_CHUNK_SIZE ({})",
                len, MAX_SNAPSHOT_CHUNK_SIZE
            ),
            Self::TotalChunksMismatch { got, expected } => write!(
                f,
                "total_chunks {} does not match first chunk's declared {}",
                got, expected
            ),
            Self::StaleSeqThrough { got, latest } => write!(
                f,
                "seq_through {} is older than latest accepted {} for this daemon",
                got, latest
            ),
            Self::TooManyPendingBytes {
                buffered,
                incoming,
                cap,
            } => write!(
                f,
                "buffered {} + incoming {} would exceed per-entry cap {}",
                buffered, incoming, cap
            ),
        }
    }
}

/// `ReassemblyError` is a leaf error: it wraps no underlying source.
impl std::error::Error for ReassemblyError {}
/// Reassembles chunked `SnapshotReady` payloads back into whole snapshots.
///
/// Partial snapshots are tracked per (daemon_origin, seq_through). A newer
/// `seq_through` for a daemon discards that daemon's older partial snapshots,
/// and older ones are rejected as stale.
pub struct SnapshotReassembler {
    /// In-progress reassemblies keyed by (daemon_origin, seq_through).
    pending: std::collections::HashMap<(u64, u64), ReassemblyState>,
    /// Highest `seq_through` accepted per daemon origin. Not cleared by
    /// `cancel` or `sweep_stale`.
    latest_seq: std::collections::HashMap<u64, u64>,
    /// Idle time after which `feed` evicts a partial reassembly.
    max_pending_age: Duration,
}
/// Buffered chunks for one (daemon_origin, seq_through) reassembly.
struct ReassemblyState {
    /// Chunk count declared by the first chunk seen for this key.
    total_chunks: u32,
    /// Received chunks by index; BTreeMap keeps them in index order for
    /// concatenation.
    chunks: std::collections::BTreeMap<u32, Vec<u8>>,
    /// Sum of the lengths of the chunks currently stored.
    bytes_buffered: usize,
    /// When a chunk was last stored; used for idle eviction.
    last_progress_at: Instant,
}
impl SnapshotReassembler {
    /// Creates a reassembler that evicts partial snapshots left idle for
    /// `MAX_PENDING_REASSEMBLY_AGE`.
    pub fn new() -> Self {
        Self::with_max_pending_age(MAX_PENDING_REASSEMBLY_AGE)
    }

    /// Creates a reassembler with a custom idle-eviction age.
    pub fn with_max_pending_age(max_pending_age: Duration) -> Self {
        Self {
            pending: Default::default(),
            latest_seq: Default::default(),
            max_pending_age,
        }
    }

    /// Feeds one chunk.
    ///
    /// Returns `Ok(Some(bytes))` when the chunk completes a snapshot (a
    /// single-chunk snapshot completes immediately), and `Ok(None)` while
    /// more chunks are outstanding. A repeated `chunk_index` replaces the
    /// earlier copy. Idle partial reassemblies are swept on every call.
    ///
    /// # Errors
    /// See `ReassemblyError`. Header checks (zero/too many chunks, index range)
    /// run before the idle sweep; length, staleness, chunk-count agreement and
    /// the byte cap are checked after it, in that order.
    pub fn feed(
        &mut self,
        daemon_origin: u64,
        snapshot_bytes: Vec<u8>,
        seq_through: u64,
        chunk_index: u32,
        total_chunks: u32,
    ) -> Result<Option<Vec<u8>>, ReassemblyError> {
        match total_chunks {
            0 => return Err(ReassemblyError::ZeroTotalChunks),
            n if n > MAX_TOTAL_CHUNKS => {
                return Err(ReassemblyError::TotalChunksTooLarge { total_chunks })
            }
            _ if chunk_index >= total_chunks => {
                return Err(ReassemblyError::ChunkIndexOutOfRange {
                    chunk_index,
                    total_chunks,
                })
            }
            _ => {}
        }

        self.sweep_stale(self.max_pending_age);

        // Empty chunks are reported through the same variant, with len 0.
        let chunk_len = snapshot_bytes.len();
        if chunk_len == 0 || chunk_len > MAX_SNAPSHOT_CHUNK_SIZE {
            return Err(ReassemblyError::ChunkTooLarge { len: chunk_len });
        }

        match self.latest_seq.get(&daemon_origin).copied() {
            Some(latest) if seq_through < latest => {
                return Err(ReassemblyError::StaleSeqThrough {
                    got: seq_through,
                    latest,
                });
            }
            Some(latest) if seq_through == latest => {}
            _ => {
                // A newer snapshot supersedes this daemon's older partial ones.
                self.pending
                    .retain(|&(origin, seq), _| origin != daemon_origin || seq == seq_through);
                self.latest_seq.insert(daemon_origin, seq_through);
            }
        }

        let key = (daemon_origin, seq_through);

        if total_chunks == 1 {
            let conflicting = self
                .pending
                .get(&key)
                .map(|existing| existing.total_chunks)
                .filter(|&declared| declared != 1);
            if let Some(expected) = conflicting {
                return Err(ReassemblyError::TotalChunksMismatch { got: 1, expected });
            }
            self.pending.remove(&key);
            return Ok(Some(snapshot_bytes));
        }

        let slot = self.pending.entry(key).or_insert_with(|| ReassemblyState {
            total_chunks,
            chunks: std::collections::BTreeMap::new(),
            bytes_buffered: 0,
            last_progress_at: Instant::now(),
        });
        if slot.total_chunks != total_chunks {
            return Err(ReassemblyError::TotalChunksMismatch {
                got: total_chunks,
                expected: slot.total_chunks,
            });
        }

        // A retransmitted chunk replaces the earlier copy, so the old length
        // stops counting toward the cap.
        let replaced_len = slot.chunks.get(&chunk_index).map_or(0, Vec::len);
        let buffered_after = slot
            .bytes_buffered
            .saturating_sub(replaced_len)
            .saturating_add(chunk_len);
        if buffered_after > MAX_PENDING_REASSEMBLY_BYTES {
            return Err(ReassemblyError::TooManyPendingBytes {
                buffered: slot.bytes_buffered,
                incoming: chunk_len,
                cap: MAX_PENDING_REASSEMBLY_BYTES,
            });
        }
        slot.chunks.insert(chunk_index, snapshot_bytes);
        slot.bytes_buffered = buffered_after;
        slot.last_progress_at = Instant::now();

        if slot.chunks.len() != slot.total_chunks as usize {
            return Ok(None);
        }

        // Every index in 0..total_chunks is present; the BTreeMap yields them
        // in ascending order.
        Ok(self.pending.remove(&key).map(|complete| {
            let mut assembled = Vec::with_capacity(complete.chunks.values().map(Vec::len).sum());
            for part in complete.chunks.into_values() {
                assembled.extend_from_slice(&part);
            }
            assembled
        }))
    }

    /// Drops every partial reassembly for `daemon_origin`. The newest
    /// `seq_through` seen for it is kept, so older chunks stay rejected.
    pub fn cancel(&mut self, daemon_origin: u64) {
        self.pending.retain(|key, _| key.0 != daemon_origin);
    }

    /// Evicts partial reassemblies that have gone at least `max_age` without a
    /// new chunk, returning how many were removed.
    pub fn sweep_stale(&mut self, max_age: Duration) -> usize {
        let now = Instant::now();
        let initial = self.pending.len();
        self.pending.retain(|_, state| {
            match now.checked_duration_since(state.last_progress_at) {
                Some(idle) => idle < max_age,
                None => true,
            }
        });
        initial - self.pending.len()
    }

    /// Number of partial reassemblies currently buffered.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }
}
impl Default for SnapshotReassembler {
fn default() -> Self {
Self::new()
}
}
/// Bookkeeping for one in-flight migration, stored per daemon origin.
struct MigrationRecord {
    /// Phase machine and snapshot metadata for this migration.
    state: MigrationState,
    /// Superposition tracking, anchored on the source chain head and later
    /// given the target's post-replay head.
    superposition: SuperpositionState,
    /// When the record was created; reported as `MigrationListItem::elapsed_ms`.
    started_at: Instant,
}
/// Point-in-time view of one in-flight migration, as returned by
/// `MigrationOrchestrator::list_migrations`.
#[derive(Clone, Debug)]
pub struct MigrationListItem {
    /// Origin hash of the daemon being migrated.
    pub daemon_origin: u64,
    /// Node the daemon is migrating from.
    pub source_node: u64,
    /// Node the daemon is migrating to.
    pub target_node: u64,
    /// Current phase of the migration state machine.
    pub phase: MigrationPhase,
    /// Milliseconds since the orchestrator created the record (saturates at
    /// `u64::MAX`).
    pub elapsed_ms: u64,
    /// `MigrationState::age_in_phase_ms()`; presumably time spent in the
    /// current phase — confirm in the migration module.
    pub age_in_phase_ms: u64,
    /// `MigrationState::snapshot_size_bytes()`; `None` presumably means no
    /// snapshot has been recorded yet — confirm.
    pub snapshot_bytes: Option<u64>,
    /// `MigrationState::retry_count()`.
    pub retries: u32,
}
/// Coordinates daemon migrations involving this node, keeping one record per
/// daemon origin currently being migrated.
pub struct MigrationOrchestrator {
    /// In-flight migrations keyed by daemon origin; each record has its own lock.
    migrations: DashMap<u64, Mutex<MigrationRecord>>,
    /// Used to snapshot a local daemon when no source handler is installed.
    daemon_registry: Arc<DaemonRegistry>,
    /// This node's id; compared with `source_node` to decide whether the
    /// snapshot is captured locally.
    local_node_id: u64,
    /// Optional source-side handler that captures snapshots and is asked to
    /// abort when a migration is aborted.
    source_handler: Option<Arc<MigrationSourceHandler>>,
}
impl MigrationOrchestrator {
pub fn new(daemon_registry: Arc<DaemonRegistry>, local_node_id: u64) -> Self {
Self {
migrations: DashMap::new(),
daemon_registry,
local_node_id,
source_handler: None,
}
}
pub fn with_source_handler(mut self, source_handler: Arc<MigrationSourceHandler>) -> Self {
self.source_handler = Some(source_handler);
self
}
pub fn daemon_registry(&self) -> &Arc<DaemonRegistry> {
&self.daemon_registry
}
pub fn start_migration(
&self,
daemon_origin: u64,
source_node: u64,
target_node: u64,
) -> Result<Vec<MigrationMessage>, MigrationError> {
let entry = match self.migrations.entry(daemon_origin) {
dashmap::mapref::entry::Entry::Occupied(_) => {
return Err(MigrationError::AlreadyMigrating(daemon_origin));
}
dashmap::mapref::entry::Entry::Vacant(entry) => entry,
};
let mut state = MigrationState::new(daemon_origin, source_node, target_node);
if source_node == self.local_node_id {
let snapshot = match &self.source_handler {
Some(handler) => {
handler.start_snapshot(daemon_origin, target_node, self.local_node_id)?
}
None => {
tracing::warn!(
daemon_origin = format_args!("{:#x}", daemon_origin),
"MigrationOrchestrator::start_migration on local source without \
a source_handler installed — events arriving between snapshot \
capture and cutover may be silently lost. \
Production callers wire the handler via \
`MigrationDispatcher::new`. Direct orchestrator construction \
should call `MigrationOrchestrator::with_source_handler`."
);
self.daemon_registry
.snapshot(daemon_origin)
.map_err(|e| MigrationError::StateFailed(e.to_string()))?
.ok_or_else(|| {
MigrationError::StateFailed(
"daemon is stateless or snapshot failed".into(),
)
})?
}
};
let snapshot_bytes = snapshot
.try_to_bytes()
.map_err(|e| MigrationError::StateFailed(e.to_string()))?;
let seq_through = snapshot.through_seq;
state.set_snapshot(snapshot)?;
let source_head = state
.snapshot()
.map(|s| s.chain_link)
.unwrap_or_else(|| CausalLink::genesis(daemon_origin, 0));
let superposition = SuperpositionState::new(daemon_origin, source_head);
entry.insert(Mutex::new(MigrationRecord {
state,
superposition,
started_at: Instant::now(),
}));
chunk_snapshot(daemon_origin, snapshot_bytes, seq_through)
} else {
let source_head = CausalLink::genesis(daemon_origin, 0);
let superposition = SuperpositionState::new(daemon_origin, source_head);
entry.insert(Mutex::new(MigrationRecord {
state,
superposition,
started_at: Instant::now(),
}));
Ok(vec![MigrationMessage::TakeSnapshot {
daemon_origin,
target_node,
}])
}
}
pub fn start_migration_auto(
&self,
daemon_origin: u64,
source_node: u64,
scheduler: &super::Scheduler,
daemon_filter: &crate::adapter::net::behavior::capability::CapabilityFilter,
) -> Result<(u64, Vec<MigrationMessage>), MigrationError> {
let placement = scheduler
.place_migration_v2(daemon_filter, source_node)
.map_err(|_| MigrationError::NoTargetAvailable)?;
let target_node = placement.node_id;
let msgs = self.start_migration(daemon_origin, source_node, target_node)?;
Ok((target_node, msgs))
}
pub fn on_snapshot_ready(
&self,
daemon_origin: u64,
snapshot_bytes: Vec<u8>,
seq_through: u64,
chunk_index: u32,
total_chunks: u32,
) -> Result<MigrationMessage, MigrationError> {
let entry = self
.migrations
.get(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
let mut record = entry.lock();
validate_chunk_header(chunk_index, &snapshot_bytes, daemon_origin)?;
if chunk_index == 0 && total_chunks == 1 {
let snapshot = StateSnapshot::from_bytes(&snapshot_bytes).ok_or_else(|| {
MigrationError::StateFailed("failed to parse snapshot bytes".into())
})?;
if snapshot.through_seq != seq_through {
return Err(MigrationError::StateFailed(format!(
"SnapshotReady: wire seq_through {} disagrees with snapshot.through_seq {}",
seq_through, snapshot.through_seq,
)));
}
if record.state.phase() == MigrationPhase::Snapshot {
record.state.set_snapshot(snapshot)?;
}
} else if chunk_index == 0 {
if record.state.phase() == MigrationPhase::Snapshot {
record.state.force_phase(MigrationPhase::Transfer);
}
}
if chunk_index == 0 {
record.superposition.advance(MigrationPhase::Transfer);
}
Ok(MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
})
}
pub fn on_restore_complete(
&self,
daemon_origin: u64,
_restored_seq: u64,
) -> Result<Option<MigrationMessage>, MigrationError> {
let entry = self
.migrations
.get(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
let mut record = entry.lock();
if record.state.phase() == MigrationPhase::Transfer {
record.state.transfer_complete()?;
}
if record.state.phase() == MigrationPhase::Restore {
record.state.restore_complete()?;
}
record.superposition.advance(MigrationPhase::Replay);
Ok(None)
}
pub fn on_replay_complete(
&self,
daemon_origin: u64,
replayed_seq: u64,
target_head: CausalLink,
) -> Result<MigrationMessage, MigrationError> {
let entry = self
.migrations
.get(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
let mut record = entry.lock();
if target_head.sequence != replayed_seq {
tracing::warn!(
daemon_origin = format_args!("{:#x}", daemon_origin),
head_seq = target_head.sequence,
replayed_seq,
"on_replay_complete: target_head.sequence disagrees with replayed_seq; \
using target_head as-is but the continuity proof anchor may not match \
the post-replay chain head"
);
}
record.superposition.target_replayed(target_head);
if record.state.phase() == MigrationPhase::Replay {
record.state.replay_complete()?;
}
record.superposition.advance(MigrationPhase::Cutover);
record.superposition.collapse();
let target_node = record.state.target_node();
Ok(MigrationMessage::CutoverNotify {
daemon_origin,
target_node,
})
}
pub fn on_cutover_acknowledged(&self, daemon_origin: u64) -> Result<(), MigrationError> {
let entry = self
.migrations
.get(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
let mut record = entry.lock();
if record.state.phase() == MigrationPhase::Cutover {
record.state.cutover_complete()?;
}
record.superposition.advance(MigrationPhase::Complete);
record.superposition.resolve();
Ok(())
}
pub fn on_cleanup_complete(
&self,
daemon_origin: u64,
) -> Result<MigrationMessage, MigrationError> {
let entry = self
.migrations
.get(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
let mut record = entry.lock();
if record.state.phase() == MigrationPhase::Cutover {
record.state.cutover_complete()?;
}
record.superposition.advance(MigrationPhase::Complete);
record.superposition.resolve();
Ok(MigrationMessage::ActivateTarget { daemon_origin })
}
pub fn on_activate_ack(
&self,
daemon_origin: u64,
_replayed_seq: u64,
) -> Result<(), MigrationError> {
self.migrations
.remove(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
Ok(())
}
pub fn abort_migration(
&self,
daemon_origin: u64,
reason: String,
) -> Result<MigrationMessage, MigrationError> {
self.abort_migration_with_reason(daemon_origin, MigrationFailureReason::StateFailed(reason))
}
pub fn abort_migration_with_reason(
&self,
daemon_origin: u64,
reason: MigrationFailureReason,
) -> Result<MigrationMessage, MigrationError> {
self.migrations
.remove(&daemon_origin)
.ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
if let Some(source) = &self.source_handler {
let _ = source.abort(daemon_origin);
}
Ok(MigrationMessage::MigrationFailed {
daemon_origin,
reason,
})
}
pub fn is_migrating(&self, daemon_origin: u64) -> bool {
self.migrations.contains_key(&daemon_origin)
}
pub fn status(&self, daemon_origin: u64) -> Option<MigrationPhase> {
self.migrations
.get(&daemon_origin)
.map(|entry| entry.lock().state.phase())
}
pub fn source_node(&self, daemon_origin: u64) -> Option<u64> {
self.migrations
.get(&daemon_origin)
.map(|entry| entry.lock().state.source_node())
}
pub fn target_node(&self, daemon_origin: u64) -> Option<u64> {
self.migrations
.get(&daemon_origin)
.map(|entry| entry.lock().state.target_node())
}
pub fn superposition_phase(
&self,
daemon_origin: u64,
) -> Option<crate::adapter::net::continuity::superposition::SuperpositionPhase> {
self.migrations
.get(&daemon_origin)
.map(|entry| entry.lock().superposition.phase())
}
pub fn list_migrations(&self) -> Vec<MigrationListItem> {
self.migrations
.iter()
.map(|entry| {
let record = entry.lock();
let elapsed =
u64::try_from(record.started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
MigrationListItem {
daemon_origin: *entry.key(),
source_node: record.state.source_node(),
target_node: record.state.target_node(),
phase: record.state.phase(),
elapsed_ms: elapsed,
age_in_phase_ms: record.state.age_in_phase_ms(),
snapshot_bytes: record.state.snapshot_size_bytes(),
retries: record.state.retry_count(),
}
})
.collect()
}
pub fn active_count(&self) -> usize {
self.migrations.len()
}
}
impl std::fmt::Debug for MigrationOrchestrator {
    /// Compact summary: the number of tracked migrations and the local node id
    /// in hex. Individual migration records are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let active_migrations = self.migrations.len();
        let local_node_id = format!("{:#x}", self.local_node_id);
        let mut summary = f.debug_struct("MigrationOrchestrator");
        summary.field("active_migrations", &active_migrations);
        summary.field("local_node_id", &local_node_id);
        summary.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::capability::CapabilityFilter;
use crate::adapter::net::compute::{
DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
};
use crate::adapter::net::identity::EntityKeypair;
use bytes::{BufMut, Bytes};
fn synth_chunk_header(magic: &[u8; 4], version: u8, entity_bytes: &[u8; 32]) -> Vec<u8> {
let mut buf = Vec::with_capacity(64);
buf.extend_from_slice(magic);
buf.push(version);
buf.extend_from_slice(entity_bytes);
buf.extend_from_slice(&[0u8; 27]);
buf
}
#[test]
fn validate_chunk_header_accepts_well_formed_chunk_0() {
let kp = EntityKeypair::generate();
let bytes = synth_chunk_header(
SNAPSHOT_WIRE_MAGIC,
SNAPSHOT_VERSION,
kp.entity_id().as_bytes(),
);
let res = validate_chunk_header(0, &bytes, kp.origin_hash());
assert!(res.is_ok(), "well-formed chunk 0 must validate: {res:?}");
}
#[test]
fn validate_chunk_header_rejects_wrong_magic() {
let kp = EntityKeypair::generate();
let bytes = synth_chunk_header(b"XXXX", SNAPSHOT_VERSION, kp.entity_id().as_bytes());
let err = validate_chunk_header(0, &bytes, kp.origin_hash()).unwrap_err();
assert!(matches!(err, MigrationError::StateFailed(_)));
}
#[test]
fn validate_chunk_header_rejects_wrong_version() {
let kp = EntityKeypair::generate();
let bytes = synth_chunk_header(SNAPSHOT_WIRE_MAGIC, 0xFE, kp.entity_id().as_bytes());
let err = validate_chunk_header(0, &bytes, kp.origin_hash()).unwrap_err();
assert!(matches!(err, MigrationError::StateFailed(_)));
}
#[test]
fn validate_chunk_header_rejects_wrong_origin_claim() {
let kp_a = EntityKeypair::generate();
let kp_b = EntityKeypair::generate();
let bytes = synth_chunk_header(
SNAPSHOT_WIRE_MAGIC,
SNAPSHOT_VERSION,
kp_a.entity_id().as_bytes(),
);
let err = validate_chunk_header(0, &bytes, kp_b.origin_hash()).unwrap_err();
assert!(matches!(err, MigrationError::StateFailed(_)));
}
#[test]
fn validate_chunk_header_rejects_too_short_chunk_0() {
let bytes = [0u8; 12]; let err = validate_chunk_header(0, &bytes, 0xDEAD_BEEF).unwrap_err();
assert!(matches!(err, MigrationError::StateFailed(_)));
}
#[test]
fn validate_chunk_header_passes_non_chunk_0_unchecked() {
let bytes = [0u8; 8]; for idx in [1u32, 2, 47, u32::MAX] {
let res = validate_chunk_header(idx, &bytes, 0xDEAD_BEEF);
assert!(res.is_ok(), "chunk_index {idx} must pass through");
}
}
/// Minimal `MeshDaemon` for these tests. It counts processed events and
/// snapshots the count as 8 little-endian bytes.
struct CounterDaemon {
    /// Events processed since construction or the last successful restore.
    count: u64,
}
impl CounterDaemon {
    fn new() -> Self {
        CounterDaemon { count: 0 }
    }
    /// The current count in its 8-byte little-endian wire form.
    fn encoded_count(&self) -> Bytes {
        Bytes::copy_from_slice(&self.count.to_le_bytes())
    }
}
impl MeshDaemon for CounterDaemon {
    fn name(&self) -> &str {
        "counter"
    }
    fn requirements(&self) -> CapabilityFilter {
        CapabilityFilter::default()
    }
    /// Bumps the counter and echoes the new value as the sole output.
    fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
        self.count += 1;
        Ok(vec![self.encoded_count()])
    }
    fn snapshot(&self) -> Option<Bytes> {
        Some(self.encoded_count())
    }
    /// Accepts exactly 8 bytes; any other length is a restore failure.
    fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
        let raw: [u8; 8] = match state[..].try_into() {
            Ok(raw) => raw,
            Err(_) => return Err(DaemonError::RestoreFailed("bad state size".into())),
        };
        self.count = u64::from_le_bytes(raw);
        Ok(())
    }
}
/// Builds a registry holding one freshly keyed `CounterDaemon`.
/// Returns the registry and that daemon's origin hash.
fn setup_registry() -> (Arc<DaemonRegistry>, u64) {
    let keypair = EntityKeypair::generate();
    let daemon_origin = keypair.origin_hash();
    let registry = Arc::new(DaemonRegistry::new());
    let host = DaemonHost::new(
        Box::new(CounterDaemon::new()),
        keypair,
        DaemonHostConfig::default(),
    );
    registry.register(host).unwrap();
    (registry, daemon_origin)
}
// Source-text tripwire. It embeds orchestrator.rs at compile time via
// include_str! and checks three things:
// - the production marker comment in start_migration's `None` arm (no
//   source handler wired) still exists exactly once;
// - the 20 lines starting at that marker contain a tracing::warn!;
// - those lines mention source_handler and name how to wire one.
#[test]
fn cr32_unwired_source_handler_must_emit_loud_warn() {
let src = include_str!("orchestrator.rs");
// Assembled from fragments at runtime so the full anchor phrase never
// appears verbatim in this test's own text. Presumably this test lives in
// orchestrator.rs itself, per the occurrence-count message below.
let anchor = format!("{}{}{}", "Surface ", "the unwired-", "source-handler");
// Panics with the message below if the production marker was removed.
let anchor_idx = src.find(&anchor).expect(
"regression: the production unwired-source-handler marker in \
start_migration's None arm is gone — either the fix was reverted or \
the comment was rewritten. If the fix is intentionally being changed, \
update this test.",
);
// A second hit would mean the phrase leaked into non-production text,
// so `find` above might not be pointing at the production site.
let occurrences = src.matches(&anchor).count();
assert_eq!(
occurrences, 1,
"anchor must occur exactly once in orchestrator.rs (production \
site). Got {occurrences} occurrences — the test source likely contains \
a verbatim copy of the anchor, defeating the tripwire."
);
// The 20 lines starting at the anchor are treated as the warn site.
let block: String = src[anchor_idx..]
.lines()
.take(20)
.collect::<Vec<_>>()
.join("\n");
assert!(
block.contains("tracing::warn!"),
"regression: unwired-source-handler path must emit \
tracing::warn!. Block:\n{}",
block
);
assert!(
block.contains("source_handler"),
"regression: warn message must reference source_handler"
);
// Either wiring entry point counts as actionable guidance.
assert!(
block.contains("MigrationDispatcher::new") || block.contains("with_source_handler"),
"regression: warn message must point operators at how to wire \
the handler (`MigrationDispatcher::new` or \
`MigrationOrchestrator::with_source_handler`) so the log line is \
actionable. Block:\n{}",
block
);
}
/// When the local node (0x1111) is also the source, `start_migration`
/// snapshots immediately and the first message is a `SnapshotReady` chunk.
#[test]
fn test_start_migration_local_source() {
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);
    let emitted = orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    assert!(!emitted.is_empty(), "must emit at least one chunk");
    if let MigrationMessage::SnapshotReady { daemon_origin, .. } = &emitted[0] {
        assert_eq!(*daemon_origin, daemon);
    } else {
        panic!("expected SnapshotReady, got {:?}", emitted[0]);
    }
    assert!(orchestrator.is_migrating(daemon));
    assert_eq!(orchestrator.status(daemon), Some(MigrationPhase::Transfer));
}
/// A local-source migration started via the orchestrator must also be
/// recorded by the wired `MigrationSourceHandler`.
#[test]
fn local_source_migration_registers_in_source_handler() {
    let (registry, daemon) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));
    // Before the migration starts, neither side knows about the daemon.
    assert!(!handler.is_migrating(daemon));
    assert!(!orchestrator.is_migrating(daemon));
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    assert!(
        handler.is_migrating(daemon),
        "regression: source_handler must have a record \
         of the local-source migration after `start_migration` \
         returns",
    );
    assert!(orchestrator.is_migrating(daemon));
}
/// Events buffered by the source handler during a local-source migration
/// must come back out of `on_cutover` so they can be forwarded to the target.
#[test]
fn local_source_cutover_drains_buffered_events_through_source_handler() {
    let (registry, daemon) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    // Buffer two events (sequences 1 and 2) behind the snapshot.
    let pending = (1..=2u64).map(|sequence| CausalEvent {
        link: CausalLink {
            origin_hash: daemon,
            horizon_encoded: 0,
            sequence,
            parent_hash: 0,
        },
        payload: Bytes::from_static(b"buffered"),
        received_at: 0,
    });
    for event in pending {
        assert!(handler.buffer_event(daemon, event).unwrap());
    }
    let drained = handler
        .on_cutover(daemon)
        .expect("post-fix on_cutover must find the local-source migration record");
    assert_eq!(
        drained.len(),
        2,
        "cutover must drain the buffered events for forwarding to target — \
         pre-fix this returned `DaemonNotFound` for local-source migrations \
         and the buffered events were silently lost",
    );
}
/// After a local-source migration starts, the source handler must accept
/// (buffer) events for that daemon and hand them back on request.
#[test]
fn local_source_migration_enables_source_handler_buffering() {
    let (registry, daemon) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    let probe = CausalEvent {
        link: CausalLink {
            origin_hash: daemon,
            horizon_encoded: 0,
            sequence: 1,
            parent_hash: 0,
        },
        payload: Bytes::from_static(b"post-snapshot event"),
        received_at: 0,
    };
    let was_buffered = handler.buffer_event(daemon, probe).unwrap();
    assert!(
        was_buffered,
        "fix must enable source-handler buffering for \
         local-source migrations — pre-fix `buffer_event` returned \
         `Ok(false)` because the migration was never registered",
    );
    let drained = handler.take_buffered_events(daemon).unwrap();
    assert_eq!(
        drained.len(),
        1,
        "buffered event must be drainable through the source handler",
    );
}
/// When the local node (0x3333) is not the source (0x1111), the only thing
/// it can do is ask the source for a snapshot.
#[test]
fn test_start_migration_remote_source() {
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x3333);
    let outbound = orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    assert_eq!(
        outbound.len(),
        1,
        "remote-source path emits exactly one TakeSnapshot"
    );
    let MigrationMessage::TakeSnapshot {
        daemon_origin,
        target_node,
    } = &outbound[0]
    else {
        panic!("expected TakeSnapshot, got {:?}", outbound[0]);
    };
    assert_eq!(*daemon_origin, daemon);
    assert_eq!(*target_node, 0x2222);
    assert_eq!(orchestrator.status(daemon), Some(MigrationPhase::Snapshot));
}
/// A second migration of a daemon that is already migrating is refused,
/// even if it names a different target.
#[test]
fn test_duplicate_migration_rejected() {
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    let second_attempt = orchestrator.start_migration(daemon, 0x1111, 0x3333);
    assert_eq!(
        second_attempt.unwrap_err(),
        MigrationError::AlreadyMigrating(daemon)
    );
}
/// With an empty capability index the scheduler has no candidate. Auto
/// placement must then report `NoTargetAvailable` rather than a fake node id.
#[test]
fn start_migration_auto_returns_no_target_available_when_scheduler_finds_nothing() {
    use crate::adapter::net::behavior::capability::{CapabilityIndex, CapabilitySet};
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);
    let empty_index = Arc::new(CapabilityIndex::new());
    let scheduler =
        super::super::Scheduler::new(empty_index, 0x1111, CapabilitySet::default());
    let filter = CapabilityFilter::default();
    let outcome = orchestrator.start_migration_auto(daemon, 0x1111, &scheduler, &filter);
    assert_eq!(
        outcome.unwrap_err(),
        MigrationError::NoTargetAvailable,
        "auto-placement with no candidates must surface as \
         NoTargetAvailable, not TargetUnavailable(0). The 0 \
         was a fake node id that pre-fix appeared in operator \
         error logs as `target node 0x0 unavailable`."
    );
}
/// Aborting an in-flight migration yields a `MigrationFailed` message whose
/// `StateFailed` reason carries the caller's text verbatim, and clears the
/// orchestrator's record.
#[test]
fn test_abort_migration() {
    let (reg, origin) = setup_registry();
    let orch = MigrationOrchestrator::new(reg, 0x1111);
    orch.start_migration(origin, 0x1111, 0x2222).unwrap();
    assert!(orch.is_migrating(origin));
    match orch.abort_migration(origin, "test abort".into()).unwrap() {
        MigrationMessage::MigrationFailed {
            reason: MigrationFailureReason::StateFailed(text),
            ..
        } => assert_eq!(text, "test abort"),
        // Print whatever arrived so a failure is diagnosable from the log,
        // matching the other tests in this module.
        other => panic!("expected MigrationFailed with a StateFailed reason, got {other:?}"),
    }
    assert!(!orch.is_migrating(origin));
}
/// Aborting through the orchestrator must also clear the source handler's
/// mirror record, so the daemon can be migrated again afterwards.
#[test]
fn abort_migration_propagates_to_source_handler() {
    let (registry, daemon) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    assert!(
        orchestrator.is_migrating(daemon),
        "orchestrator records the migration"
    );
    assert!(
        handler.is_migrating(daemon),
        "source handler also records the migration via the orchestrator",
    );
    orchestrator
        .abort_migration(daemon, "test abort".into())
        .unwrap();
    assert!(
        !orchestrator.is_migrating(daemon),
        "orchestrator must clear its record"
    );
    assert!(
        !handler.is_migrating(daemon),
        "source handler must also clear its mirror entry on abort",
    );
    // With both records gone, a fresh migration toward another target is accepted.
    orchestrator.start_migration(daemon, 0x1111, 0x3333).unwrap();
}
/// `on_restore_complete` on the orchestrator must never produce a
/// `BufferedEvents` message. Buffered events belong to the source handler.
#[test]
fn on_restore_complete_ships_no_buffered_events() {
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x3333);
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    // Force the record into the Restore phase directly; the guard and map
    // reference are both temporaries released at the end of this statement.
    orchestrator
        .migrations
        .get(&daemon)
        .unwrap()
        .lock()
        .state
        .force_phase(MigrationPhase::Restore);
    let outcome = orchestrator.on_restore_complete(daemon, 0).unwrap();
    assert!(
        outcome.is_none(),
        "on_restore_complete must never emit BufferedEvents — \
         that buffer is exclusively a source-handler surface"
    );
}
/// `TakeSnapshot` survives encode → decode with both fields intact.
#[test]
fn test_wire_roundtrip_take_snapshot() {
    let msg = MigrationMessage::TakeSnapshot {
        daemon_origin: 0xAAAA,
        target_node: 0x2222,
    };
    let encoded = wire::encode(&msg).unwrap();
    match wire::decode(&encoded).unwrap() {
        MigrationMessage::TakeSnapshot {
            daemon_origin,
            target_node,
        } => {
            assert_eq!(daemon_origin, 0xAAAA);
            assert_eq!(target_node, 0x2222);
        }
        // Show the variant that actually decoded instead of hiding it.
        other => panic!("expected TakeSnapshot, got {other:?}"),
    }
}
/// A single-chunk `SnapshotReady` survives encode → decode with every field
/// intact.
#[test]
fn test_wire_roundtrip_snapshot_ready() {
    let msg = MigrationMessage::SnapshotReady {
        daemon_origin: 0xBBBB,
        snapshot_bytes: vec![1, 2, 3, 4, 5],
        seq_through: 42,
        chunk_index: 0,
        total_chunks: 1,
    };
    let encoded = wire::encode(&msg).unwrap();
    match wire::decode(&encoded).unwrap() {
        MigrationMessage::SnapshotReady {
            daemon_origin,
            snapshot_bytes,
            seq_through,
            chunk_index,
            total_chunks,
        } => {
            assert_eq!(daemon_origin, 0xBBBB);
            assert_eq!(snapshot_bytes, vec![1, 2, 3, 4, 5]);
            assert_eq!(seq_through, 42);
            assert_eq!(chunk_index, 0);
            assert_eq!(total_chunks, 1);
        }
        // Show the variant that actually decoded instead of hiding it.
        other => panic!("expected SnapshotReady, got {other:?}"),
    }
}
/// A snapshot smaller than one chunk becomes a single chunk (index 0 of 1).
/// That chunk carries the bytes unchanged plus the origin and seq_through
/// it was created with.
#[test]
fn test_chunk_snapshot_small() {
    let chunks = chunk_snapshot(0xAAAA, vec![1, 2, 3], 10).unwrap();
    assert_eq!(chunks.len(), 1);
    match &chunks[0] {
        MigrationMessage::SnapshotReady {
            daemon_origin,
            snapshot_bytes,
            seq_through,
            chunk_index,
            total_chunks,
        } => {
            assert_eq!(*daemon_origin, 0xAAAA);
            assert_eq!(*seq_through, 10);
            assert_eq!(*chunk_index, 0);
            assert_eq!(*total_chunks, 1);
            assert_eq!(snapshot_bytes, &[1, 2, 3]);
        }
        other => panic!("expected SnapshotReady, got {other:?}"),
    }
}
/// Three full chunks plus a 100-byte tail split into exactly four
/// `SnapshotReady` messages, which reassemble back to the original bytes.
#[test]
fn test_chunk_snapshot_large() {
    let big = vec![0xABu8; MAX_SNAPSHOT_CHUNK_SIZE * 3 + 100];
    let total_len = big.len();
    let chunks = chunk_snapshot(0xBBBB, big, 42).unwrap();
    assert_eq!(chunks.len(), 4);
    for (i, chunk) in chunks.iter().enumerate() {
        match chunk {
            MigrationMessage::SnapshotReady {
                chunk_index,
                total_chunks,
                daemon_origin,
                seq_through,
                ..
            } => {
                assert_eq!(*chunk_index, i as u32);
                assert_eq!(*total_chunks, 4);
                assert_eq!(*daemon_origin, 0xBBBB);
                assert_eq!(*seq_through, 42);
            }
            other => panic!("expected SnapshotReady, got {other:?}"),
        }
    }
    let mut reassembler = SnapshotReassembler::new();
    for chunk in chunks {
        // Every element was verified as SnapshotReady above. Fail loudly
        // rather than silently skipping a chunk and weakening the check.
        let MigrationMessage::SnapshotReady {
            daemon_origin,
            snapshot_bytes,
            seq_through,
            chunk_index,
            total_chunks,
        } = chunk
        else {
            unreachable!("chunk_snapshot output was verified to be all SnapshotReady above");
        };
        let result = reassembler
            .feed(
                daemon_origin,
                snapshot_bytes,
                seq_through,
                chunk_index,
                total_chunks,
            )
            .expect("legitimate chunks must not be rejected");
        if chunk_index < total_chunks - 1 {
            assert!(result.is_none());
        } else {
            let full = result.expect("last chunk should complete reassembly");
            assert_eq!(full.len(), total_len);
            assert!(full.iter().all(|&b| b == 0xAB));
        }
    }
}
/// `cancel` drops a partially received snapshot for the given daemon.
#[test]
fn test_reassembler_cancel() {
    let mut asm = SnapshotReassembler::new();
    // One of three chunks received, so the entry is in flight.
    asm.feed(0xAAAA, vec![1, 2], 10, 0, 3).unwrap();
    assert_eq!(asm.pending_count(), 1);
    asm.cancel(0xAAAA);
    assert_eq!(asm.pending_count(), 0);
}
/// A chunk index past `total_chunks` is refused without disturbing the
/// entry that is already in flight.
#[test]
fn test_regression_reassembler_rejects_chunk_index_out_of_range() {
    let mut asm = SnapshotReassembler::new();
    let first = asm.feed(0xAAAA, vec![1; 10], 1, 0, 3);
    assert!(first.is_ok(), "in-range chunk must be accepted: {:?}", first);
    // Index 5 cannot exist in a 3-chunk snapshot.
    let forged = asm.feed(0xAAAA, vec![2; 10], 1, 5, 3);
    let rejected = matches!(
        forged,
        Err(ReassemblyError::ChunkIndexOutOfRange {
            chunk_index: 5,
            total_chunks: 3,
        })
    );
    assert!(
        rejected,
        "chunk_index=5 with total_chunks=3 must be rejected, got {:?}",
        forged
    );
    assert_eq!(
        asm.pending_count(),
        1,
        "state must stay in-flight after rejected chunk"
    );
}
/// `total_chunks == 0` is refused and creates no pending entry.
#[test]
fn test_regression_reassembler_rejects_zero_total_chunks() {
    let mut asm = SnapshotReassembler::new();
    assert!(matches!(
        asm.feed(0xAAAA, vec![1; 10], 1, 0, 0),
        Err(ReassemblyError::ZeroTotalChunks)
    ));
    assert_eq!(asm.pending_count(), 0);
}
/// An absurd `total_chunks` (u32::MAX) is refused up front and creates no
/// pending entry.
#[test]
fn test_regression_reassembler_caps_total_chunks() {
    let mut asm = SnapshotReassembler::new();
    let outcome = asm.feed(0xAAAA, vec![1; 10], 1, 0, u32::MAX);
    let capped = matches!(
        outcome,
        Err(ReassemblyError::TotalChunksTooLarge {
            total_chunks: u32::MAX
        })
    );
    assert!(capped);
    assert_eq!(asm.pending_count(), 0);
}
/// A single chunk one byte over `MAX_SNAPSHOT_CHUNK_SIZE` is refused.
#[test]
fn test_regression_reassembler_rejects_oversized_chunk() {
    let mut asm = SnapshotReassembler::new();
    let too_big = vec![0u8; MAX_SNAPSHOT_CHUNK_SIZE + 1];
    let outcome = asm.feed(0xAAAA, too_big, 1, 0, 3);
    assert!(
        matches!(outcome, Err(ReassemblyError::ChunkTooLarge { .. })),
        "got {:?}",
        outcome
    );
}
/// A later chunk that disagrees with the in-flight `total_chunks` is
/// refused, and the existing entry stays in flight.
#[test]
fn test_regression_reassembler_rejects_total_chunks_mismatch() {
    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
    let outcome = asm.feed(0xAAAA, vec![2; 10], 1, 1, 100);
    let mismatch = matches!(
        outcome,
        Err(ReassemblyError::TotalChunksMismatch {
            got: 100,
            expected: 3,
        })
    );
    assert!(mismatch, "got {:?}", outcome);
    assert_eq!(asm.pending_count(), 1);
}
/// An empty chunk is refused (reported as `ChunkTooLarge { len: 0 }`) and
/// creates no pending entry.
#[test]
fn reassembler_refuses_zero_byte_chunk() {
    let mut asm = SnapshotReassembler::new();
    let outcome = asm.feed(0xAAAA, Vec::new(), 1, 0, 3);
    let refused = matches!(outcome, Err(ReassemblyError::ChunkTooLarge { len: 0 }));
    assert!(refused, "got {:?}", outcome);
    assert_eq!(asm.pending_count(), 0);
}
/// A single-chunk snapshot must not replace an in-flight multi-chunk
/// reassembly with the same daemon and seq_through.
#[test]
fn fast_path_rejects_single_chunk_after_multi_chunk_state() {
    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 7, 0, 3).unwrap();
    // Same daemon and seq_through, now claiming the whole snapshot is one chunk.
    let substitute = asm.feed(0xAAAA, vec![2; 10], 7, 0, 1);
    let refused = matches!(
        substitute,
        Err(ReassemblyError::TotalChunksMismatch {
            got: 1,
            expected: 3,
        })
    );
    assert!(
        refused,
        "fast path must refuse substitution; got {:?}",
        substitute
    );
    assert_eq!(asm.pending_count(), 1);
}
/// Per daemon, only the newest seq_through stays in flight. Chunks for an
/// older seq_through are then refused as stale.
#[test]
fn test_regression_reassembler_evicts_older_seq_per_daemon() {
    let mut asm = SnapshotReassembler::new();
    for seq in 10..=12 {
        asm.feed(0xAAAA, vec![1; 10], seq, 0, 3).unwrap();
    }
    assert_eq!(
        asm.pending_count(),
        1,
        "only the newest seq_through for a daemon should remain in flight"
    );
    let stale = asm.feed(0xAAAA, vec![1; 10], 5, 0, 3);
    let is_stale = matches!(
        stale,
        Err(ReassemblyError::StaleSeqThrough { got: 5, latest: 12 })
    );
    assert!(
        is_stale,
        "stale seq_through must be rejected, got {:?}",
        stale
    );
    assert_eq!(asm.pending_count(), 1);
}
/// Once an entry holds `MAX_PENDING_REASSEMBLY_BYTES` worth of full chunks,
/// a new chunk index is refused. Re-sending an index already buffered is
/// still accepted.
#[test]
fn reassembler_refuses_chunk_that_overflows_pending_byte_cap() {
    let mut asm = SnapshotReassembler::new();
    let full_chunk = vec![0xCCu8; MAX_SNAPSHOT_CHUNK_SIZE];
    let fill_count = (MAX_PENDING_REASSEMBLY_BYTES / MAX_SNAPSHOT_CHUNK_SIZE) as u32;
    // Two extra slots so the snapshot is never complete after the fill.
    let total = fill_count + 2;
    for idx in 0..fill_count {
        asm.feed(0xAAAA, full_chunk.clone(), 1, idx, total).unwrap();
    }
    let overflow = asm.feed(0xAAAA, full_chunk.clone(), 1, fill_count, total);
    assert!(
        matches!(overflow, Err(ReassemblyError::TooManyPendingBytes { .. })),
        "chunk that would overflow the per-entry cap must be refused, got {:?}",
        overflow,
    );
    let resend = asm.feed(0xAAAA, full_chunk, 1, 0, total);
    assert!(
        resend.is_ok(),
        "re-sending an already-buffered chunk index must succeed, got {:?}",
        resend
    );
}
/// `sweep_stale` evicts an entry that has received nothing for longer than
/// `max_age`, and reports how many it evicted.
#[test]
fn reassembler_sweep_stale_drops_quiet_entries() {
    let quiet_for = Duration::from_millis(20);
    let max_age = Duration::from_millis(10);
    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
    assert_eq!(asm.pending_count(), 1);
    std::thread::sleep(quiet_for);
    assert_eq!(asm.sweep_stale(max_age), 1, "stale entry must be evicted");
    assert_eq!(asm.pending_count(), 0);
}
/// An entry that received a chunk within `max_age` survives a sweep, even
/// though its first chunk is older than `max_age`.
///
/// Timing: first chunk at t0, second chunk at t0 + QUIET_GAP, then the
/// sweep runs with MAX_AGE.
/// - QUIET_GAP > MAX_AGE, so a sweep that measured age from the first
///   chunk would evict the entry. That keeps the test meaningful.
/// - MAX_AGE is also the scheduler slack allowed between the second feed
///   and the sweep.
/// The previous 20 ms / 15 ms pair allowed only 15 ms of slack, which a
/// loaded CI runner can exceed.
#[test]
fn reassembler_sweep_keeps_progressing_entries() {
    const QUIET_GAP: Duration = Duration::from_millis(60);
    const MAX_AGE: Duration = Duration::from_millis(50);
    let mut reassembler = SnapshotReassembler::new();
    reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
    std::thread::sleep(QUIET_GAP);
    reassembler.feed(0xAAAA, vec![1; 10], 1, 1, 3).unwrap();
    let evicted = reassembler.sweep_stale(MAX_AGE);
    assert_eq!(
        evicted, 0,
        "entry that received a chunk within max_age must survive"
    );
    assert_eq!(reassembler.pending_count(), 1);
}
/// With a configured max pending age, a later `feed` for an unrelated
/// daemon also drops the quiet entry. Only the fresh entry remains.
#[test]
fn reassembler_opportunistic_sweep_in_feed_drops_quiet_entries() {
    let max_age = Duration::from_millis(10);
    let mut asm = SnapshotReassembler::with_max_pending_age(max_age);
    asm.feed(0xBAD, vec![0xFF; 10], 1, 0, 3).unwrap();
    assert_eq!(asm.pending_count(), 1);
    std::thread::sleep(Duration::from_millis(25));
    asm.feed(0xC0DE, vec![1; 10], 5, 0, 3).unwrap();
    assert_eq!(asm.pending_count(), 1);
}
/// Sweeping a stale entry must not forget the daemon's latest seq_through.
/// A later replay of an older seq_through is still refused.
#[test]
fn reassembler_sweep_stale_preserves_latest_seq() {
    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 100, 0, 3).unwrap();
    std::thread::sleep(Duration::from_millis(20));
    assert_eq!(asm.sweep_stale(Duration::from_millis(10)), 1);
    let replay = asm.feed(0xAAAA, vec![1; 10], 50, 0, 3);
    let refused = matches!(
        replay,
        Err(ReassemblyError::StaleSeqThrough {
            got: 50,
            latest: 100,
        })
    );
    assert!(
        refused,
        "post-sweep replay of an older seq_through must still be rejected, got {:?}",
        replay,
    );
}
/// An entry stuck at the pending-byte cap (it can never complete) is still
/// released by `sweep_stale` once it goes quiet.
#[test]
fn reassembler_sweep_releases_buffer_parked_at_byte_cap() {
    let mut asm = SnapshotReassembler::new();
    let full_chunk = vec![0xCCu8; MAX_SNAPSHOT_CHUNK_SIZE];
    let fill_count = (MAX_PENDING_REASSEMBLY_BYTES / MAX_SNAPSHOT_CHUNK_SIZE) as u32;
    let total = fill_count + 2;
    for idx in 0..fill_count {
        asm.feed(0xAAAA, full_chunk.clone(), 1, idx, total).unwrap();
    }
    assert_eq!(asm.pending_count(), 1);
    std::thread::sleep(Duration::from_millis(20));
    let released = asm.sweep_stale(Duration::from_millis(10));
    assert_eq!(released, 1, "parked-at-cap entry must be released by sweep");
    assert_eq!(asm.pending_count(), 0);
}
/// In-flight reassemblies for different daemons do not evict each other.
#[test]
fn test_regression_reassembler_distinct_daemons_coexist() {
    let mut asm = SnapshotReassembler::new();
    for (daemon, fill, seq) in [(0x1111, 1, 1), (0x2222, 2, 7), (0x3333, 3, 9)] {
        asm.feed(daemon, vec![fill; 10], seq, 0, 3).unwrap();
    }
    assert_eq!(asm.pending_count(), 3);
}
/// The wire decoder refuses a hand-built `SnapshotReady` frame whose
/// total_chunks field is 0.
#[test]
fn test_regression_wire_decode_rejects_zero_total_chunks() {
    use bytes::BufMut;
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_SNAPSHOT_READY);
    frame.put_u64_le(0xAAAA); // daemon_origin
    frame.put_u64_le(1); // second u64 field (assumed seq_through — confirm against wire::encode)
    frame.put_u32_le(0); // chunk_index
    frame.put_u32_le(0); // total_chunks: the invalid value under test
    frame.put_u32_le(0); // trailing u32 (assumed snapshot byte length)
    let rejection = wire::decode(&frame).expect_err("total_chunks=0 must be rejected");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("total_chunks"),
        "error must mention total_chunks, got {:?}",
        rendered
    );
}
/// The wire decoder refuses a `SnapshotReady` frame whose chunk_index is
/// not below total_chunks.
#[test]
fn test_regression_wire_decode_rejects_chunk_index_out_of_range() {
    use bytes::BufMut;
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_SNAPSHOT_READY);
    frame.put_u64_le(0xAAAA); // daemon_origin
    frame.put_u64_le(1); // second u64 field (assumed seq_through)
    frame.put_u32_le(5); // chunk_index
    frame.put_u32_le(3); // total_chunks, so index 5 is out of range
    frame.put_u32_le(0); // trailing u32 (assumed snapshot byte length)
    let rejection =
        wire::decode(&frame).expect_err("chunk_index >= total_chunks must be rejected");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("chunk_index"),
        "error must mention chunk_index, got {:?}",
        rendered
    );
}
/// The wire decoder refuses a `SnapshotReady` frame whose total_chunks
/// exceeds the `MAX_TOTAL_CHUNKS` bound.
#[test]
fn test_regression_wire_decode_rejects_total_chunks_overflow() {
    use bytes::BufMut;
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_SNAPSHOT_READY);
    frame.put_u64_le(0xAAAA); // daemon_origin
    frame.put_u64_le(1); // second u64 field (assumed seq_through)
    frame.put_u32_le(0); // chunk_index
    frame.put_u32_le(u32::MAX); // total_chunks, far past any sane cap
    frame.put_u32_le(0); // trailing u32 (assumed snapshot byte length)
    let rejection =
        wire::decode(&frame).expect_err("total_chunks > MAX_TOTAL_CHUNKS must be rejected");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("MAX_TOTAL_CHUNKS"),
        "error must mention MAX_TOTAL_CHUNKS, got {:?}",
        rendered
    );
}
/// Out-of-range chunk indices interleaved with a real 4-chunk snapshot are
/// all refused. They must not count toward completion or corrupt the
/// reassembled bytes.
#[test]
fn test_regression_reassembler_end_to_end_forged_chunk_cannot_complete() {
    let mut asm = SnapshotReassembler::new();
    let head = asm.feed(0xDEAD, vec![0xA0; 10], 1, 0, 4).unwrap();
    assert!(head.is_none());
    for bad_idx in [4, 5, 7, 999] {
        let forged = asm.feed(0xDEAD, vec![0xFF; 10], 1, bad_idx, 4);
        assert!(
            matches!(forged, Err(ReassemblyError::ChunkIndexOutOfRange { .. })),
            "index {} must be rejected, got {:?}",
            bad_idx,
            forged
        );
    }
    // The two middle chunks keep the snapshot incomplete.
    for (idx, fill) in [(1, 0xA1), (2, 0xA2)] {
        assert!(asm.feed(0xDEAD, vec![fill; 10], 1, idx, 4).unwrap().is_none());
    }
    let full = asm
        .feed(0xDEAD, vec![0xA3; 10], 1, 3, 4)
        .unwrap()
        .expect("all four real chunks received — reassembly must complete");
    assert_eq!(full.len(), 40);
    // Each 10-byte slice must hold only its own chunk's fill byte.
    for (segment, expected) in full.chunks(10).zip([0xA0u8, 0xA1, 0xA2, 0xA3]) {
        assert!(segment.iter().all(|&b| b == expected));
    }
}
/// A mid-stream chunk (index 2 of 5) survives encode → decode with every
/// field intact, not just its chunk coordinates.
#[test]
fn test_wire_roundtrip_chunked_snapshot() {
    let msg = MigrationMessage::SnapshotReady {
        daemon_origin: 0xCCCC,
        snapshot_bytes: vec![42; 100],
        seq_through: 99,
        chunk_index: 2,
        total_chunks: 5,
    };
    let encoded = wire::encode(&msg).unwrap();
    match wire::decode(&encoded).unwrap() {
        MigrationMessage::SnapshotReady {
            daemon_origin,
            snapshot_bytes,
            seq_through,
            chunk_index,
            total_chunks,
        } => {
            assert_eq!(daemon_origin, 0xCCCC);
            assert_eq!(snapshot_bytes, vec![42; 100]);
            assert_eq!(seq_through, 99);
            assert_eq!(chunk_index, 2);
            assert_eq!(total_chunks, 5);
        }
        other => panic!("expected SnapshotReady, got {other:?}"),
    }
}
/// Every `MigrationFailureReason` variant listed survives encode → decode
/// inside a `MigrationFailed` message.
#[test]
fn test_wire_roundtrip_failed() {
    for reason in [
        MigrationFailureReason::NotReady,
        MigrationFailureReason::FactoryNotFound,
        MigrationFailureReason::ComputeNotSupported,
        MigrationFailureReason::StateFailed("something broke".into()),
        MigrationFailureReason::AlreadyMigrating,
        MigrationFailureReason::IdentityTransportFailed("seal failed".into()),
        MigrationFailureReason::NotReadyTimeout { attempts: 5 },
    ] {
        let msg = MigrationMessage::MigrationFailed {
            daemon_origin: 0xCCCC,
            reason: reason.clone(),
        };
        let encoded = wire::encode(&msg).unwrap();
        match wire::decode(&encoded).unwrap() {
            MigrationMessage::MigrationFailed {
                daemon_origin,
                reason: r,
            } => {
                assert_eq!(daemon_origin, 0xCCCC);
                assert_eq!(r, reason);
            }
            // Include the input reason so the failing variant is identifiable.
            other => panic!("expected MigrationFailed for {reason:?}, got {other:?}"),
        }
    }
}
/// `BufferedEvents` survives encode → decode with payloads, receive
/// timestamps and the explicitly constructed causal-link fields intact.
#[test]
fn test_wire_roundtrip_buffered_events() {
    let events = vec![
        CausalEvent {
            link: CausalLink::genesis(0xAAAA, 0),
            payload: Bytes::from_static(b"event1"),
            received_at: 100,
        },
        CausalEvent {
            link: CausalLink {
                origin_hash: 0xAAAA,
                horizon_encoded: 1,
                sequence: 1,
                parent_hash: 12345,
            },
            payload: Bytes::from_static(b"event2"),
            received_at: 200,
        },
    ];
    let msg = MigrationMessage::BufferedEvents {
        daemon_origin: 0xAAAA,
        events,
    };
    let encoded = wire::encode(&msg).unwrap();
    match wire::decode(&encoded).unwrap() {
        MigrationMessage::BufferedEvents {
            daemon_origin,
            events,
        } => {
            assert_eq!(daemon_origin, 0xAAAA);
            assert_eq!(events.len(), 2);
            assert_eq!(events[0].payload, Bytes::from_static(b"event1"));
            assert_eq!(events[0].received_at, 100);
            // All four link fields of event 2 were set explicitly, so all
            // four must round-trip.
            assert_eq!(events[1].link.origin_hash, 0xAAAA);
            assert_eq!(events[1].link.horizon_encoded, 1);
            assert_eq!(events[1].link.sequence, 1);
            assert_eq!(events[1].link.parent_hash, 12345);
            assert_eq!(events[1].payload, Bytes::from_static(b"event2"));
            assert_eq!(events[1].received_at, 200);
        }
        other => panic!("expected BufferedEvents, got {other:?}"),
    }
}
/// A failure-reason string one byte longer than `u16::MAX` cannot be
/// encoded, and `encode` must report that as an error.
#[test]
fn test_wire_encode_rejects_oversized_failure_reason() {
    let too_long: String = std::iter::repeat('x')
        .take(usize::from(u16::MAX) + 1)
        .collect();
    let msg = MigrationMessage::MigrationFailed {
        daemon_origin: 0xDEAD,
        reason: MigrationFailureReason::StateFailed(too_long),
    };
    let outcome = wire::encode(&msg);
    let refused = matches!(outcome, Err(MigrationError::StateFailed(_)));
    assert!(
        refused,
        "encode of oversized reason must error, got {:?}",
        outcome
    );
}
/// A `MigrationFailed` frame whose reason code is 0xFFFF must be rejected
/// as an unknown code.
#[test]
fn test_wire_rejects_unknown_failure_code() {
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_FAILED);
    frame.put_u64_le(0xBEEF); // daemon_origin
    frame.put_u16_le(0xFFFF); // failure-reason code
    let err = wire::decode(&frame).expect_err("unknown code must reject");
    let MigrationError::StateFailed(msg) = &err else {
        panic!("expected StateFailed, got {err:?}");
    };
    assert!(msg.contains("unknown MigrationFailureReason code"));
}
/// `list_migrations` is empty until a migration starts. It then reports
/// that one migration's origin, endpoints and zero retries.
#[test]
fn test_list_migrations() {
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);
    assert!(orchestrator.list_migrations().is_empty());
    orchestrator.start_migration(daemon, 0x1111, 0x2222).unwrap();
    let listed = orchestrator.list_migrations();
    assert_eq!(listed.len(), 1);
    let only = &listed[0];
    assert_eq!(only.daemon_origin, daemon);
    assert_eq!(only.source_node, 0x1111);
    assert_eq!(only.target_node, 0x2222);
    assert_eq!(only.retries, 0);
}
}