use std::sync::Arc;
use dashmap::DashMap;
use parking_lot::Mutex;
use crate::adapter::net::compute::migration_target::RestoreContext;
use crate::adapter::net::compute::orchestrator::wire;
use crate::adapter::net::compute::{
MigrationError, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
MigrationTargetHandler, SnapshotReassembler,
};
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::state::snapshot::StateSnapshot;
#[derive(Clone)]
pub struct MigrationIdentityContext {
pub unseal_snapshot: EnvelopeUnsealFn,
pub peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync>,
}
impl std::fmt::Debug for MigrationIdentityContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MigrationIdentityContext")
.field("unseal_snapshot", &"<fn>")
.field("peer_static_lookup", &"<fn>")
.finish()
}
}
#[derive(Debug)]
pub struct OutboundMigrationMessage {
pub dest_node: u64,
pub payload: Vec<u8>,
}
pub type PostRestoreCallback = Arc<dyn Fn(u64) + Send + Sync>;
pub type PreCleanupCallback = Arc<dyn Fn(u64) + Send + Sync>;
pub type ReadinessCallback = Arc<dyn Fn() -> bool + Send + Sync>;
pub type FailureCallback =
Arc<dyn Fn(u64, crate::adapter::net::compute::MigrationFailureReason) + Send + Sync>;
pub type EnvelopeUnsealFn = Arc<
dyn Fn(
&crate::adapter::net::state::snapshot::StateSnapshot,
) -> Result<Option<EntityKeypair>, crate::adapter::net::identity::EnvelopeError>
+ Send
+ Sync,
>;
#[derive(Default, Clone)]
pub struct MigrationHandlerHooks {
pub identity: Option<MigrationIdentityContext>,
pub post_restore: Option<PostRestoreCallback>,
pub pre_cleanup: Option<PreCleanupCallback>,
pub readiness: Option<ReadinessCallback>,
pub failure: Option<FailureCallback>,
}
pub struct MigrationSubprotocolHandler {
orchestrator: Arc<MigrationOrchestrator>,
source_handler: Arc<MigrationSourceHandler>,
target_handler: Arc<MigrationTargetHandler>,
local_node_id: u64,
reassemblers: DashMap<u64, Mutex<SnapshotReassembler>>,
identity_context: Option<MigrationIdentityContext>,
post_restore_callback: Option<PostRestoreCallback>,
pre_cleanup_callback: Option<PreCleanupCallback>,
readiness_callback: Option<ReadinessCallback>,
failure_callback: Option<FailureCallback>,
}
impl MigrationSubprotocolHandler {
pub fn new(
orchestrator: Arc<MigrationOrchestrator>,
source_handler: Arc<MigrationSourceHandler>,
target_handler: Arc<MigrationTargetHandler>,
local_node_id: u64,
) -> Self {
Self::with_hooks(
orchestrator,
source_handler,
target_handler,
local_node_id,
MigrationHandlerHooks::default(),
)
}
pub fn with_hooks(
orchestrator: Arc<MigrationOrchestrator>,
source_handler: Arc<MigrationSourceHandler>,
target_handler: Arc<MigrationTargetHandler>,
local_node_id: u64,
hooks: MigrationHandlerHooks,
) -> Self {
Self {
orchestrator,
source_handler,
target_handler,
local_node_id,
reassemblers: DashMap::new(),
identity_context: hooks.identity,
post_restore_callback: hooks.post_restore,
pre_cleanup_callback: hooks.pre_cleanup,
readiness_callback: hooks.readiness,
failure_callback: hooks.failure,
}
}
pub fn handle_message(
&self,
data: &[u8],
from_node: u64,
) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
let msg = wire::decode(data)?;
self.dispatch(msg, from_node)
}
fn dispatch(
&self,
msg: MigrationMessage,
from_node: u64,
) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
let mut outbound = Vec::new();
match msg {
MigrationMessage::TakeSnapshot {
daemon_origin,
target_node,
} => {
let mut snapshot =
self.source_handler
.start_snapshot(daemon_origin, target_node, from_node)?;
snapshot = match self.maybe_seal_envelope(snapshot, daemon_origin, target_node) {
Ok(s) => s,
Err(e) => {
let _ = self.source_handler.abort(daemon_origin);
let reason =
crate::adapter::net::compute::MigrationFailureReason::StateFailed(
format!("identity envelope seal failed: {e}"),
);
outbound.push(OutboundMigrationMessage {
dest_node: from_node,
payload: wire::encode(&MigrationMessage::MigrationFailed {
daemon_origin,
reason,
})?,
});
return Ok(outbound);
}
};
let snapshot_bytes = match snapshot.try_to_bytes() {
Ok(b) => b,
Err(e) => {
let _ = self.source_handler.abort(daemon_origin);
let reason =
crate::adapter::net::compute::MigrationFailureReason::StateFailed(
format!("snapshot serialization failed: {e}"),
);
outbound.push(OutboundMigrationMessage {
dest_node: from_node,
payload: wire::encode(&MigrationMessage::MigrationFailed {
daemon_origin,
reason,
})?,
});
return Ok(outbound);
}
};
let chunks = crate::adapter::net::compute::orchestrator::chunk_snapshot(
daemon_origin,
snapshot_bytes,
snapshot.through_seq,
)?;
let orch = self
.source_handler
.orchestrator_node(daemon_origin)
.unwrap_or(from_node);
for chunk in chunks {
outbound.push(OutboundMigrationMessage {
dest_node: orch,
payload: wire::encode(&chunk)?,
});
}
}
MigrationMessage::SnapshotReady {
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
} => {
if let Some(expected) = self.orchestrator.source_node(daemon_origin) {
if expected != from_node {
return Err(MigrationError::WrongPeer {
daemon_origin,
from: from_node,
expected,
});
}
} else if let Some(expected) = self.target_handler.orchestrator_node(daemon_origin)
{
if expected != from_node {
return Err(MigrationError::WrongPeer {
daemon_origin,
from: from_node,
expected,
});
}
} else if let Some(expected) = self
.target_handler
.factories()
.expected_orchestrator(daemon_origin)
{
if expected != from_node {
return Err(MigrationError::WrongPeer {
daemon_origin,
from: from_node,
expected,
});
}
}
let orch_target = self.orchestrator.target_node(daemon_origin);
match orch_target {
Some(target) if target == self.local_node_id => {
if let Err(e) = self.orchestrator.on_snapshot_ready(
daemon_origin,
snapshot_bytes.clone(),
seq_through,
chunk_index,
total_chunks,
) {
tracing::debug!(
?e,
origin = format!("{:#x}", daemon_origin),
"on_snapshot_ready (local target): ignored"
);
}
if let Some(out) = self.restore_on_target(
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
from_node,
)? {
outbound.extend(out);
}
}
Some(target) => {
let forward = self.orchestrator.on_snapshot_ready(
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
)?;
if let MigrationMessage::SnapshotReady { .. } = &forward {
outbound.push(OutboundMigrationMessage {
dest_node: target,
payload: wire::encode(&forward)?,
});
}
}
None => {
if let Some(out) = self.restore_on_target(
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
from_node,
)? {
outbound.extend(out);
}
}
}
}
MigrationMessage::RestoreComplete {
daemon_origin,
restored_seq,
} => {
let buffered_msg = self
.orchestrator
.on_restore_complete(daemon_origin, restored_seq)?
.unwrap_or(MigrationMessage::BufferedEvents {
daemon_origin,
events: Vec::new(),
});
outbound.push(OutboundMigrationMessage {
dest_node: from_node, payload: wire::encode(&buffered_msg)?,
});
}
MigrationMessage::ReplayComplete {
daemon_origin,
replayed_seq,
target_head,
} => {
let cutover_msg = self.orchestrator.on_replay_complete(
daemon_origin,
replayed_seq,
target_head,
)?;
if let MigrationMessage::CutoverNotify { .. } = &cutover_msg {
let source_node = self
.orchestrator
.source_node(daemon_origin)
.unwrap_or(from_node);
outbound.push(OutboundMigrationMessage {
dest_node: source_node,
payload: wire::encode(&cutover_msg)?,
});
}
}
MigrationMessage::CutoverNotify {
daemon_origin,
target_node,
} => {
let final_events = match self.source_handler.on_cutover(daemon_origin) {
Ok(events) => events,
Err(MigrationError::DaemonNotFound(_)) => Vec::new(),
Err(e) => return Err(e),
};
if !final_events.is_empty() {
let events_msg = MigrationMessage::BufferedEvents {
daemon_origin,
events: final_events,
};
outbound.push(OutboundMigrationMessage {
dest_node: target_node,
payload: wire::encode(&events_msg)?,
});
}
match self.orchestrator.on_cutover_acknowledged(daemon_origin) {
Ok(()) => {}
Err(MigrationError::DaemonNotFound(_)) => {}
Err(e) => return Err(e),
}
let dest = self
.source_handler
.orchestrator_node(daemon_origin)
.unwrap_or(from_node);
if let Some(cb) = &self.pre_cleanup_callback {
cb(daemon_origin);
}
let _ = self.source_handler.cleanup(daemon_origin);
let cleanup_msg = MigrationMessage::CleanupComplete { daemon_origin };
outbound.push(OutboundMigrationMessage {
dest_node: dest,
payload: wire::encode(&cleanup_msg)?,
});
}
MigrationMessage::CleanupComplete { daemon_origin } => {
if let Some(expected) = self.orchestrator.source_node(daemon_origin) {
if expected != from_node {
return Err(MigrationError::WrongPeer {
daemon_origin,
from: from_node,
expected,
});
}
}
let activate = self.orchestrator.on_cleanup_complete(daemon_origin)?;
let target = self
.orchestrator
.target_node(daemon_origin)
.unwrap_or(from_node);
outbound.push(OutboundMigrationMessage {
dest_node: target,
payload: wire::encode(&activate)?,
});
}
MigrationMessage::ActivateTarget { daemon_origin } => {
if let Some(expected) = self.target_handler.orchestrator_node(daemon_origin) {
if expected != from_node {
return Err(MigrationError::WrongPeer {
daemon_origin,
from: from_node,
expected,
});
}
}
let replayed_seq = self.target_handler.activate(daemon_origin)?;
let ack_dest = self
.target_handler
.orchestrator_node(daemon_origin)
.unwrap_or(from_node);
let ack = MigrationMessage::ActivateAck {
daemon_origin,
replayed_seq,
};
outbound.push(OutboundMigrationMessage {
dest_node: ack_dest,
payload: wire::encode(&ack)?,
});
let _ = self.target_handler.complete(daemon_origin);
}
MigrationMessage::ActivateAck {
daemon_origin,
replayed_seq,
} => {
self.orchestrator
.on_activate_ack(daemon_origin, replayed_seq)?;
}
MigrationMessage::MigrationFailed {
daemon_origin,
reason,
} => {
let recorded = [
self.orchestrator.source_node(daemon_origin),
self.orchestrator.target_node(daemon_origin),
self.source_handler.orchestrator_node(daemon_origin),
self.target_handler.orchestrator_node(daemon_origin),
];
let known = recorded.iter().any(|p| p.is_some());
if known && !recorded.contains(&Some(from_node)) {
return Err(MigrationError::WrongPeer {
daemon_origin,
from: from_node,
expected: recorded.iter().find_map(|p| *p).unwrap_or(0),
});
}
if let Some(cb) = &self.failure_callback {
cb(daemon_origin, reason.clone());
}
let _ = self.source_handler.abort(daemon_origin);
let _ = self.target_handler.abort(daemon_origin);
let _ = self
.orchestrator
.abort_migration_with_reason(daemon_origin, reason);
self.reassemblers.remove(&daemon_origin);
}
MigrationMessage::BufferedEvents {
daemon_origin,
events,
} => {
let replayed_seq = self.target_handler.replay_events(daemon_origin, events)?;
let target_head = self.target_handler.host_head_link(daemon_origin)?;
let reply = MigrationMessage::ReplayComplete {
daemon_origin,
replayed_seq,
target_head,
};
let dest = self
.target_handler
.orchestrator_node(daemon_origin)
.unwrap_or(from_node);
outbound.push(OutboundMigrationMessage {
dest_node: dest,
payload: wire::encode(&reply)?,
});
}
}
Ok(outbound)
}
fn restore_on_target(
&self,
daemon_origin: u64,
snapshot_bytes: Vec<u8>,
seq_through: u64,
chunk_index: u32,
total_chunks: u32,
from_node: u64,
) -> Result<Option<Vec<OutboundMigrationMessage>>, MigrationError> {
let reassembler_entry = self
.reassemblers
.entry(daemon_origin)
.or_insert_with(|| Mutex::new(SnapshotReassembler::new()));
let assembled = {
let mut reassembler = reassembler_entry.lock();
reassembler
.feed(
daemon_origin,
snapshot_bytes,
seq_through,
chunk_index,
total_chunks,
)
.map_err(|e| {
MigrationError::StateFailed(format!("snapshot reassembly failed: {:?}", e))
})?
};
drop(reassembler_entry);
let assembled_bytes = match assembled {
Some(bytes) => bytes,
None => return Ok(None), };
self.reassemblers.remove(&daemon_origin);
let snapshot = match StateSnapshot::from_bytes(&assembled_bytes) {
Some(s) => s,
None => {
return Ok(Some(self.fail_migration(
daemon_origin,
from_node,
"failed to parse snapshot bytes on target",
)?));
}
};
let source_node = self
.orchestrator
.source_node(daemon_origin)
.unwrap_or(from_node);
if !self.target_handler.is_migrating(daemon_origin) {
if let Some(readiness) = &self.readiness_callback {
if !readiness() {
return Ok(Some(self.fail_migration_with_reason(
daemon_origin,
from_node,
crate::adapter::net::compute::MigrationFailureReason::NotReady,
)?));
}
}
let inputs = match self.target_handler.factories().construct(daemon_origin) {
Some(i) => i,
None => {
return Ok(Some(self.fail_migration_with_reason(
daemon_origin,
from_node,
crate::adapter::net::compute::MigrationFailureReason::FactoryNotFound,
)?));
}
};
let keypair = match self.resolve_restore_keypair(&snapshot, inputs.keypair.as_ref()) {
Ok(kp) => kp,
Err(e) => {
return Ok(Some(self.fail_migration(
daemon_origin,
from_node,
&format!("identity envelope open failed: {e}"),
)?));
}
};
let daemon = inputs.daemon;
if let Err(e) = self.target_handler.restore_snapshot(
RestoreContext {
daemon_origin,
snapshot: &snapshot,
source_node,
orchestrator_node: from_node,
},
keypair,
move || daemon,
inputs.config,
) {
return Ok(Some(self.fail_migration(
daemon_origin,
from_node,
&format!("restore_snapshot failed: {:?}", e),
)?));
}
if let Some(cb) = &self.post_restore_callback {
cb(daemon_origin);
}
}
let reply = MigrationMessage::RestoreComplete {
daemon_origin,
restored_seq: seq_through,
};
let dest = self
.target_handler
.orchestrator_node(daemon_origin)
.unwrap_or(from_node);
Ok(Some(vec![OutboundMigrationMessage {
dest_node: dest,
payload: wire::encode(&reply)?,
}]))
}
fn maybe_seal_envelope(
&self,
snapshot: StateSnapshot,
daemon_origin: u64,
target_node: u64,
) -> Result<StateSnapshot, MigrationError> {
let Some(ctx) = &self.identity_context else {
return Ok(snapshot);
};
if snapshot.identity_envelope.is_some() {
return Ok(snapshot);
}
let Some(target_pub) = (ctx.peer_static_lookup)(target_node) else {
return Ok(snapshot);
};
let kp = match self
.source_handler_registry_keypair(daemon_origin)
.or_else(|| self.target_handler_registry_keypair(daemon_origin))
{
Some(kp) => kp,
None => return Ok(snapshot),
};
snapshot
.with_identity_envelope(&kp, target_pub)
.map_err(|e| {
MigrationError::StateFailed(format!(
"identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
))
})
}
fn source_handler_registry_keypair(&self, daemon_origin: u64) -> Option<EntityKeypair> {
let _ = daemon_origin;
self.orchestrator
.daemon_registry()
.daemon_keypair(daemon_origin)
}
fn target_handler_registry_keypair(&self, daemon_origin: u64) -> Option<EntityKeypair> {
self.orchestrator
.daemon_registry()
.daemon_keypair(daemon_origin)
}
fn resolve_restore_keypair(
&self,
snapshot: &StateSnapshot,
fallback: Option<&EntityKeypair>,
) -> Result<EntityKeypair, String> {
if let (Some(ctx), Some(_)) = (&self.identity_context, &snapshot.identity_envelope) {
return match (ctx.unseal_snapshot)(snapshot) {
Ok(Some(kp)) => Ok(kp),
Ok(None) => Err("identity envelope present on snapshot but \
`unseal_snapshot` returned Ok(None) — refusing to \
fall back to the pre-provisioned keypair; a \
present envelope mandates envelope-sourced \
identity transport"
.to_string()),
Err(e) => Err(format!("{e}")),
};
}
fallback.cloned().ok_or_else(|| {
"placeholder factory registered but snapshot has no \
identity envelope (and no local fallback keypair available)"
.to_string()
})
}
fn fail_migration(
&self,
daemon_origin: u64,
from_node: u64,
reason: &str,
) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
self.fail_migration_with_reason(
daemon_origin,
from_node,
crate::adapter::net::compute::MigrationFailureReason::StateFailed(reason.to_string()),
)
}
fn fail_migration_with_reason(
&self,
daemon_origin: u64,
from_node: u64,
reason: crate::adapter::net::compute::MigrationFailureReason,
) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
tracing::warn!(
daemon_origin = format!("{:#x}", daemon_origin),
reason = %reason,
"migration failed on target",
);
self.reassemblers.remove(&daemon_origin);
let _ = self.target_handler.abort(daemon_origin);
let msg = MigrationMessage::MigrationFailed {
daemon_origin,
reason,
};
Ok(vec![OutboundMigrationMessage {
dest_node: from_node,
payload: wire::encode(&msg)?,
}])
}
pub fn orchestrator(&self) -> &Arc<MigrationOrchestrator> {
&self.orchestrator
}
pub fn source_handler(&self) -> &Arc<MigrationSourceHandler> {
&self.source_handler
}
pub fn target_handler(&self) -> &Arc<MigrationTargetHandler> {
&self.target_handler
}
}
impl std::fmt::Debug for MigrationSubprotocolHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MigrationSubprotocolHandler")
.field("local_node_id", &format!("{:#x}", self.local_node_id))
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::capability::CapabilityFilter;
use crate::adapter::net::compute::orchestrator::wire;
use crate::adapter::net::compute::{
DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
MigrationOrchestrator, MigrationSourceHandler, MigrationTargetHandler,
};
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::state::causal::CausalEvent;
use bytes::Bytes;
#[test]
fn migration_identity_context_has_no_plaintext_secret_field_regression() {
use std::mem::size_of;
let fat_ptr = 2 * size_of::<usize>();
assert_eq!(
size_of::<MigrationIdentityContext>(),
2 * fat_ptr,
"MigrationIdentityContext must stay two Arc<dyn Fn ...> and \
nothing else — a size change means a new field was added, \
most likely re-exposing the Noise static private key the \
way `local_x25519_priv: [u8; 32]` used to.",
);
}
struct TestDaemon {
value: u64,
}
impl MeshDaemon for TestDaemon {
fn name(&self) -> &str {
"test"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
self.value += 1;
Ok(vec![])
}
fn snapshot(&self) -> Option<Bytes> {
Some(Bytes::from(self.value.to_le_bytes().to_vec()))
}
fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
self.value = u64::from_le_bytes(state[..8].try_into().unwrap());
Ok(())
}
}
fn setup() -> (MigrationSubprotocolHandler, Arc<DaemonRegistry>, u64) {
let reg = Arc::new(DaemonRegistry::new());
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let host = DaemonHost::new(
Box::new(TestDaemon { value: 100 }),
kp,
DaemonHostConfig::default(),
);
reg.register(host).unwrap();
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
let handler = MigrationSubprotocolHandler::new(orch, source, target, 0x1111);
(handler, reg, origin)
}
#[test]
fn test_handle_take_snapshot() {
let (handler, _reg, origin) = setup();
let msg = MigrationMessage::TakeSnapshot {
daemon_origin: origin,
target_node: 0x2222,
};
let encoded = wire::encode(&msg).unwrap();
let outbound = handler.handle_message(&encoded, 0x3333).unwrap();
assert!(!outbound.is_empty());
let reply = wire::decode(&outbound[0].payload).unwrap();
match reply {
MigrationMessage::SnapshotReady { daemon_origin, .. } => {
assert_eq!(daemon_origin, origin);
}
_ => panic!("expected SnapshotReady"),
}
}
#[test]
fn maybe_seal_envelope_propagates_seal_failures() {
use crate::adapter::net::identity::IdentityEnvelope;
use crate::adapter::net::state::snapshot::StateSnapshot;
use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
let mut seed = [0u8; 32];
getrandom::fill(&mut seed).unwrap();
let target_priv = X25519Secret::from(seed);
let target_pub = *X25519Pub::from(&target_priv).as_bytes();
let target_node_id: u64 = 0x2222;
let real_kp = EntityKeypair::generate();
let origin_hash = real_kp.origin_hash();
let public_only_kp = EntityKeypair::public_only(real_kp.entity_id().clone());
assert!(
public_only_kp.is_read_only(),
"fixture: must be public-only",
);
let reg = Arc::new(DaemonRegistry::new());
let host = DaemonHost::new(
Box::new(TestDaemon { value: 0 }),
public_only_kp,
DaemonHostConfig::default(),
);
reg.register(host).unwrap();
let snapshot = StateSnapshot {
version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
entity_id: real_kp.entity_id().clone(),
chain_link: crate::adapter::net::state::causal::CausalLink {
origin_hash,
horizon_encoded: 0,
sequence: 0,
parent_hash: 0,
},
through_seq: 0,
state: Bytes::from_static(&[0u8; 8]),
horizon: Default::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let unseal_snapshot: EnvelopeUnsealFn =
Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&target_priv));
let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
Arc::new(move |nid| {
if nid == target_node_id {
Some(target_pub)
} else {
None
}
});
let ctx = MigrationIdentityContext {
unseal_snapshot,
peer_static_lookup,
};
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg));
let handler = MigrationSubprotocolHandler::with_hooks(
orch,
source,
target,
0x1111,
MigrationHandlerHooks {
identity: Some(ctx),
..Default::default()
},
);
let err = handler
.maybe_seal_envelope(snapshot, origin_hash, target_node_id)
.expect_err(
"public-only daemon keypair must fail to seal; silently returning the \
unsealed snapshot breaks the identity-transport guarantee",
);
match err {
MigrationError::StateFailed(ref msg) => {
assert!(
msg.contains("envelope seal failed"),
"expected 'envelope seal failed' in message, got: {msg}",
);
assert!(
msg.contains(&format!("{origin_hash:#x}")),
"expected origin_hash in message, got: {msg}",
);
}
other => panic!("expected StateFailed, got {other:?}"),
}
let handler_no_ctx = MigrationSubprotocolHandler::new(
Arc::new(MigrationOrchestrator::new(
Arc::new(DaemonRegistry::new()),
0x1111,
)),
Arc::new(MigrationSourceHandler::new(Arc::new(DaemonRegistry::new()))),
Arc::new(MigrationTargetHandler::new(Arc::new(DaemonRegistry::new()))),
0x1111,
);
let snapshot2 = StateSnapshot {
version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
entity_id: real_kp.entity_id().clone(),
chain_link: crate::adapter::net::state::causal::CausalLink {
origin_hash,
horizon_encoded: 0,
sequence: 0,
parent_hash: 0,
},
through_seq: 0,
state: Bytes::from_static(&[0u8; 8]),
horizon: Default::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let passthrough = handler_no_ctx
.maybe_seal_envelope(snapshot2, origin_hash, target_node_id)
.expect("no ctx = ok unchanged");
assert!(passthrough.identity_envelope.is_none());
let _ = IdentityEnvelope::new; }
#[test]
fn take_snapshot_seal_failure_emits_migration_failed_reply() {
use crate::adapter::net::state::snapshot::StateSnapshot;
use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
let mut x25519_seed = [0u8; 32];
getrandom::fill(&mut x25519_seed).unwrap();
let target_priv = X25519Secret::from(x25519_seed);
let target_pub = *X25519Pub::from(&target_priv).as_bytes();
let target_node_id: u64 = 0x2222;
let orchestrator_node_id: u64 = 0x3333;
let real_kp = EntityKeypair::generate();
let origin = real_kp.origin_hash();
let public_only_kp = EntityKeypair::public_only(real_kp.entity_id().clone());
let reg = Arc::new(DaemonRegistry::new());
let host = DaemonHost::new(
Box::new(TestDaemon { value: 7 }),
public_only_kp,
DaemonHostConfig::default(),
);
reg.register(host).unwrap();
let unseal: EnvelopeUnsealFn =
Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&target_priv));
let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
Arc::new(move |nid| {
if nid == target_node_id {
Some(target_pub)
} else {
None
}
});
let ctx = MigrationIdentityContext {
unseal_snapshot: unseal,
peer_static_lookup,
};
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg));
let handler = MigrationSubprotocolHandler::with_hooks(
orch,
source.clone(),
target,
0x1111,
MigrationHandlerHooks {
identity: Some(ctx),
..Default::default()
},
);
let msg = MigrationMessage::TakeSnapshot {
daemon_origin: origin,
target_node: target_node_id,
};
let encoded = wire::encode(&msg).unwrap();
let outbound = handler
.handle_message(&encoded, orchestrator_node_id)
.expect("seal failure must not bubble up as dispatcher error");
assert_eq!(
outbound.len(),
1,
"expected single MigrationFailed reply, got {} messages",
outbound.len(),
);
assert_eq!(outbound[0].dest_node, orchestrator_node_id);
let reply = wire::decode(&outbound[0].payload).unwrap();
match reply {
MigrationMessage::MigrationFailed {
daemon_origin,
reason,
} => {
assert_eq!(daemon_origin, origin);
let reason_msg = format!("{reason}");
assert!(
reason_msg.contains("identity envelope seal failed"),
"expected seal-failure reason, got: {reason_msg}",
);
}
other => panic!("expected MigrationFailed, got {other:?}"),
}
assert!(
source.phase(origin).is_none(),
"source_handler must have cleared its record for {origin:#x} after a failed TakeSnapshot",
);
}
#[test]
fn test_handle_migration_failed() {
let (handler, _reg, origin) = setup();
let msg = MigrationMessage::MigrationFailed {
daemon_origin: origin,
reason: crate::adapter::net::compute::MigrationFailureReason::StateFailed(
"test failure".into(),
),
};
let encoded = wire::encode(&msg).unwrap();
let outbound = handler.handle_message(&encoded, 0x3333).unwrap();
assert!(outbound.is_empty());
}
#[test]
fn envelope_keypair_overrides_fallback_placeholder() {
use crate::adapter::net::identity::IdentityEnvelope;
use crate::adapter::net::state::causal::CausalLink;
use crate::adapter::net::state::snapshot::StateSnapshot;
use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
let mut seed = [0u8; 32];
getrandom::fill(&mut seed).unwrap();
let target_priv = X25519Secret::from(seed);
let target_pub = *X25519Pub::from(&target_priv).as_bytes();
let real_kp = EntityKeypair::generate();
let wrong_fallback = EntityKeypair::generate();
assert_ne!(
real_kp.entity_id(),
wrong_fallback.entity_id(),
"fixture: real and fallback must differ",
);
let chain_link = CausalLink {
origin_hash: real_kp.origin_hash(),
horizon_encoded: 0,
sequence: 0,
parent_hash: 0,
};
let envelope =
IdentityEnvelope::new(&real_kp, target_pub, &chain_link).expect("seal envelope");
let snapshot = StateSnapshot {
version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
entity_id: real_kp.entity_id().clone(),
chain_link,
through_seq: 0,
state: Bytes::new(),
horizon: Default::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: Some(envelope),
head_payload: None,
};
let priv_for_closure = target_priv.clone();
let unseal_snapshot: EnvelopeUnsealFn =
Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&priv_for_closure));
let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
Arc::new(|_| None);
let ctx = MigrationIdentityContext {
unseal_snapshot,
peer_static_lookup,
};
let reg = Arc::new(DaemonRegistry::new());
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg));
let handler = MigrationSubprotocolHandler::with_hooks(
orch,
source,
target,
0x1111,
MigrationHandlerHooks {
identity: Some(ctx),
..Default::default()
},
);
let resolved = handler
.resolve_restore_keypair(&snapshot, Some(&wrong_fallback))
.expect("resolve");
assert_eq!(
resolved.entity_id(),
real_kp.entity_id(),
"envelope's keypair must win over the pre-provisioned fallback — \
flipping this order silently downgrades identity transport to \
whatever the factory registry was pre-populated with",
);
assert_ne!(
resolved.entity_id(),
wrong_fallback.entity_id(),
"fallback must NOT leak through when the envelope is valid",
);
let snapshot_no_envelope = StateSnapshot {
identity_envelope: None,
head_payload: None,
..snapshot.clone()
};
let resolved_fallback = handler
.resolve_restore_keypair(&snapshot_no_envelope, Some(&wrong_fallback))
.expect("resolve with fallback only");
assert_eq!(resolved_fallback.entity_id(), wrong_fallback.entity_id());
}
#[test]
fn envelope_present_but_unseal_returns_none_fails_rather_than_fallback() {
use crate::adapter::net::identity::IdentityEnvelope;
use crate::adapter::net::state::snapshot::StateSnapshot;
use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
let mut seed = [0u8; 32];
getrandom::fill(&mut seed).unwrap();
let target_priv = X25519Secret::from(seed);
let target_pub = *X25519Pub::from(&target_priv).as_bytes();
let real_kp = EntityKeypair::generate();
let chain_link = crate::adapter::net::state::causal::CausalLink {
origin_hash: real_kp.origin_hash(),
horizon_encoded: 0,
sequence: 0,
parent_hash: 0,
};
let envelope =
IdentityEnvelope::new(&real_kp, target_pub, &chain_link).expect("seal envelope");
let snapshot = StateSnapshot {
version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
entity_id: real_kp.entity_id().clone(),
chain_link,
through_seq: 0,
state: Bytes::new(),
horizon: Default::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: Some(envelope),
head_payload: None,
};
let unseal_snapshot: EnvelopeUnsealFn = Arc::new(|_snap: &StateSnapshot| Ok(None));
let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
Arc::new(|_| None);
let ctx = MigrationIdentityContext {
unseal_snapshot,
peer_static_lookup,
};
let reg = Arc::new(DaemonRegistry::new());
let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
let target = Arc::new(MigrationTargetHandler::new(reg));
let handler = MigrationSubprotocolHandler::with_hooks(
orch,
source,
target,
0x1111,
MigrationHandlerHooks {
identity: Some(ctx),
..Default::default()
},
);
let wrong_fallback = EntityKeypair::generate();
let err = handler
.resolve_restore_keypair(&snapshot, Some(&wrong_fallback))
.expect_err(
"envelope present + unseal Ok(None) must fail; silently \
returning the fallback downgrades identity transport",
);
assert!(
err.contains("refusing to fall back"),
"expected 'refusing to fall back' in error message, got: {err}",
);
}
#[test]
fn cr24_no_per_daemon_migration_coupling_in_standby_or_capability() {
let standby_src = include_str!("../compute/standby_group.rs");
let capability_src = include_str!("../behavior/capability.rs");
fn strip_comments(src: &str) -> String {
let bytes = src.as_bytes();
let mut out = Vec::with_capacity(bytes.len());
let mut i = 0;
while i < bytes.len() {
if i + 1 < bytes.len() && bytes[i] == b'/' && bytes[i + 1] == b'*' {
i += 2;
while i + 1 < bytes.len() && !(bytes[i] == b'*' && bytes[i + 1] == b'/') {
if bytes[i] == b'\n' {
out.push(b'\n');
}
i += 1;
}
if i + 1 < bytes.len() {
i += 2; }
continue;
}
out.push(bytes[i]);
i += 1;
}
String::from_utf8_lossy(&out).into_owned()
}
let capability_clean = strip_comments(capability_src);
let standby_clean = strip_comments(standby_src);
let capability_uses_daemon_origin = capability_clean.lines().any(|line| {
let trimmed = line.trim_start();
!trimmed.starts_with("//") && trimmed.contains("daemon_origin")
});
assert!(
!capability_uses_daemon_origin,
"CR-24 regression: CapabilityIndex now references `daemon_origin` in \
non-comment source. The audit's CR-24 concern was that capabilities \
tied to a migrating daemon need teardown on MigrationFailed. With \
this new coupling the migration_handler MUST call \
`capability_index.cleanup_origin(daemon_origin)` (or equivalent) \
in the MigrationFailed arm. Add the call AND update this test."
);
let standby_has_pending = standby_clean.lines().any(|line| {
let trimmed = line.trim_start();
if trimmed.starts_with("//") {
return false;
}
trimmed.contains("pending_promotion")
|| trimmed.contains("migration_in_flight")
|| trimmed.contains("in_migration:")
});
assert!(
!standby_has_pending,
"CR-24 regression: StandbyGroup now has a pending-promotion or \
in-migration field. The audit's CR-24 concern was that a mid- \
flight standby promotion needs teardown on MigrationFailed. With \
this new coupling the migration_handler MUST call rollback on \
StandbyGroup in the MigrationFailed arm. Add the rollback call \
AND update this test."
);
}
}