#![cfg(feature = "net")]
use std::sync::Arc;
use bytes::Bytes;
use net::adapter::net::behavior::capability::{
CapabilityAnnouncement, CapabilityFilter, CapabilitySet,
};
use net::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
use net::adapter::net::behavior::loadbalance::{RequestContext, Strategy};
use net::adapter::net::compute::migration_target::RestoreContext;
use net::adapter::net::compute::{
chunk_snapshot, DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, ForkGroup,
ForkGroupConfig, GroupCoordinator, GroupError, GroupHealth, MemberInfo, MemberRole, MeshDaemon,
MigrationMessage, MigrationOrchestrator, MigrationPhase, MigrationSourceHandler,
MigrationTargetHandler, ReplicaGroup, ReplicaGroupConfig, Scheduler, SnapshotReassembler,
StandbyGroup, StandbyGroupConfig, MAX_SNAPSHOT_CHUNK_SIZE,
};
use net::adapter::net::continuity::discontinuity::fork_sentinel;
use net::adapter::net::identity::{EntityId, EntityKeypair};
fn test_entity_id() -> EntityId {
EntityId::from_bytes([0u8; 32])
}
use net::adapter::net::state::causal::{CausalEvent, CausalLink};
use net::adapter::net::state::snapshot::StateSnapshot;
use net::adapter::net::subprotocol::SubprotocolRegistry;
struct CounterDaemon {
count: u64,
}
impl CounterDaemon {
fn new() -> Self {
Self { count: 0 }
}
fn with_count(count: u64) -> Self {
Self { count }
}
}
impl MeshDaemon for CounterDaemon {
fn name(&self) -> &str {
"counter"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
self.count += 1;
Ok(vec![Bytes::from(self.count.to_le_bytes().to_vec())])
}
fn snapshot(&self) -> Option<Bytes> {
Some(Bytes::from(self.count.to_le_bytes().to_vec()))
}
fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
if state.len() != 8 {
return Err(DaemonError::RestoreFailed("bad state size".into()));
}
self.count = u64::from_le_bytes(state[..8].try_into().unwrap());
Ok(())
}
}
fn make_event(origin: u64, seq: u64) -> CausalEvent {
CausalEvent {
link: CausalLink {
origin_hash: origin,
horizon_encoded: 0,
sequence: seq,
parent_hash: 0,
},
payload: Bytes::from(format!("event-{}", seq)),
received_at: seq * 1000,
}
}
fn make_link(origin: u64, seq: u64) -> CausalLink {
CausalLink {
origin_hash: origin,
horizon_encoded: 0,
sequence: seq,
parent_hash: 0,
}
}
fn register_counter_daemon(registry: &DaemonRegistry, initial_count: u64) -> (EntityKeypair, u64) {
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let host = DaemonHost::new(
Box::new(CounterDaemon::with_count(initial_count)),
kp.clone(),
DaemonHostConfig::default(),
);
registry.register(host).unwrap();
(kp, origin)
}
#[test]
fn test_orchestrator_full_phase_chain() {
let reg = Arc::new(DaemonRegistry::new());
let (_kp, origin) = register_counter_daemon(®, 42);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
let msgs = orch.start_migration(origin, 0x1111, 0x2222).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Transfer));
let _snapshot_bytes = match &msgs[0] {
MigrationMessage::SnapshotReady { snapshot_bytes, .. } => snapshot_bytes.clone(),
other => panic!("expected SnapshotReady, got {:?}", other),
};
let buffered = orch.on_restore_complete(origin, 42).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Replay));
assert!(buffered.is_none());
let cutover_msg = orch
.on_replay_complete(origin, 42, make_link(origin, 42))
.unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Cutover));
match cutover_msg {
MigrationMessage::CutoverNotify { target_node, .. } => {
assert_eq!(target_node, 0x2222);
}
_ => panic!("expected CutoverNotify"),
}
orch.on_cutover_acknowledged(origin).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Complete));
let activate = orch.on_cleanup_complete(origin).unwrap();
match activate {
MigrationMessage::ActivateTarget { daemon_origin } => {
assert_eq!(daemon_origin, origin);
}
_ => panic!("expected ActivateTarget"),
}
orch.on_activate_ack(origin, 42).unwrap();
assert!(!orch.is_migrating(origin));
assert_eq!(orch.active_count(), 0);
}
#[test]
fn test_orchestrator_phase_chain() {
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 10);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
let result = orch.on_restore_complete(origin, 10).unwrap();
assert!(
result.is_none(),
"on_restore_complete must not emit BufferedEvents from the orchestrator side"
);
orch.on_replay_complete(origin, 15, make_link(origin, 15))
.unwrap();
orch.on_cutover_acknowledged(origin).unwrap();
orch.on_cleanup_complete(origin).unwrap();
orch.on_activate_ack(origin, 15).unwrap();
assert!(!orch.is_migrating(origin));
}
#[test]
fn test_end_to_end_migration_local_source() {
let source_reg = Arc::new(DaemonRegistry::new());
let target_reg = Arc::new(DaemonRegistry::new());
let (kp, origin) = register_counter_daemon(&source_reg, 100);
for seq in 1..=5 {
source_reg
.deliver(origin, &make_event(0xFFFF, seq))
.unwrap();
}
let source_handler = MigrationSourceHandler::new(source_reg.clone());
let target_handler = MigrationTargetHandler::new(target_reg.clone());
let orch = MigrationOrchestrator::new(source_reg.clone(), 0x1111);
let msgs = orch.start_migration(origin, 0x1111, 0x2222).unwrap();
let mut snapshot_bytes: Vec<u8> = Vec::new();
let mut seq_through: u64 = 0;
for m in &msgs {
match m {
MigrationMessage::SnapshotReady {
snapshot_bytes: chunk,
seq_through: sq,
..
} => {
snapshot_bytes.extend_from_slice(chunk);
seq_through = *sq;
}
other => panic!("expected SnapshotReady, got {:?}", other),
}
}
let snapshot = StateSnapshot::from_bytes(&snapshot_bytes).unwrap();
target_handler
.restore_snapshot(
RestoreContext {
daemon_origin: origin,
snapshot: &snapshot,
source_node: 0x1111,
orchestrator_node: 0x1111,
},
kp.clone(),
|| Box::new(CounterDaemon::new()),
DaemonHostConfig::default(),
)
.unwrap();
assert!(target_reg.contains(origin));
source_handler
.start_snapshot(origin, 0x2222, 0x1111)
.unwrap();
source_handler
.buffer_event(origin, make_event(0xFFFF, 6))
.unwrap();
source_handler
.buffer_event(origin, make_event(0xFFFF, 7))
.unwrap();
let _buffered_msg = orch.on_restore_complete(origin, seq_through).unwrap();
let buffered_events = source_handler.take_buffered_events(origin).unwrap();
assert_eq!(buffered_events.len(), 2);
let replayed_through = target_handler
.replay_events(origin, buffered_events)
.unwrap();
let cutover_msg = orch
.on_replay_complete(
origin,
replayed_through,
make_link(origin, replayed_through),
)
.unwrap();
match &cutover_msg {
MigrationMessage::CutoverNotify { target_node, .. } => {
assert_eq!(*target_node, 0x2222);
}
_ => panic!("expected CutoverNotify"),
}
let final_events = source_handler.on_cutover(origin).unwrap();
assert!(final_events.is_empty());
target_handler.activate(origin).unwrap();
orch.on_cutover_acknowledged(origin).unwrap();
source_handler.cleanup(origin).unwrap();
assert!(!source_reg.contains(origin));
target_handler.complete(origin).unwrap();
orch.on_cleanup_complete(origin).unwrap();
orch.on_activate_ack(origin, 5).unwrap();
assert!(target_reg.contains(origin));
assert!(!source_reg.contains(origin));
assert!(!orch.is_migrating(origin));
}
#[test]
fn test_start_migration_auto() {
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 50);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
let fold: Arc<Fold<CapabilityFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
let target_caps = CapabilitySet::new().add_tag("subprotocol:0x0500");
capability_bridge::apply_legacy_announcement(
&fold,
CapabilityAnnouncement::new(0x2222, test_entity_id(), 1, target_caps),
)
.expect("apply legacy announcement in fixture");
let local_caps = CapabilitySet::new();
let scheduler = Scheduler::new(fold, 0x1111, local_caps);
let (target_node, msgs) = orch
.start_migration_auto(origin, 0x1111, &scheduler, &CapabilityFilter::default())
.unwrap();
assert_eq!(target_node, 0x2222);
assert_eq!(orch.target_node(origin), Some(0x2222));
assert!(!msgs.is_empty(), "must emit at least one chunk");
match &msgs[0] {
MigrationMessage::SnapshotReady { daemon_origin, .. } => {
assert_eq!(*daemon_origin, origin);
}
other => panic!("expected SnapshotReady, got {:?}", other),
}
}
#[test]
fn test_start_migration_auto_no_targets() {
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 50);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
let fold: Arc<Fold<CapabilityFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
let scheduler = Scheduler::new(fold, 0x1111, CapabilitySet::new());
let err = orch
.start_migration_auto(origin, 0x1111, &scheduler, &CapabilityFilter::default())
.unwrap_err();
match err {
net::adapter::net::MigrationError::NoTargetAvailable => {}
_ => panic!("expected NoTargetAvailable, got {:?}", err),
}
}
#[test]
fn test_subprotocol_handler_snapshot_ready_dispatch() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 25);
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source, target, 0x1111);
let take_msg = MigrationMessage::TakeSnapshot {
daemon_origin: origin,
target_node: 0x2222,
};
let outbound = handler
.handle_message(&wire::encode(&take_msg).unwrap(), 0x3333)
.unwrap();
assert!(!outbound.is_empty());
let reply = wire::decode(&outbound[0].payload).unwrap();
match reply {
MigrationMessage::SnapshotReady {
daemon_origin,
chunk_index,
total_chunks,
..
} => {
assert_eq!(daemon_origin, origin);
assert_eq!(chunk_index, 0);
assert_eq!(total_chunks, 1); }
_ => panic!("expected SnapshotReady"),
}
}
#[test]
fn test_subprotocol_handler_restore_complete_emits_no_buffered_events() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 10);
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x3333));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source, target, 0x3333);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
let restore_msg = MigrationMessage::RestoreComplete {
daemon_origin: origin,
restored_seq: 10,
};
let outbound = handler
.handle_message(&wire::encode(&restore_msg).unwrap(), 0x2222)
.unwrap();
let buffered_events: Vec<_> = outbound
.iter()
.filter_map(|frame| match wire::decode(&frame.payload) {
Ok(MigrationMessage::BufferedEvents { events, .. }) => Some(events),
_ => None,
})
.collect();
assert_eq!(
buffered_events.len(),
1,
"expected a single BufferedEvents reply"
);
assert!(
buffered_events[0].is_empty(),
"orchestrator-driven BufferedEvents must carry zero events — the dead \
buffer_event surface was removed; source-side cutover is the only buffer source"
);
}
#[test]
fn test_subprotocol_handler_cutover_notify_dispatch() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 5);
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source.clone(), target, 0x1111);
source.start_snapshot(origin, 0x2222, 0x1111).unwrap();
source.buffer_event(origin, make_event(0xFFFF, 1)).unwrap();
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
orch.on_restore_complete(origin, 5).unwrap();
orch.on_replay_complete(origin, 5, make_link(origin, 5))
.unwrap();
let cutover_msg = MigrationMessage::CutoverNotify {
daemon_origin: origin,
target_node: 0x2222,
};
let outbound = handler
.handle_message(&wire::encode(&cutover_msg).unwrap(), 0x3333)
.unwrap();
assert!(!outbound.is_empty());
let has_cleanup = outbound.iter().any(|o| {
matches!(
wire::decode(&o.payload),
Ok(MigrationMessage::CleanupComplete { .. })
)
});
assert!(has_cleanup, "expected CleanupComplete in outbound");
}
#[test]
fn test_subprotocol_handler_cleanup_complete_dispatch() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 1);
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source, target, 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
orch.on_restore_complete(origin, 1).unwrap();
orch.on_replay_complete(origin, 1, make_link(origin, 1))
.unwrap();
orch.on_cutover_acknowledged(origin).unwrap();
assert!(orch.is_migrating(origin));
let cleanup_msg = MigrationMessage::CleanupComplete {
daemon_origin: origin,
};
let outbound = handler
.handle_message(&wire::encode(&cleanup_msg).unwrap(), 0x1111)
.unwrap();
assert_eq!(outbound.len(), 1);
assert_eq!(outbound[0].dest_node, 0x2222, "ActivateTarget to target");
match wire::decode(&outbound[0].payload).unwrap() {
MigrationMessage::ActivateTarget { daemon_origin } => {
assert_eq!(daemon_origin, origin);
}
other => panic!("expected ActivateTarget, got {:?}", other),
}
assert!(orch.is_migrating(origin), "record kept until activate ack");
let ack = MigrationMessage::ActivateAck {
daemon_origin: origin,
replayed_seq: 1,
};
let outbound = handler
.handle_message(&wire::encode(&ack).unwrap(), 0x2222)
.unwrap();
assert!(outbound.is_empty());
assert!(!orch.is_migrating(origin));
}
#[test]
fn test_regression_dispatch_arms_reject_unrelated_from_node() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::compute::MigrationError;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 1);
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source, target, 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
orch.on_restore_complete(origin, 1).unwrap();
orch.on_replay_complete(origin, 1, make_link(origin, 1))
.unwrap();
orch.on_cutover_acknowledged(origin).unwrap();
let forged = MigrationMessage::CleanupComplete {
daemon_origin: origin,
};
match handler.handle_message(&wire::encode(&forged).unwrap(), 0x9999) {
Err(MigrationError::WrongPeer {
daemon_origin: o,
from,
expected,
}) => {
assert_eq!(o, origin);
assert_eq!(from, 0x9999);
assert_eq!(expected, 0x1111);
}
other => panic!("expected WrongPeer rejection, got {:?}", other),
}
assert!(
orch.is_migrating(origin),
"forged CleanupComplete must not advance orchestrator state"
);
let outbound = handler
.handle_message(&wire::encode(&forged).unwrap(), 0x1111)
.expect("recorded source can drive CleanupComplete");
assert_eq!(outbound.len(), 1);
let failed = MigrationMessage::MigrationFailed {
daemon_origin: origin,
reason: net::adapter::net::compute::MigrationFailureReason::StateFailed(
"synthetic".to_string(),
),
};
match handler.handle_message(&wire::encode(&failed).unwrap(), 0x9999) {
Err(MigrationError::WrongPeer { from, .. }) => {
assert_eq!(from, 0x9999);
}
other => panic!(
"expected WrongPeer rejection for forged MigrationFailed, got {:?}",
other
),
}
}
#[test]
fn snapshot_ready_rejects_unexpected_orchestrator_when_bound() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::compute::{DaemonFactoryRegistry, MigrationError};
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let target_reg = Arc::new(DaemonRegistry::new());
let factories = Arc::new(DaemonFactoryRegistry::new());
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
factories
.register(kp.clone(), DaemonHostConfig::default(), || {
Box::new(CounterDaemon::new())
})
.unwrap();
let expected_orchestrator: u64 = 0x3333;
let attacker: u64 = 0x9999;
assert!(
factories.bind_expected_orchestrator(origin, expected_orchestrator),
"bind must succeed once factory is registered",
);
let target_node: u64 = 0x2222;
let target_handler = Arc::new(
net::adapter::net::compute::MigrationTargetHandler::new_with_factories(
target_reg.clone(),
factories.clone(),
),
);
let orch = Arc::new(MigrationOrchestrator::new(target_reg.clone(), target_node));
let source = Arc::new(MigrationSourceHandler::new(target_reg.clone()));
let handler =
MigrationSubprotocolHandler::new(orch.clone(), source, target_handler.clone(), target_node);
let chain_head = {
let mut chain = net::adapter::net::state::causal::CausalChainBuilder::new(origin);
chain.append(Bytes::from_static(b"x"), 0).unwrap();
*chain.head()
};
let snapshot = StateSnapshot::new(
kp.entity_id().clone(),
chain_head,
Bytes::from(0u64.to_le_bytes().to_vec()),
net::adapter::net::state::horizon::ObservedHorizon::new(),
);
let snapshot_msg = MigrationMessage::SnapshotReady {
daemon_origin: origin,
snapshot_bytes: snapshot.to_bytes(),
seq_through: snapshot.through_seq,
chunk_index: 0,
total_chunks: 1,
};
match handler.handle_message(&wire::encode(&snapshot_msg).unwrap(), attacker) {
Err(MigrationError::WrongPeer {
daemon_origin: o,
from,
expected,
}) => {
assert_eq!(o, origin);
assert_eq!(from, attacker);
assert_eq!(expected, expected_orchestrator);
}
other => panic!("expected WrongPeer rejection, got {:?}", other),
}
assert!(
!target_handler.is_migrating(origin),
"attacker SnapshotReady must NOT register a migration record",
);
assert!(factories.bind_expected_orchestrator(origin, expected_orchestrator));
assert!(!factories.bind_expected_orchestrator(origin, attacker));
}
#[test]
fn test_reassembler_out_of_order_chunks() {
let data = vec![0xABu8; MAX_SNAPSHOT_CHUNK_SIZE * 3 + 500];
let total_len = data.len();
let chunks = chunk_snapshot(0xAAAA, data, 99).unwrap();
assert_eq!(chunks.len(), 4);
let mut reassembler = SnapshotReassembler::new();
let feed_order = [3, 1, 0, 2];
for &i in &feed_order[..3] {
let chunk = &chunks[i as usize];
if let MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
} = chunk
{
let result = reassembler.feed(
*daemon_origin,
snapshot_bytes.clone(),
*seq_through,
*chunk_index,
*total_chunks,
);
assert!(result.unwrap().is_none(), "chunk {} should not complete", i);
}
}
let last = &chunks[feed_order[3] as usize];
if let MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
} = last
{
let result = reassembler
.feed(
*daemon_origin,
snapshot_bytes.clone(),
*seq_through,
*chunk_index,
*total_chunks,
)
.expect("last chunk should complete reassembly")
.expect("last chunk should return data");
assert_eq!(result.len(), total_len);
assert!(result.iter().all(|&b| b == 0xAB));
}
assert_eq!(reassembler.pending_count(), 0);
}
#[test]
fn test_reassembler_duplicate_chunks_handled() {
let data = vec![0xCDu8; MAX_SNAPSHOT_CHUNK_SIZE * 2 + 100];
let total_len = data.len();
let chunks = chunk_snapshot(0xBBBB, data, 50).unwrap();
assert_eq!(chunks.len(), 3);
let mut reassembler = SnapshotReassembler::new();
if let MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
} = &chunks[0]
{
let _ = reassembler.feed(
*daemon_origin,
snapshot_bytes.clone(),
*seq_through,
*chunk_index,
*total_chunks,
);
let _ = reassembler.feed(
*daemon_origin,
snapshot_bytes.clone(),
*seq_through,
*chunk_index,
*total_chunks,
);
}
for chunk in &chunks[1..] {
if let MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
} = chunk
{
let result = reassembler.feed(
*daemon_origin,
snapshot_bytes.clone(),
*seq_through,
*chunk_index,
*total_chunks,
);
if *chunk_index == *total_chunks - 1 {
let full = result
.expect("feed should not error")
.expect("last chunk should complete");
assert_eq!(full.len(), total_len);
}
}
}
}
#[test]
fn test_event_buffer_flows_to_target_replay() {
let source_reg = Arc::new(DaemonRegistry::new());
let target_reg = Arc::new(DaemonRegistry::new());
let (kp, origin) = register_counter_daemon(&source_reg, 0);
for seq in 1..=10 {
source_reg
.deliver(origin, &make_event(0xFFFF, seq))
.unwrap();
}
let source_handler = MigrationSourceHandler::new(source_reg.clone());
let target_handler = MigrationTargetHandler::new(target_reg.clone());
let snapshot = source_handler
.start_snapshot(origin, 0x2222, 0x1111)
.unwrap();
for seq in 11..=15 {
source_handler
.buffer_event(origin, make_event(0xFFFF, seq))
.unwrap();
}
target_handler
.restore_snapshot(
RestoreContext {
daemon_origin: origin,
snapshot: &snapshot,
source_node: 0x1111,
orchestrator_node: 0x1111,
},
kp.clone(),
|| Box::new(CounterDaemon::new()),
DaemonHostConfig::default(),
)
.unwrap();
let buffered = source_handler.take_buffered_events(origin).unwrap();
assert_eq!(buffered.len(), 5);
let replayed = target_handler.replay_events(origin, buffered).unwrap();
assert_eq!(replayed, 15);
let target_stats = target_reg.stats(origin).unwrap();
assert_eq!(target_stats.events_processed, 5);
target_handler.activate(origin).unwrap();
target_handler.complete(origin).unwrap();
assert!(target_reg.contains(origin));
}
#[test]
fn test_concurrent_migrations_no_interference() {
let reg = Arc::new(DaemonRegistry::new());
let (_, origin_a) = register_counter_daemon(®, 100);
let (_, origin_b) = register_counter_daemon(®, 200);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
orch.start_migration(origin_a, 0x1111, 0x2222).unwrap();
orch.start_migration(origin_b, 0x1111, 0x3333).unwrap();
assert_eq!(orch.active_count(), 2);
orch.on_restore_complete(origin_a, 100).unwrap();
orch.on_replay_complete(origin_a, 100, make_link(origin_a, 100))
.unwrap();
orch.on_cutover_acknowledged(origin_a).unwrap();
orch.on_cleanup_complete(origin_a).unwrap();
orch.on_activate_ack(origin_a, 100).unwrap();
assert!(!orch.is_migrating(origin_a));
assert!(orch.is_migrating(origin_b));
assert_eq!(orch.status(origin_b), Some(MigrationPhase::Transfer));
orch.on_restore_complete(origin_b, 200).unwrap();
orch.on_replay_complete(origin_b, 200, make_link(origin_b, 200))
.unwrap();
orch.on_cutover_acknowledged(origin_b).unwrap();
orch.on_cleanup_complete(origin_b).unwrap();
orch.on_activate_ack(origin_b, 200).unwrap();
assert_eq!(orch.active_count(), 0);
}
#[test]
fn test_enriched_capabilities_discoverable_by_scheduler() {
let subproto_reg = SubprotocolRegistry::with_defaults();
let node_a_caps = subproto_reg.enrich_capabilities(CapabilitySet::new());
assert!(node_a_caps.has_tag("subprotocol:0x0500"));
let fold: Arc<Fold<CapabilityFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
capability_bridge::apply_legacy_announcement(
&fold,
CapabilityAnnouncement::new(0xAAAA, test_entity_id(), 1, node_a_caps),
)
.expect("apply legacy announcement in fixture");
let node_b_caps = CapabilitySet::new();
capability_bridge::apply_legacy_announcement(
&fold,
CapabilityAnnouncement::new(0xBBBB, test_entity_id(), 1, node_b_caps),
)
.expect("apply legacy announcement in fixture");
let scheduler = Scheduler::new(fold, 0xCCCC, CapabilitySet::new());
let targets = scheduler.find_migration_targets(&CapabilityFilter::default(), 0xCCCC);
assert_eq!(targets.len(), 1);
assert_eq!(targets[0], 0xAAAA);
}
#[test]
fn test_wire_roundtrip_all_message_types() {
use net::adapter::net::compute::orchestrator::wire;
let messages: Vec<MigrationMessage> = vec![
MigrationMessage::TakeSnapshot {
daemon_origin: 0x1111,
target_node: 0x2222,
},
MigrationMessage::SnapshotReady {
daemon_origin: 0x3333,
snapshot_bytes: vec![1, 2, 3, 4, 5],
seq_through: 42,
chunk_index: 0,
total_chunks: 1,
},
MigrationMessage::SnapshotReady {
daemon_origin: 0x3333,
snapshot_bytes: vec![6, 7, 8],
seq_through: 42,
chunk_index: 2,
total_chunks: 5,
},
MigrationMessage::RestoreComplete {
daemon_origin: 0x4444,
restored_seq: 100,
},
MigrationMessage::ReplayComplete {
daemon_origin: 0x5555,
replayed_seq: 200,
target_head: CausalLink {
origin_hash: 0x5555,
horizon_encoded: 0,
sequence: 200,
parent_hash: 0xDEAD_BEEF,
},
},
MigrationMessage::CutoverNotify {
daemon_origin: 0x6666,
target_node: 0x7777,
},
MigrationMessage::CleanupComplete {
daemon_origin: 0x8888,
},
MigrationMessage::MigrationFailed {
daemon_origin: 0x9999,
reason: net::adapter::net::compute::MigrationFailureReason::StateFailed(
"test failure".into(),
),
},
MigrationMessage::BufferedEvents {
daemon_origin: 0xAAAA,
events: vec![make_event(0xBBBB, 1), make_event(0xBBBB, 2)],
},
];
for msg in &messages {
let encoded = wire::encode(msg).unwrap();
let decoded = wire::decode(&encoded).unwrap();
assert_eq!(
std::mem::discriminant(msg),
std::mem::discriminant(&decoded),
"roundtrip failed for {:?}",
msg,
);
}
}
#[test]
fn test_abort_at_each_phase() {
let reg = Arc::new(DaemonRegistry::new());
{
let (_, origin) = register_counter_daemon(®, 1);
let orch = MigrationOrchestrator::new(reg.clone(), 0x3333);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Snapshot));
orch.abort_migration(origin, "abort at snapshot".into())
.unwrap();
assert!(!orch.is_migrating(origin));
}
{
let (_, origin) = register_counter_daemon(®, 2);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Transfer));
orch.abort_migration(origin, "abort at transfer".into())
.unwrap();
assert!(!orch.is_migrating(origin));
}
{
let (_, origin) = register_counter_daemon(®, 3);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
orch.on_restore_complete(origin, 3).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Replay));
orch.abort_migration(origin, "abort at replay".into())
.unwrap();
assert!(!orch.is_migrating(origin));
}
{
let (_, origin) = register_counter_daemon(®, 4);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
orch.on_restore_complete(origin, 4).unwrap();
orch.on_replay_complete(origin, 4, make_link(origin, 4))
.unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Cutover));
orch.abort_migration(origin, "abort at cutover".into())
.unwrap();
assert!(!orch.is_migrating(origin));
}
}
#[test]
fn test_regression_cutover_routed_to_source_not_target() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 10);
let source_node: u64 = 0x1111;
let target_node: u64 = 0x2222;
let orchestrator_node: u64 = 0x3333;
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), orchestrator_node));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source, target, orchestrator_node);
orch.start_migration(origin, source_node, target_node)
.unwrap();
orch.on_restore_complete(origin, 10).unwrap();
let replay_msg = MigrationMessage::ReplayComplete {
daemon_origin: origin,
replayed_seq: 10,
target_head: CausalLink {
origin_hash: origin,
horizon_encoded: 0,
sequence: 10,
parent_hash: 0,
},
};
let outbound = handler
.handle_message(&wire::encode(&replay_msg).unwrap(), target_node)
.unwrap();
let cutover = outbound
.iter()
.find(|o| {
matches!(
wire::decode(&o.payload),
Ok(MigrationMessage::CutoverNotify { .. })
)
})
.expect("expected CutoverNotify in outbound");
assert_eq!(
cutover.dest_node, source_node,
"CutoverNotify must be routed to source node {:#x}, not target {:#x}",
source_node, cutover.dest_node,
);
}
#[test]
fn test_regression_snapshot_forwarded_to_actual_target() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let source_reg = Arc::new(DaemonRegistry::new());
let orch_reg = Arc::new(DaemonRegistry::new());
let (_kp, origin) = register_counter_daemon(&source_reg, 5);
let source_node: u64 = 0x1111;
let target_node: u64 = 0x2222;
let orchestrator_node: u64 = 0x3333;
let orch = Arc::new(MigrationOrchestrator::new(
orch_reg.clone(),
orchestrator_node,
));
let source = Arc::new(MigrationSourceHandler::new(orch_reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(orch_reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source, target, orchestrator_node);
orch.start_migration(origin, source_node, target_node)
.unwrap();
let real_snapshot = source_reg.snapshot(origin).unwrap().unwrap();
let snapshot_bytes = real_snapshot.to_bytes();
let snapshot_msg = MigrationMessage::SnapshotReady {
daemon_origin: origin,
snapshot_bytes,
seq_through: real_snapshot.through_seq,
chunk_index: 0,
total_chunks: 1,
};
let outbound = handler
.handle_message(&wire::encode(&snapshot_msg).unwrap(), source_node)
.unwrap();
let forwarded = outbound
.iter()
.find(|o| {
matches!(
wire::decode(&o.payload),
Ok(MigrationMessage::SnapshotReady { .. })
)
})
.expect("expected SnapshotReady forwarded in outbound");
assert_eq!(
forwarded.dest_node, target_node,
"SnapshotReady must be forwarded to target {:#x}, got {:#x}",
target_node, forwarded.dest_node,
);
}
#[test]
fn test_regression_start_migration_atomic_duplicate_check() {
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 1);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
let err = orch.start_migration(origin, 0x1111, 0x3333).unwrap_err();
assert!(
matches!(err, net::adapter::net::MigrationError::AlreadyMigrating(_)),
"expected AlreadyMigrating, got {:?}",
err,
);
assert_eq!(orch.target_node(origin), Some(0x2222));
}
#[test]
fn test_regression_start_snapshot_atomic_duplicate_check() {
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 1);
let handler = MigrationSourceHandler::new(reg.clone());
handler.start_snapshot(origin, 0x2222, 0x1111).unwrap();
let err = handler.start_snapshot(origin, 0x3333, 0x1111).unwrap_err();
assert!(
matches!(err, net::adapter::net::MigrationError::AlreadyMigrating(_)),
"expected AlreadyMigrating, got {:?}",
err,
);
}
#[test]
fn test_regression_drain_pending_error_propagated() {
let reg = Arc::new(DaemonRegistry::new());
let handler = MigrationTargetHandler::new(reg.clone());
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let mut chain = net::adapter::net::state::causal::CausalChainBuilder::new(origin);
for _ in 0..5 {
chain.append(Bytes::from_static(b"x"), 0);
}
let snapshot = StateSnapshot::new(
kp.entity_id().clone(),
*chain.head(),
Bytes::from(0u64.to_le_bytes().to_vec()),
net::adapter::net::state::horizon::ObservedHorizon::new(),
);
handler
.restore_snapshot(
RestoreContext {
daemon_origin: origin,
snapshot: &snapshot,
source_node: 0x1111,
orchestrator_node: 0x1111,
},
kp.clone(),
|| Box::new(CounterDaemon::new()),
DaemonHostConfig::default(),
)
.unwrap();
let result = handler.buffer_event(origin, make_event(0xFFFF, 6));
assert!(result.is_ok(), "buffer_event should propagate success");
let result = handler.buffer_event(origin, make_event(0xFFFF, 7));
assert!(result.is_ok());
assert_eq!(handler.replayed_through(origin), Some(7));
}
#[test]
fn test_regression_snapshot_through_seq_correct() {
let reg = Arc::new(DaemonRegistry::new());
let (_kp, origin) = register_counter_daemon(®, 50);
for seq in 1..=10 {
reg.deliver(origin, &make_event(0xFFFF, seq)).unwrap();
}
let snapshot = reg.snapshot(origin).unwrap().unwrap();
assert_eq!(
snapshot.through_seq, 10,
"snapshot through_seq should reflect daemon's chain sequence"
);
assert_eq!(snapshot.chain_link.sequence, 10);
}
#[test]
fn test_regression_full_handler_routing_chain() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 20);
let source_node: u64 = 0xAAAA;
let target_node: u64 = 0xBBBB;
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0xCCCC));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch.clone(), source.clone(), target, 0xCCCC);
orch.start_migration(origin, source_node, target_node)
.unwrap();
orch.on_restore_complete(origin, 20).unwrap();
let replay_msg = MigrationMessage::ReplayComplete {
daemon_origin: origin,
replayed_seq: 20,
target_head: CausalLink {
origin_hash: origin,
horizon_encoded: 0,
sequence: 20,
parent_hash: 0,
},
};
let outbound = handler
.handle_message(&wire::encode(&replay_msg).unwrap(), target_node)
.unwrap();
let cutover_out = outbound
.iter()
.find(|o| {
matches!(
wire::decode(&o.payload),
Ok(MigrationMessage::CutoverNotify { .. })
)
})
.expect("expected CutoverNotify");
assert_eq!(cutover_out.dest_node, source_node);
source.start_snapshot(origin, target_node, 0xCCCC).unwrap();
let cutover_outbound = handler
.handle_message(&cutover_out.payload, 0xCCCC) .unwrap();
let cleanup_out = cutover_outbound
.iter()
.find(|o| {
matches!(
wire::decode(&o.payload),
Ok(MigrationMessage::CleanupComplete { .. })
)
})
.expect("expected CleanupComplete");
assert_eq!(cleanup_out.dest_node, 0xCCCC);
for out in &cutover_outbound {
if let Ok(MigrationMessage::BufferedEvents { .. }) = wire::decode(&out.payload) {
assert_eq!(
out.dest_node, target_node,
"BufferedEvents must go to target"
);
}
}
}
#[test]
fn test_regression_on_replay_complete_third_party_orchestrator_no_local_daemon() {
let orch_reg = Arc::new(DaemonRegistry::new());
let (_kp, origin) = register_counter_daemon(&orch_reg, 0);
let target_node: u64 = 0xBBBB;
let orch_node: u64 = 0xCCCC;
let orch = MigrationOrchestrator::new(orch_reg.clone(), orch_node);
orch.start_migration(origin, orch_node, target_node)
.unwrap();
orch.on_restore_complete(origin, 42).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Replay));
orch_reg.unregister(origin).unwrap();
let cutover_msg = orch
.on_replay_complete(origin, 42, make_link(origin, 42))
.expect("third-party orchestrator must advance past Replay");
match cutover_msg {
MigrationMessage::CutoverNotify {
target_node: tn, ..
} => {
assert_eq!(tn, target_node);
}
other => panic!("expected CutoverNotify, got {:?}", other),
}
assert_eq!(orch.status(origin), Some(MigrationPhase::Cutover));
}
#[test]
fn test_regression_cleanup_complete_prefers_recorded_orchestrator() {
use net::adapter::net::compute::orchestrator::wire;
use net::adapter::net::subprotocol::MigrationSubprotocolHandler;
let reg = Arc::new(DaemonRegistry::new());
let (_, origin) = register_counter_daemon(®, 7);
let local_node: u64 = 0x1234;
let target_node: u64 = 0xCAFE;
let orchestrator_node: u64 = 0xAAAA;
let relay_node: u64 = 0xBBBB;
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), local_node));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch, source.clone(), target, local_node);
source
.start_snapshot(origin, target_node, orchestrator_node)
.unwrap();
let cutover = MigrationMessage::CutoverNotify {
daemon_origin: origin,
target_node,
};
let outbound = handler
.handle_message(&wire::encode(&cutover).unwrap(), relay_node)
.unwrap();
let cleanup = outbound
.iter()
.find(|o| {
matches!(
wire::decode(&o.payload),
Ok(MigrationMessage::CleanupComplete { .. })
)
})
.expect("expected CleanupComplete outbound");
assert_eq!(
cleanup.dest_node, orchestrator_node,
"CleanupComplete must go to the recorded orchestrator ({:#x}), not to the \
wire hop {:#x}",
orchestrator_node, relay_node
);
}
#[test]
fn test_regression_reassembler_rejects_mixed_seq_through() {
let mut reassembler = SnapshotReassembler::new();
let result = reassembler.feed(0xAAAA, vec![1, 2, 3], 100, 0, 2).unwrap();
assert!(result.is_none());
let result = reassembler.feed(0xAAAA, vec![4, 5, 6], 200, 0, 2).unwrap();
assert!(result.is_none());
let result = reassembler.feed(0xAAAA, vec![7, 8, 9], 200, 1, 2).unwrap();
assert!(result.is_some());
let full = result.unwrap();
assert_eq!(full, vec![4, 5, 6, 7, 8, 9]);
reassembler.cancel(0xAAAA);
assert_eq!(reassembler.pending_count(), 0);
}
#[test]
fn test_regression_multi_chunk_advances_past_snapshot_phase() {
let reg = Arc::new(DaemonRegistry::new());
let (kp, origin) = register_counter_daemon(®, 10);
let orch = MigrationOrchestrator::new(reg.clone(), 0x3333);
orch.start_migration(origin, 0x1111, 0x2222).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Snapshot));
let mut chunk0 = Vec::with_capacity(48);
chunk0.extend_from_slice(b"CDS1");
chunk0.push(2); chunk0.extend_from_slice(kp.entity_id().as_bytes());
chunk0.extend_from_slice(&[0u8; 11]); orch.on_snapshot_ready(origin, chunk0, 10, 0, 3).unwrap();
assert_ne!(
orch.status(origin),
Some(MigrationPhase::Snapshot),
"multi-chunk first chunk must advance phase past Snapshot"
);
assert_eq!(orch.status(origin), Some(MigrationPhase::Transfer));
orch.on_snapshot_ready(origin, vec![4, 5, 6], 10, 1, 3)
.unwrap();
orch.on_snapshot_ready(origin, vec![7, 8], 10, 2, 3)
.unwrap();
orch.on_restore_complete(origin, 10).unwrap();
assert_eq!(orch.status(origin), Some(MigrationPhase::Replay));
}
#[test]
fn test_regression_chunk_count_boundary() {
let chunks = chunk_snapshot(0xAAAA, vec![0u8; MAX_SNAPSHOT_CHUNK_SIZE], 1).unwrap();
assert_eq!(chunks.len(), 1);
let chunks = chunk_snapshot(0xAAAA, vec![0u8; MAX_SNAPSHOT_CHUNK_SIZE + 1], 1).unwrap();
assert_eq!(chunks.len(), 2);
let chunks = chunk_snapshot(0xAAAA, vec![0u8; MAX_SNAPSHOT_CHUNK_SIZE * 100], 1).unwrap();
assert_eq!(chunks.len(), 100);
for (i, chunk) in chunks.iter().enumerate() {
if let MigrationMessage::SnapshotReady {
chunk_index,
total_chunks,
..
} = chunk
{
assert_eq!(*chunk_index, i as u32);
assert_eq!(*total_chunks, 100);
}
}
}
fn make_scheduler_for_groups() -> Scheduler {
let fold: Arc<Fold<CapabilityFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
for node_id in [0x1111u64, 0x2222, 0x3333] {
capability_bridge::apply_legacy_announcement(
&fold,
CapabilityAnnouncement::new(node_id, test_entity_id(), 1, CapabilitySet::new()),
)
.expect("apply legacy announcement in fixture");
}
Scheduler::new(fold, 0x1111, CapabilitySet::new())
}
#[test]
fn test_replica_group_route_and_deliver() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let group = ReplicaGroup::spawn(
ReplicaGroupConfig {
replica_count: 3,
group_seed: [99u8; 32],
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let ctx = RequestContext::default();
let origin = group.route_event(&ctx).unwrap();
let event = make_event(0xFFFF, 1);
let outputs = reg.deliver(origin, &event).unwrap();
assert_eq!(outputs.len(), 1);
let count = u64::from_le_bytes(outputs[0].payload[..8].try_into().unwrap());
assert_eq!(count, 1);
let event2 = make_event(0xFFFF, 2);
let outputs2 = reg.deliver(origin, &event2).unwrap();
let count2 = u64::from_le_bytes(outputs2[0].payload[..8].try_into().unwrap());
assert_eq!(count2, 2);
}
#[test]
fn test_fork_group_causal_chain_carries_sentinel() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let parent_origin: u64 = 0xAAAA;
let fork_seq: u64 = 100;
let group = ForkGroup::fork(
parent_origin,
fork_seq,
ForkGroupConfig {
fork_count: 2,
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let expected_sentinel = fork_sentinel(parent_origin, fork_seq);
for record in group.fork_records() {
assert_eq!(record.original_origin, parent_origin);
assert_eq!(record.fork_seq, fork_seq);
assert_eq!(record.fork_genesis.parent_hash, expected_sentinel);
assert_eq!(record.fork_genesis.sequence, 0);
assert!(record.verify());
}
let ctx = RequestContext::default();
let origin = group.route_event(&ctx).unwrap();
let event = make_event(0xFFFF, 1);
let outputs = reg.deliver(origin, &event).unwrap();
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].link.origin_hash, origin);
assert_ne!(outputs[0].link.origin_hash, parent_origin);
assert_eq!(outputs[0].link.sequence, 1);
assert_ne!(outputs[0].link.parent_hash, 0);
}
#[test]
fn test_fork_group_recovery_preserves_chain_identity() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let parent_origin: u64 = 0xBBBB;
let fork_seq: u64 = 50;
let mut group = ForkGroup::fork(
parent_origin,
fork_seq,
ForkGroupConfig {
fork_count: 2,
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let fork_0_origin = group.members()[0].origin_hash;
let fork_0_node = group.members()[0].node_id;
let event = make_event(0xFFFF, 1);
reg.deliver(fork_0_origin, &event).unwrap();
let replaced = group
.on_node_failure(fork_0_node, || Box::new(CounterDaemon::new()), &sched, ®)
.unwrap();
assert!(!replaced.is_empty());
assert!(group
.members()
.iter()
.any(|m| m.origin_hash == fork_0_origin));
let event2 = make_event(0xFFFF, 2);
let outputs = reg.deliver(fork_0_origin, &event2).unwrap();
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].link.origin_hash, fork_0_origin);
assert!(group.verify_lineage());
let expected_sentinel = fork_sentinel(parent_origin, fork_seq);
for record in group.fork_records() {
assert_eq!(record.fork_genesis.parent_hash, expected_sentinel);
}
}
#[test]
fn test_fork_then_migrate() {
let source_reg = Arc::new(DaemonRegistry::new());
let _target_reg = Arc::new(DaemonRegistry::new());
let sched = make_scheduler_for_groups();
let parent_origin: u64 = 0xCCCC;
let fork_seq: u64 = 200;
let group = ForkGroup::fork(
parent_origin,
fork_seq,
ForkGroupConfig {
fork_count: 2,
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
&source_reg,
)
.unwrap();
let fork_origin = group.members()[0].origin_hash;
for seq in 1..=5 {
source_reg
.deliver(fork_origin, &make_event(0xFFFF, seq))
.unwrap();
}
let snapshot = source_reg.snapshot(fork_origin).unwrap().unwrap();
assert_eq!(snapshot.through_seq, 5);
let orch = MigrationOrchestrator::new(source_reg.clone(), 0x1111);
let msgs = orch.start_migration(fork_origin, 0x1111, 0x2222).unwrap();
assert!(!msgs.is_empty(), "must emit at least one chunk");
match &msgs[0] {
MigrationMessage::SnapshotReady {
daemon_origin,
seq_through,
..
} => {
assert_eq!(*daemon_origin, fork_origin);
assert_eq!(*seq_through, 5);
}
other => panic!("expected SnapshotReady for fork, got {:?}", other),
}
assert!(orch.is_migrating(fork_origin));
}
#[test]
fn test_group_coordinator_route_delivers_to_daemon() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let replica_group = ReplicaGroup::spawn(
ReplicaGroupConfig {
replica_count: 2,
group_seed: [11u8; 32],
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let ctx = RequestContext::default();
let replica_origin = replica_group.route_event(&ctx).unwrap();
let outputs = reg.deliver(replica_origin, &make_event(0xFFFF, 1)).unwrap();
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].link.origin_hash, replica_origin);
let replica_origin_2 = replica_group.route_event(&ctx).unwrap();
let outputs2 = reg
.deliver(replica_origin_2, &make_event(0xFFFF, 2))
.unwrap();
assert_eq!(outputs2.len(), 1);
assert_eq!(outputs2[0].link.origin_hash, replica_origin_2);
assert!(replica_group
.replicas()
.iter()
.any(|r| r.origin_hash == replica_origin));
assert!(replica_group
.replicas()
.iter()
.any(|r| r.origin_hash == replica_origin_2));
let fork_reg = DaemonRegistry::new();
let fork_group = ForkGroup::fork(
0xDDDD,
300,
ForkGroupConfig {
fork_count: 2,
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
&fork_reg,
)
.unwrap();
let fork_origin = fork_group.route_event(&ctx).unwrap();
let fork_outputs = fork_reg
.deliver(fork_origin, &make_event(0xFFFF, 1))
.unwrap();
assert_eq!(fork_outputs.len(), 1);
assert_eq!(fork_outputs[0].link.origin_hash, fork_origin);
assert_eq!(fork_outputs[0].link.sequence, 1);
assert_ne!(fork_origin, 0xDDDD);
}
#[test]
fn test_standby_sync_promote_state_continuity() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let mut group = StandbyGroup::spawn(
StandbyGroupConfig {
member_count: 3,
group_seed: [77u8; 32],
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let active = group.active_origin();
for seq in 1..=10 {
let event = make_event(0xFFFF, seq);
let outputs = reg.deliver(active, &event).unwrap();
let val = u64::from_le_bytes(outputs[0].payload[..8].try_into().unwrap());
assert_eq!(val, seq);
group.on_event_delivered(event);
}
let synced = group.sync_standbys(®).unwrap();
assert_eq!(synced, 10);
assert_eq!(group.buffered_event_count(), 0);
for seq in 11..=13 {
let event = make_event(0xFFFF, seq);
let outputs = reg.deliver(active, &event).unwrap();
let val = u64::from_le_bytes(outputs[0].payload[..8].try_into().unwrap());
assert_eq!(val, seq);
group.on_event_delivered(event);
}
assert_eq!(group.buffered_event_count(), 3);
let new_active = group
.promote(|| Box::new(CounterDaemon::new()), ®, &sched)
.unwrap();
assert_ne!(new_active, active);
assert_eq!(group.buffered_event_count(), 0);
let event = make_event(0xFFFF, 14);
let outputs = reg.deliver(new_active, &event).unwrap();
assert_eq!(outputs.len(), 1);
let val = u64::from_le_bytes(outputs[0].payload[..8].try_into().unwrap());
assert_eq!(val, 14);
assert_eq!(outputs[0].link.origin_hash, active);
}
#[test]
fn test_standby_promote_then_continue() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let mut group = StandbyGroup::spawn(
StandbyGroupConfig {
member_count: 2,
group_seed: [88u8; 32],
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let active = group.active_origin();
for seq in 1..=5 {
let event = make_event(0xFFFF, seq);
reg.deliver(active, &event).unwrap();
group.on_event_delivered(event);
}
group.sync_standbys(®).unwrap();
let new_active = group
.promote(|| Box::new(CounterDaemon::new()), ®, &sched)
.unwrap();
for seq in 1..=10 {
let event = make_event(0xFFFF, 100 + seq);
let outputs = reg.deliver(new_active, &event).unwrap();
assert_eq!(outputs.len(), 1);
let val = u64::from_le_bytes(outputs[0].payload[..8].try_into().unwrap());
assert_eq!(val, 5 + seq); assert_eq!(outputs[0].link.origin_hash, active);
}
assert_eq!(
group.member_role(group.active_index()),
Some(MemberRole::Active)
);
assert!(group.active_healthy());
}
#[test]
fn test_standby_group_active_migrates_via_mikoshi() {
let reg = Arc::new(DaemonRegistry::new());
let sched = make_scheduler_for_groups();
let mut group = StandbyGroup::spawn(
StandbyGroupConfig {
member_count: 3,
group_seed: [99u8; 32],
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let active = group.active_origin();
for seq in 1..=7 {
let event = make_event(0xFFFF, seq);
reg.deliver(active, &event).unwrap();
group.on_event_delivered(event);
}
let snapshot = reg.snapshot(active).unwrap().unwrap();
assert_eq!(snapshot.through_seq, 7);
let orch = MigrationOrchestrator::new(reg.clone(), 0x1111);
let msgs = orch.start_migration(active, 0x1111, 0x2222).unwrap();
assert!(orch.is_migrating(active));
assert!(!msgs.is_empty(), "must emit at least one chunk");
match &msgs[0] {
MigrationMessage::SnapshotReady {
daemon_origin,
seq_through,
..
} => {
assert_eq!(*daemon_origin, active);
assert_eq!(*seq_through, 7);
}
other => panic!("expected SnapshotReady, got {:?}", other),
}
assert_eq!(group.active_origin(), active);
assert!(group.active_healthy());
assert_eq!(group.member_count(), 3);
assert_eq!(group.standby_count(), 2);
}
#[test]
fn test_gap_recovery_skips_unregistered_member() {
let reg = DaemonRegistry::new();
let mut coord = GroupCoordinator::new(Strategy::RoundRobin);
let kp0 = EntityKeypair::generate();
let kp1 = EntityKeypair::generate();
let host1 = DaemonHost::new(
Box::new(CounterDaemon::new()),
kp1.clone(),
DaemonHostConfig::default(),
);
reg.register(host1).unwrap();
coord.add_member(MemberInfo {
index: 0,
origin_hash: kp0.origin_hash(),
node_id: 0x1111,
entity_id_bytes: *kp0.entity_id().as_bytes(),
healthy: false, });
coord.add_member(MemberInfo {
index: 1,
origin_hash: kp1.origin_hash(),
node_id: 0x1111,
entity_id_bytes: *kp1.entity_id().as_bytes(),
healthy: false,
});
assert_eq!(coord.health(), GroupHealth::Dead);
coord.on_node_recovery(0x1111, ®);
assert_eq!(
coord.health(),
GroupHealth::Degraded {
healthy: 1,
total: 2
},
"unregistered member should stay unhealthy after recovery"
);
assert!(
!coord.members()[0].healthy,
"kp0 not in registry, should stay unhealthy"
);
assert!(
coord.members()[1].healthy,
"kp1 in registry, should be healthy"
);
}
#[test]
fn test_gap_promote_no_healthy_standbys() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let mut group = StandbyGroup::spawn(
StandbyGroupConfig {
member_count: 2,
group_seed: [202u8; 32],
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
group
.promote(|| Box::new(CounterDaemon::new()), ®, &sched)
.unwrap();
let err = group
.promote(|| Box::new(CounterDaemon::new()), ®, &sched)
.unwrap_err();
assert_eq!(err, GroupError::NoHealthyMember);
}
#[test]
fn test_gap_from_fork_origin_mismatch_returns_err() {
use net::adapter::net::state::causal::CausalChainBuilder;
let keypair_a = EntityKeypair::generate();
let keypair_b = EntityKeypair::generate();
let chain = CausalChainBuilder::new(keypair_a.origin_hash());
struct NoopDaemon;
impl MeshDaemon for NoopDaemon {
fn name(&self) -> &str {
"noop"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
Ok(vec![])
}
}
let result = DaemonHost::from_fork(
Box::new(NoopDaemon),
keypair_b,
chain,
DaemonHostConfig::default(),
);
assert!(matches!(result, Err(DaemonError::RestoreFailed(_))));
}
#[test]
fn test_gap_reassembler_mismatched_total_chunks() {
let mut reassembler = SnapshotReassembler::new();
let result = reassembler.feed(0xAAAA, vec![1, 2], 100, 0, 3).unwrap();
assert!(result.is_none());
let result = reassembler.feed(0xAAAA, vec![3, 4], 100, 1, 4);
assert!(result.is_err(), "mismatched total_chunks should error");
let result = reassembler.feed(0xAAAA, vec![3, 4], 100, 1, 3).unwrap();
assert!(result.is_none());
let result = reassembler.feed(0xAAAA, vec![5, 6], 100, 2, 3).unwrap();
assert!(result.is_some());
let full = result.unwrap();
assert_eq!(full, vec![1, 2, 3, 4, 5, 6]);
}
#[test]
fn test_gap_group_coordinator_standalone() {
let mut coord = GroupCoordinator::new(Strategy::RoundRobin);
assert_eq!(coord.member_count(), 0);
assert_eq!(coord.health(), GroupHealth::Dead);
let kp0 = EntityKeypair::generate();
let kp1 = EntityKeypair::generate();
coord.add_member(MemberInfo {
index: 0,
origin_hash: kp0.origin_hash(),
node_id: 0x1111,
entity_id_bytes: *kp0.entity_id().as_bytes(),
healthy: true,
});
coord.add_member(MemberInfo {
index: 1,
origin_hash: kp1.origin_hash(),
node_id: 0x2222,
entity_id_bytes: *kp1.entity_id().as_bytes(),
healthy: true,
});
assert_eq!(coord.member_count(), 2);
assert_eq!(coord.healthy_count(), 2);
assert_eq!(coord.health(), GroupHealth::Healthy);
coord.mark_unhealthy(0);
assert_eq!(coord.healthy_count(), 1);
assert_eq!(
coord.health(),
GroupHealth::Degraded {
healthy: 1,
total: 2
}
);
coord.mark_healthy(0);
assert_eq!(coord.health(), GroupHealth::Healthy);
assert_eq!(coord.members_on_node(0x1111), vec![0]);
assert_eq!(coord.members_on_node(0x2222), vec![1]);
assert_eq!(coord.members_on_node(0x9999), Vec::<u8>::new());
let removed = coord.remove_last().unwrap();
assert_eq!(removed.index, 1);
assert_eq!(coord.member_count(), 1);
let ctx = RequestContext::default();
let origin = coord.route_event(&ctx).unwrap();
assert_eq!(origin, kp0.origin_hash());
}
#[test]
fn test_gap_scale_to_same_size_noop() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let mut replica_group = ReplicaGroup::spawn(
ReplicaGroupConfig {
replica_count: 3,
group_seed: [206u8; 32],
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let origins_before: Vec<u64> = replica_group
.replicas()
.iter()
.map(|r| r.origin_hash)
.collect();
replica_group
.scale_to(3, || Box::new(CounterDaemon::new()), &sched, ®)
.unwrap();
let origins_after: Vec<u64> = replica_group
.replicas()
.iter()
.map(|r| r.origin_hash)
.collect();
assert_eq!(origins_before, origins_after);
assert_eq!(reg.count(), 3);
}
#[test]
fn test_gap_sync_stateless_active_errors() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
struct StatelessDaemon;
impl MeshDaemon for StatelessDaemon {
fn name(&self) -> &str {
"stateless"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
Ok(vec![])
}
}
let mut group = StandbyGroup::spawn(
StandbyGroupConfig {
member_count: 2,
group_seed: [207u8; 32],
host_config: DaemonHostConfig::default(),
},
|| Box::new(StatelessDaemon),
&sched,
®,
)
.unwrap();
let err = group.sync_standbys(®).unwrap_err();
match err {
GroupError::RegistryFailed(msg) => {
assert!(
msg.contains("stateless"),
"expected stateless error, got: {}",
msg
);
}
_ => panic!("expected RegistryFailed, got {:?}", err),
}
}
#[test]
fn test_gap_chunk_empty_snapshot() {
let chunks = chunk_snapshot(0xAAAA, vec![], 0).unwrap();
assert_eq!(chunks.len(), 1);
match &chunks[0] {
MigrationMessage::SnapshotReady {
snapshot_bytes,
chunk_index,
total_chunks,
..
} => {
assert!(snapshot_bytes.is_empty());
assert_eq!(*chunk_index, 0);
assert_eq!(*total_chunks, 1);
}
_ => panic!("expected SnapshotReady"),
}
}
#[test]
fn test_gap_wire_decode_truncated_messages() {
use net::adapter::net::compute::orchestrator::wire;
let test_cases: Vec<(&str, Vec<u8>)> = vec![
("TakeSnapshot", vec![0]), ("SnapshotReady", vec![1]), ("RestoreComplete", vec![2]), ("ReplayComplete", vec![3]), ("CutoverNotify", vec![4]), ("CleanupComplete", vec![5]), ("MigrationFailed", vec![6]), ("BufferedEvents", vec![7]), ("empty", vec![]), ];
for (name, data) in test_cases {
let result = wire::decode(&data);
assert!(
result.is_err(),
"truncated {} should fail to decode, but got: {:?}",
name,
result,
);
}
let result = wire::decode(&[255]);
assert!(result.is_err());
}
#[test]
fn test_regression_buffered_events_rejects_unbounded_count() {
use bytes::BufMut;
use net::adapter::net::compute::orchestrator::wire;
let mut bad = Vec::new();
bad.put_u8(7); bad.put_u64_le(0xAAAA_BBBB); bad.put_u32_le(u32::MAX);
let result = wire::decode(&bad);
assert!(
result.is_err(),
"decoder must reject count that exceeds remaining wire bytes; \
got {:?}",
result
);
let err = format!("{:?}", result.unwrap_err());
assert!(
err.contains("exceeds bound") || err.contains("count"),
"expected a count-bound error, got: {}",
err
);
let mut bad2 = Vec::new();
bad2.put_u8(7);
bad2.put_u64_le(0);
bad2.put_u32_le(2_000_000); bad2.resize(bad2.len() + 2_000_000 * 36, 0);
let result = wire::decode(&bad2);
assert!(
result.is_err(),
"decoder must reject count above the MAX_BUFFERED_EVENTS cap"
);
let mut good = Vec::new();
good.put_u8(7);
good.put_u64_le(0x1234);
good.put_u32_le(0);
let result = wire::decode(&good);
assert!(result.is_ok(), "count=0 must still decode: {:?}", result);
}
#[test]
fn test_gap_promote_empty_buffer() {
let reg = DaemonRegistry::new();
let sched = make_scheduler_for_groups();
let mut group = StandbyGroup::spawn(
StandbyGroupConfig {
member_count: 2,
group_seed: [210u8; 32],
host_config: DaemonHostConfig::default(),
},
|| Box::new(CounterDaemon::new()),
&sched,
®,
)
.unwrap();
let active_before = group.active_origin();
group.sync_standbys(®).unwrap();
assert_eq!(group.buffered_event_count(), 0);
let new_active = group
.promote(|| Box::new(CounterDaemon::new()), ®, &sched)
.unwrap();
assert_ne!(new_active, active_before);
assert_eq!(group.buffered_event_count(), 0);
let event = make_event(0xFFFF, 1);
let outputs = reg.deliver(new_active, &event).unwrap();
assert_eq!(outputs.len(), 1);
}
use net::adapter::net::compute::{DaemonFactoryRegistry, MigrationError};
use net::adapter::net::subprotocol::{MigrationSubprotocolHandler, OutboundMigrationMessage};
struct WireNode {
node_id: u64,
reg: Arc<DaemonRegistry>,
factories: Arc<DaemonFactoryRegistry>,
handler: Arc<MigrationSubprotocolHandler>,
orch: Arc<MigrationOrchestrator>,
}
impl WireNode {
fn new(node_id: u64) -> Self {
let reg = Arc::new(DaemonRegistry::new());
let factories = Arc::new(DaemonFactoryRegistry::new());
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let orch = Arc::new(
MigrationOrchestrator::new(reg.clone(), node_id).with_source_handler(source.clone()),
);
let target = Arc::new(MigrationTargetHandler::new_with_factories(
reg.clone(),
factories.clone(),
));
let handler = Arc::new(MigrationSubprotocolHandler::new(
orch.clone(),
source,
target,
node_id,
));
Self {
node_id,
reg,
factories,
handler,
orch,
}
}
}
fn pump_messages(
nodes: &std::collections::HashMap<u64, Arc<MigrationSubprotocolHandler>>,
mut queue: Vec<(u64, OutboundMigrationMessage)>,
) -> Result<(), MigrationError> {
let mut iterations = 0;
while let Some((from, msg)) = queue.pop() {
iterations += 1;
assert!(
iterations < 100,
"message pump runaway — likely a feedback loop"
);
let dest = nodes
.get(&msg.dest_node)
.unwrap_or_else(|| panic!("no node for dest {:#x}", msg.dest_node));
let outbound = dest.handle_message(&msg.payload, from)?;
for out in outbound {
queue.push((msg.dest_node, out));
}
}
Ok(())
}
#[test]
fn test_migration_full_lifecycle_over_subprotocol_single_chunk() {
let source = WireNode::new(0x1111);
let target = WireNode::new(0x2222);
let (kp, origin) = register_counter_daemon(&source.reg, 100);
for seq in 1..=5 {
source
.reg
.deliver(origin, &make_event(0xFFFF, seq))
.unwrap();
}
target
.factories
.register(kp.clone(), DaemonHostConfig::default(), || {
Box::new(CounterDaemon::new())
})
.unwrap();
let nodes: std::collections::HashMap<u64, Arc<MigrationSubprotocolHandler>> = [
(source.node_id, source.handler.clone()),
(target.node_id, target.handler.clone()),
]
.into_iter()
.collect();
let start_msgs = source
.orch
.start_migration(origin, source.node_id, target.node_id)
.unwrap();
let initial: Vec<(u64, OutboundMigrationMessage)> = start_msgs
.iter()
.map(|m| {
(
source.node_id,
OutboundMigrationMessage {
dest_node: source.node_id,
payload: net::adapter::net::compute::orchestrator::wire::encode(m).unwrap(),
},
)
})
.collect();
pump_messages(&nodes, initial).unwrap();
assert!(target.reg.contains(origin), "daemon should be on target");
assert!(
!source.reg.contains(origin),
"daemon should be gone from source"
);
assert!(
!source.orch.is_migrating(origin),
"orchestrator record removed"
);
assert!(!target.factories.contains(origin));
}
#[test]
fn test_migration_full_lifecycle_over_subprotocol_multi_chunk() {
struct BigBlobDaemon {
state: Vec<u8>,
}
impl MeshDaemon for BigBlobDaemon {
fn name(&self) -> &str {
"blob"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
Ok(vec![])
}
fn snapshot(&self) -> Option<Bytes> {
Some(Bytes::from(self.state.clone()))
}
fn restore(&mut self, s: Bytes) -> Result<(), DaemonError> {
self.state = s.to_vec();
Ok(())
}
}
let source = WireNode::new(0x1111);
let target = WireNode::new(0x2222);
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let blob_size = MAX_SNAPSHOT_CHUNK_SIZE * 3 + 500;
let blob = vec![0xABu8; blob_size];
let host = DaemonHost::new(
Box::new(BigBlobDaemon {
state: blob.clone(),
}),
kp.clone(),
DaemonHostConfig::default(),
);
source.reg.register(host).unwrap();
target
.factories
.register(kp.clone(), DaemonHostConfig::default(), move || {
Box::new(BigBlobDaemon { state: Vec::new() })
})
.unwrap();
let nodes: std::collections::HashMap<u64, Arc<MigrationSubprotocolHandler>> = [
(source.node_id, source.handler.clone()),
(target.node_id, target.handler.clone()),
]
.into_iter()
.collect();
let snapshot = source.reg.snapshot(origin).unwrap().unwrap();
assert!(snapshot.state.len() >= blob_size);
let snapshot_bytes = snapshot.to_bytes();
let chunks = chunk_snapshot(origin, snapshot_bytes, snapshot.through_seq).unwrap();
assert!(chunks.len() >= 2, "expected multi-chunk snapshot");
source
.orch
.start_migration(origin, source.node_id, target.node_id)
.unwrap();
let mut queue: Vec<(u64, OutboundMigrationMessage)> = Vec::new();
for chunk in chunks {
let encoded = net::adapter::net::compute::orchestrator::wire::encode(&chunk).unwrap();
queue.push((
source.node_id,
OutboundMigrationMessage {
dest_node: target.node_id,
payload: encoded,
},
));
}
pump_messages(&nodes, queue).unwrap();
assert!(target.reg.contains(origin), "daemon restored on target");
}
#[test]
fn test_migration_fails_when_no_factory_registered() {
let source = WireNode::new(0x1111);
let target = WireNode::new(0x2222);
let (_kp, origin) = register_counter_daemon(&source.reg, 7);
let nodes: std::collections::HashMap<u64, Arc<MigrationSubprotocolHandler>> = [
(source.node_id, source.handler.clone()),
(target.node_id, target.handler.clone()),
]
.into_iter()
.collect();
let start_msgs = source
.orch
.start_migration(origin, source.node_id, target.node_id)
.unwrap();
let initial: Vec<(u64, OutboundMigrationMessage)> = start_msgs
.iter()
.map(|m| {
(
source.node_id,
OutboundMigrationMessage {
dest_node: source.node_id,
payload: net::adapter::net::compute::orchestrator::wire::encode(m).unwrap(),
},
)
})
.collect();
pump_messages(&nodes, initial).unwrap();
assert!(
!target.reg.contains(origin),
"target must not restore without factory"
);
assert!(
source.reg.contains(origin),
"source daemon preserved on failure"
);
assert!(!source.orch.is_migrating(origin));
}
#[test]
fn test_migration_fails_on_corrupted_snapshot() {
let source = WireNode::new(0x1111);
let target = WireNode::new(0x2222);
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
target
.factories
.register(kp, DaemonHostConfig::default(), || {
Box::new(CounterDaemon::new())
})
.unwrap();
let junk = MigrationMessage::SnapshotReady {
daemon_origin: origin,
snapshot_bytes: vec![0xFFu8; 32], seq_through: 0,
chunk_index: 0,
total_chunks: 1,
};
let payload = net::adapter::net::compute::orchestrator::wire::encode(&junk).unwrap();
let outbound = target
.handler
.handle_message(&payload, source.node_id)
.unwrap();
let failed = outbound
.iter()
.find_map(|o| {
match net::adapter::net::compute::orchestrator::wire::decode(&o.payload).ok()? {
MigrationMessage::MigrationFailed { reason, .. } => Some(reason),
_ => None,
}
})
.expect("expected MigrationFailed");
let failed_msg = match &failed {
net::adapter::net::compute::MigrationFailureReason::StateFailed(m) => m.clone(),
other => panic!("expected StateFailed-wrapped reason, got {other:?}"),
};
assert!(
failed_msg.contains("parse snapshot") || failed_msg.contains("reassembly"),
"unexpected failure reason: {failed_msg}",
);
assert!(!target.reg.contains(origin));
assert!(target.factories.contains(origin));
}
#[test]
fn test_regression_factory_preserved_for_retry_after_restore_failure() {
let source = WireNode::new(0x1111);
let target = WireNode::new(0x2222);
let (kp, origin) = register_counter_daemon(&source.reg, 7);
for seq in 1..=3 {
source
.reg
.deliver(origin, &make_event(0xFFFF, seq))
.unwrap();
}
let snapshot = source.reg.snapshot(origin).unwrap().unwrap();
let valid_bytes = snapshot.to_bytes();
target
.factories
.register(kp.clone(), DaemonHostConfig::default(), || {
Box::new(CounterDaemon::new())
})
.unwrap();
let corrupt = MigrationMessage::SnapshotReady {
daemon_origin: origin,
snapshot_bytes: vec![0xFFu8; 32],
seq_through: 0,
chunk_index: 0,
total_chunks: 1,
};
let payload = net::adapter::net::compute::orchestrator::wire::encode(&corrupt).unwrap();
let outbound = target
.handler
.handle_message(&payload, source.node_id)
.unwrap();
assert!(
outbound.iter().any(|o| matches!(
net::adapter::net::compute::orchestrator::wire::decode(&o.payload),
Ok(MigrationMessage::MigrationFailed { .. })
)),
"first attempt must emit MigrationFailed"
);
assert!(
target.factories.contains(origin),
"factory must remain registered after a failed restore so a \
retry can use it without manual re-registration"
);
assert!(!target.reg.contains(origin));
let good = MigrationMessage::SnapshotReady {
daemon_origin: origin,
snapshot_bytes: valid_bytes,
seq_through: snapshot.through_seq,
chunk_index: 0,
total_chunks: 1,
};
let payload = net::adapter::net::compute::orchestrator::wire::encode(&good).unwrap();
let outbound = target
.handler
.handle_message(&payload, source.node_id)
.unwrap();
assert!(
outbound.iter().any(|o| matches!(
net::adapter::net::compute::orchestrator::wire::decode(&o.payload),
Ok(MigrationMessage::RestoreComplete { .. })
)),
"second attempt must emit RestoreComplete"
);
assert!(
target.reg.contains(origin),
"daemon must be restored on target"
);
assert!(target.factories.contains(origin));
}
#[test]
fn test_regression_snapshot_ready_retry_after_successful_restore_is_idempotent() {
let source = WireNode::new(0x1111);
let target = WireNode::new(0x2222);
let (kp, origin) = register_counter_daemon(&source.reg, 9);
for seq in 1..=3 {
source
.reg
.deliver(origin, &make_event(0xFFFF, seq))
.unwrap();
}
let snapshot = source.reg.snapshot(origin).unwrap().unwrap();
let snapshot_bytes = snapshot.to_bytes();
target
.factories
.register(kp.clone(), DaemonHostConfig::default(), || {
Box::new(CounterDaemon::new())
})
.unwrap();
let snapshot_ready = MigrationMessage::SnapshotReady {
daemon_origin: origin,
snapshot_bytes,
seq_through: snapshot.through_seq,
chunk_index: 0,
total_chunks: 1,
};
let payload = net::adapter::net::compute::orchestrator::wire::encode(&snapshot_ready).unwrap();
let outbound1 = target
.handler
.handle_message(&payload, source.node_id)
.unwrap();
assert!(
outbound1.iter().any(|o| matches!(
net::adapter::net::compute::orchestrator::wire::decode(&o.payload),
Ok(MigrationMessage::RestoreComplete { .. })
)),
"first attempt must emit RestoreComplete"
);
assert!(target.reg.contains(origin), "daemon must be on target");
let outbound2 = target
.handler
.handle_message(&payload, source.node_id)
.unwrap();
let restore_complete_count = outbound2
.iter()
.filter(|o| {
matches!(
net::adapter::net::compute::orchestrator::wire::decode(&o.payload),
Ok(MigrationMessage::RestoreComplete { .. })
)
})
.count();
assert_eq!(
restore_complete_count, 1,
"retry must emit exactly one RestoreComplete"
);
let migration_failed_count = outbound2
.iter()
.filter(|o| {
matches!(
net::adapter::net::compute::orchestrator::wire::decode(&o.payload),
Ok(MigrationMessage::MigrationFailed { .. })
)
})
.count();
assert_eq!(
migration_failed_count, 0,
"retry must not emit MigrationFailed — the daemon is already \
restored here, so this is an idempotent retry"
);
assert!(target.reg.contains(origin));
assert!(
target.factories.contains(origin),
"factory must still be registered until caller explicitly removes it"
);
}
#[test]
fn test_activate_target_without_prior_restore_errors_gracefully() {
let target = WireNode::new(0x2222);
let msg = MigrationMessage::ActivateTarget {
daemon_origin: 0xDEADBEEF,
};
let payload = net::adapter::net::compute::orchestrator::wire::encode(&msg).unwrap();
let result = target.handler.handle_message(&payload, 0x1111);
assert!(
matches!(result, Err(MigrationError::DaemonNotFound(0xDEADBEEF))),
"expected DaemonNotFound, got {:?}",
result
);
}