net/adapter/net/subprotocol/migration_handler.rs
1//! Migration subprotocol message handler.
2//!
3//! Dispatches inbound migration messages (subprotocol 0x0500) to the
4//! appropriate handler: orchestrator, source, or target.
5
6use std::sync::Arc;
7
8use dashmap::DashMap;
9use parking_lot::Mutex;
10
11use crate::adapter::net::compute::migration_target::RestoreContext;
12use crate::adapter::net::compute::orchestrator::wire;
13use crate::adapter::net::compute::{
14 MigrationError, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
15 MigrationTargetHandler, SnapshotReassembler,
16};
17use crate::adapter::net::identity::EntityKeypair;
18use crate::adapter::net::state::snapshot::StateSnapshot;
19
20/// Identity-transport context for automatic envelope seal/open in
21/// the migration dispatcher.
22///
23/// When populated, the handler:
24/// - **Source path**: after taking a snapshot, if the target's
25/// X25519 static pub is known (via `peer_static_lookup`) AND the
26/// local daemon's keypair is available, seal the envelope into
27/// the snapshot before chunking.
28/// - **Target path**: on `SnapshotReady` with an attached envelope,
29/// call `unseal_snapshot` to recover the daemon's keypair, and
30/// pass that into `restore_snapshot` instead of whatever the
31/// factory registry has pre-registered.
32///
33/// A `None` context means the dispatcher ignores envelopes — the
34/// pre-identity-envelope fallback path where both nodes pre-register
35/// matching keypairs.
36///
37/// # Key hygiene
38///
39/// This struct used to carry the local Noise static private key as a
40/// `pub [u8; 32]` field. Any SDK caller with access to a
41/// `MigrationIdentityContext` could copy the node's long-term secret
42/// out, which is unacceptable — the Noise static is what backs the
43/// node's identity in the mesh, not just the envelope-open path.
44///
45/// The private key is now captured inside the `unseal_snapshot`
46/// closure (built by [`MeshNode::migration_identity_context`](crate::adapter::net::MeshNode::migration_identity_context)) and
47/// never surfaced as a struct field. Callers can still hand the
48/// context to the dispatcher, but they cannot extract the key from
49/// it. This matches how `peer_static_lookup` already worked.
50#[derive(Clone)]
51pub struct MigrationIdentityContext {
52 /// Open an identity envelope attached to `snapshot`, if present,
53 /// using the local static X25519 private key captured at
54 /// construction time. Returns `Ok(None)` when the snapshot has
55 /// no envelope, `Ok(Some(kp))` on a successful open, and an
56 /// error string on seal-open / attestation failure.
57 ///
58 /// Built by [`MeshNode::migration_identity_context`](crate::adapter::net::MeshNode::migration_identity_context); the
59 /// closure owns a `zeroize`-on-drop `StaticSecret` so the key
60 /// material is wiped when the context is dropped.
61 pub unseal_snapshot: EnvelopeUnsealFn,
62 /// Callback: given a peer node_id, return its X25519 static
63 /// public key if we have an active session with it. Used by the
64 /// source path to pick the seal recipient.
65 pub peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync>,
66}
67
68impl std::fmt::Debug for MigrationIdentityContext {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("MigrationIdentityContext")
71 .field("unseal_snapshot", &"<fn>")
72 .field("peer_static_lookup", &"<fn>")
73 .finish()
74 }
75}
76
/// An encoded migration message paired with the node it must be
/// delivered to. Produced by the dispatcher; the caller is
/// responsible for actually sending `payload` to `dest_node`.
#[derive(Debug)]
pub struct OutboundMigrationMessage {
    /// Node ID of the recipient.
    pub dest_node: u64,
    /// Wire-encoded migration message bytes.
    pub payload: Vec<u8>,
}
85
/// Callback fired after the target-side dispatcher successfully
/// restores a daemon from a migration snapshot. Invoked with the
/// daemon's `origin_hash`. Used by the SDK to drive channel
/// re-bind replay (`DAEMON_CHANNEL_REBIND_PLAN.md` Stage 3): the
/// callback walks the restored daemon's subscription ledger and
/// spawns asynchronous `subscribe_channel` calls so publishers
/// start fanning out to the target before the source tears down.
///
/// The callback runs synchronously on the dispatcher thread; it
/// should kick off any async work via `tokio::spawn` rather than
/// blocking the dispatch loop.
pub type PostRestoreCallback = Arc<dyn Fn(u64) + Send + Sync>;
102
/// Source-side hook invoked while handling `CutoverNotify`, right
/// before `source_handler.cleanup` unregisters the daemon. The
/// argument is the daemon's origin hash.
///
/// This is Stage 4 of `DAEMON_CHANNEL_REBIND_PLAN.md`: the SDK's hook
/// snapshots the daemon's subscription ledger at this point and spawns
/// async `unsubscribe_channel` calls to every publisher, so their
/// rosters drop the source immediately instead of waiting out the
/// session-timeout window.
///
/// Runs synchronously on the dispatcher thread; any async work must be
/// handed to `tokio::spawn`.
pub type PreCleanupCallback = Arc<dyn Fn(u64) + Sync + Send>;
113
/// Target-side readiness predicate: `true` means the local runtime can
/// accept an inbound migration right now.
///
/// When this hook is installed and reports `false`, the dispatcher
/// answers `SnapshotReady` with
/// [`MigrationFailureReason::NotReady`](crate::adapter::net::compute::MigrationFailureReason::NotReady).
/// That lets the source back off and retry instead of surfacing a
/// terminal error while the target is still booting.
///
/// Evaluated synchronously on the dispatcher thread, so it must be
/// cheap and non-blocking.
pub type ReadinessCallback = Arc<dyn Fn() -> bool + Sync + Send>;
125
126/// Callback fired on the source side whenever the dispatcher
127/// observes an inbound `MigrationFailed` message. The SDK uses this
128/// to surface the structured reason code to the
129/// [`MigrationHandle::wait`](crate) caller so they can distinguish
130/// retriable from terminal failures.
131///
132/// Sync on the dispatcher thread.
133pub type FailureCallback =
134 Arc<dyn Fn(u64, crate::adapter::net::compute::MigrationFailureReason) + Send + Sync>;
135
136/// Callback that opens an identity envelope carried on a
137/// [`crate::adapter::net::state::snapshot::StateSnapshot`], using a
138/// local private key captured inside the closure. See
139/// [`MigrationIdentityContext`] for key-hygiene rationale.
140pub type EnvelopeUnsealFn = Arc<
141 dyn Fn(
142 &crate::adapter::net::state::snapshot::StateSnapshot,
143 ) -> Result<Option<EntityKeypair>, crate::adapter::net::identity::EnvelopeError>
144 + Send
145 + Sync,
146>;
147
148/// Optional cross-cutting hooks consumed by
149/// [`MigrationSubprotocolHandler::with_hooks`]. Each field is
150/// independently opt-in so a test that cares about one hook doesn't
151/// have to fabricate the others; `Default` = nothing wired.
152///
153/// | Field | Side | Purpose |
154/// |-------------------|----------|--------------------------------------------|
155/// | `identity` | both | auto-seal / auto-open identity envelopes |
156/// | `post_restore` | target | kick off channel re-bind replay |
157/// | `pre_cleanup` | source | source-side Unsubscribe teardown |
158/// | `readiness` | target | gate inbound migrations on runtime state |
159/// | `failure` | source | surface structured reason to SDK caller |
160#[derive(Default, Clone)]
161pub struct MigrationHandlerHooks {
162 /// Identity-transport context. `None` = envelopes ignored
163 /// (pre-identity-envelope fallback).
164 pub identity: Option<MigrationIdentityContext>,
165 /// Target-side post-restore callback — drives subscription
166 /// replay.
167 pub post_restore: Option<PostRestoreCallback>,
168 /// Source-side pre-cleanup callback — drives Unsubscribe
169 /// teardown.
170 pub pre_cleanup: Option<PreCleanupCallback>,
171 /// Target-side readiness predicate — returns `false` to reply
172 /// `NotReady` instead of attempting restore.
173 pub readiness: Option<ReadinessCallback>,
174 /// Source-side failure observer — surfaces structured reason
175 /// codes to the SDK.
176 pub failure: Option<FailureCallback>,
177}
178
179/// Dispatcher for migration subprotocol (`0x0500`) messages.
180///
181/// Wraps the three handler halves — orchestrator, source, target —
182/// plus the optional cross-cutting hooks that let the SDK drive
183/// identity-envelope seal/open, channel-re-bind replay,
184/// source-side Unsubscribe teardown, runtime-readiness gating, and
185/// source-side failure observation. Constructed by the SDK's
186/// `DaemonRuntime::start` via [`Self::with_hooks`]; tests use
187/// [`Self::new`] when they don't need any hooks.
188///
189/// Install onto a `MeshNode` via `MeshNode::set_migration_handler`.
190pub struct MigrationSubprotocolHandler {
191 orchestrator: Arc<MigrationOrchestrator>,
192 source_handler: Arc<MigrationSourceHandler>,
193 target_handler: Arc<MigrationTargetHandler>,
194 local_node_id: u64,
195 /// Per-target reassemblers for incoming snapshot chunks. One entry
196 /// per in-flight inbound migration. Lazily created on first chunk,
197 /// torn down after successful restore or failure.
198 ///
199 /// Wrapped in `Mutex` because `SnapshotReassembler::feed` requires
200 /// `&mut self`.
201 reassemblers: DashMap<u64, Mutex<SnapshotReassembler>>,
202 /// Identity-transport context. `None` = envelopes ignored
203 /// (pre-identity-envelope fallback).
204 identity_context: Option<MigrationIdentityContext>,
205 /// Post-restore callback, fired on the target side after
206 /// `restore_snapshot` succeeds. Used by the SDK to drive
207 /// subscription replay. `None` = no hook (used by tests and
208 /// pre-Stage-3 callers).
209 post_restore_callback: Option<PostRestoreCallback>,
210 /// Pre-cleanup callback, fired on the source side at
211 /// `CutoverNotify` handling just before the daemon is
212 /// unregistered. Drives source-side Unsubscribe teardown
213 /// (Stage 4 of the channel re-bind plan). `None` = no hook.
214 pre_cleanup_callback: Option<PreCleanupCallback>,
215 /// Target-side readiness predicate. When `Some` and returns
216 /// `false`, the dispatcher responds to inbound migration
217 /// attempts with `NotReady` instead of attempting restore.
218 /// Drives the runtime-readiness retry path
219 /// (`DAEMON_RUNTIME_READINESS_PLAN.md` Stage 3). `None` = always
220 /// treat target as ready (pre-Stage-3 behaviour).
221 readiness_callback: Option<ReadinessCallback>,
222 /// Source-side failure observer — fired when the dispatcher
223 /// receives an inbound `MigrationFailed` message. Lets the SDK
224 /// hand the structured reason to the caller via
225 /// [`MigrationHandle::wait`] rather than swallowing it at the
226 /// dispatcher layer. `None` = no hook; failures are still
227 /// processed (orchestrator aborted) but the SDK can't
228 /// distinguish retriable (NotReady) from terminal.
229 failure_callback: Option<FailureCallback>,
230}
231
232impl MigrationSubprotocolHandler {
233 /// Create a new handler with no hooks wired. Envelopes on
234 /// inbound snapshots are ignored; outbound snapshots don't
235 /// carry an envelope; readiness is treated as always-ready;
236 /// no failure observer; no channel-re-bind callbacks. Matches
237 /// the pre-Stage-5b behaviour and is the right shape for tests
238 /// that only need the bare three-handler dispatcher.
239 pub fn new(
240 orchestrator: Arc<MigrationOrchestrator>,
241 source_handler: Arc<MigrationSourceHandler>,
242 target_handler: Arc<MigrationTargetHandler>,
243 local_node_id: u64,
244 ) -> Self {
245 Self::with_hooks(
246 orchestrator,
247 source_handler,
248 target_handler,
249 local_node_id,
250 MigrationHandlerHooks::default(),
251 )
252 }
253
254 /// Create a handler with an explicit [`MigrationHandlerHooks`]
255 /// bundle. Each hook field is independently optional; the SDK
256 /// supplies all five at once from `DaemonRuntime::start`, tests
257 /// populate only the subset they need.
258 pub fn with_hooks(
259 orchestrator: Arc<MigrationOrchestrator>,
260 source_handler: Arc<MigrationSourceHandler>,
261 target_handler: Arc<MigrationTargetHandler>,
262 local_node_id: u64,
263 hooks: MigrationHandlerHooks,
264 ) -> Self {
265 Self {
266 orchestrator,
267 source_handler,
268 target_handler,
269 local_node_id,
270 reassemblers: DashMap::new(),
271 identity_context: hooks.identity,
272 post_restore_callback: hooks.post_restore,
273 pre_cleanup_callback: hooks.pre_cleanup,
274 readiness_callback: hooks.readiness,
275 failure_callback: hooks.failure,
276 }
277 }
278
279 /// Handle an inbound migration message.
280 ///
281 /// Returns zero or more outbound messages to send to other nodes.
282 pub fn handle_message(
283 &self,
284 data: &[u8],
285 from_node: u64,
286 ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
287 let msg = wire::decode(data)?;
288 self.dispatch(msg, from_node)
289 }
290
291 /// Dispatch a decoded message to the appropriate handler.
292 fn dispatch(
293 &self,
294 msg: MigrationMessage,
295 from_node: u64,
296 ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
297 let mut outbound = Vec::new();
298
299 match msg {
300 MigrationMessage::TakeSnapshot {
301 daemon_origin,
302 target_node,
303 } => {
304 // We are the source — take snapshot and reply.
305 // Record `from_node` as the orchestrator: it's the node
306 // that sent us TakeSnapshot, and replies (SnapshotReady,
307 // CleanupComplete) must reach it. The source-side handler
308 // stores this so subsequent replies don't drift if a
309 // future forwarding layer rewrites `from_node`.
310 let mut snapshot =
311 self.source_handler
312 .start_snapshot(daemon_origin, target_node, from_node)?;
313
314 // Identity envelope: if we have a transport context
315 // and can find the target's X25519 pubkey, seal the
316 // daemon's keypair into the snapshot before chunking
317 // so the target can reconstruct identity without an
318 // out-of-band pre-registration.
319 //
320 // Seal failure needs a wire reply, not just `?`. The
321 // orchestrator (remote) is blocked waiting for this
322 // node's `SnapshotReady`; bailing with a dispatcher
323 // error would leave it waiting forever and leave our
324 // own `source_handler.start_snapshot` state dangling
325 // for `daemon_origin`. Convert to a `MigrationFailed`
326 // reply — the orchestrator consumes it, the SDK
327 // surfaces the reason, retry semantics kick in if
328 // applicable.
329 snapshot = match self.maybe_seal_envelope(snapshot, daemon_origin, target_node) {
330 Ok(s) => s,
331 Err(e) => {
332 let _ = self.source_handler.abort(daemon_origin);
333 let reason =
334 crate::adapter::net::compute::MigrationFailureReason::StateFailed(
335 format!("identity envelope seal failed: {e}"),
336 );
337 outbound.push(OutboundMigrationMessage {
338 dest_node: from_node,
339 payload: wire::encode(&MigrationMessage::MigrationFailed {
340 daemon_origin,
341 reason,
342 })?,
343 });
344 return Ok(outbound);
345 }
346 };
347
348 // Chunk the snapshot for transport
349 //
350 // Oversized state/bindings surfaces as a
351 // `MigrationFailed` reply (StateFailed) rather than
352 // a panic in the dispatch task. The orchestrator
353 // consumes the reply and retry semantics kick in,
354 // the same shape as `maybe_seal_envelope` failure
355 // above.
356 let snapshot_bytes = match snapshot.try_to_bytes() {
357 Ok(b) => b,
358 Err(e) => {
359 let _ = self.source_handler.abort(daemon_origin);
360 let reason =
361 crate::adapter::net::compute::MigrationFailureReason::StateFailed(
362 format!("snapshot serialization failed: {e}"),
363 );
364 outbound.push(OutboundMigrationMessage {
365 dest_node: from_node,
366 payload: wire::encode(&MigrationMessage::MigrationFailed {
367 daemon_origin,
368 reason,
369 })?,
370 });
371 return Ok(outbound);
372 }
373 };
374 let chunks = crate::adapter::net::compute::orchestrator::chunk_snapshot(
375 daemon_origin,
376 snapshot_bytes,
377 snapshot.through_seq,
378 )?;
379 let orch = self
380 .source_handler
381 .orchestrator_node(daemon_origin)
382 .unwrap_or(from_node);
383 for chunk in chunks {
384 outbound.push(OutboundMigrationMessage {
385 dest_node: orch,
386 payload: wire::encode(&chunk)?,
387 });
388 }
389 }
390
391 MigrationMessage::SnapshotReady {
392 daemon_origin,
393 snapshot_bytes,
394 seq_through,
395 chunk_index,
396 total_chunks,
397 } => {
398 // Peer-auth gate. SnapshotReady is always source→
399 // {orchestrator,target}. The orchestrator records
400 // source_node at start_migration time; the target's
401 // local record (if any) keeps the orchestrator
402 // binding from its own start path. Reject if the
403 // sender doesn't match the recorded principal for
404 // whichever role we are in.
405 //
406 // Third-tier fallback closes a TOFU window: when
407 // neither orchestrator-side nor target-side state
408 // has a record (the orchestrator lives on a remote
409 // node and we've not yet received any messages for
410 // this origin), the first `SnapshotReady` would
411 // otherwise bind whoever sent it as the orchestrator
412 // inside `restore_on_target`. Operators who know
413 // the orchestrator out-of-band can pre-bind it via
414 // `DaemonFactoryRegistry::bind_expected_orchestrator`;
415 // when bound, a mismatching sender is rejected
416 // here, before `restore_on_target` records them.
417 if let Some(expected) = self.orchestrator.source_node(daemon_origin) {
418 if expected != from_node {
419 return Err(MigrationError::WrongPeer {
420 daemon_origin,
421 from: from_node,
422 expected,
423 });
424 }
425 } else if let Some(expected) = self.target_handler.orchestrator_node(daemon_origin)
426 {
427 if expected != from_node {
428 return Err(MigrationError::WrongPeer {
429 daemon_origin,
430 from: from_node,
431 expected,
432 });
433 }
434 } else if let Some(expected) = self
435 .target_handler
436 .factories()
437 .expected_orchestrator(daemon_origin)
438 {
439 if expected != from_node {
440 return Err(MigrationError::WrongPeer {
441 daemon_origin,
442 from: from_node,
443 expected,
444 });
445 }
446 }
447 // If the orchestrator is local, let it record this chunk and
448 // forward to target. `target_node` identifies where the
449 // snapshot should be restored; if that's us, we reassemble
450 // and restore instead of forwarding.
451 let orch_target = self.orchestrator.target_node(daemon_origin);
452
453 match orch_target {
454 Some(target) if target == self.local_node_id => {
455 // We are the target — advance orchestrator state
456 // (safe: on_snapshot_ready is idempotent on the
457 // target side because it just re-derives the
458 // forward message we ignore), then reassemble.
459 //
460 // Errors are informational: the restore path below
461 // is the authoritative check for whether the
462 // snapshot is usable. We log at `debug` rather
463 // than ignore so non-idempotent failures (e.g.,
464 // unexpected phase transitions on a stale record)
465 // are observable during triage.
466 if let Err(e) = self.orchestrator.on_snapshot_ready(
467 daemon_origin,
468 snapshot_bytes.clone(),
469 seq_through,
470 chunk_index,
471 total_chunks,
472 ) {
473 tracing::debug!(
474 ?e,
475 origin = format!("{:#x}", daemon_origin),
476 "on_snapshot_ready (local target): ignored"
477 );
478 }
479
480 if let Some(out) = self.restore_on_target(
481 daemon_origin,
482 snapshot_bytes,
483 seq_through,
484 chunk_index,
485 total_chunks,
486 from_node,
487 )? {
488 outbound.extend(out);
489 }
490 }
491 Some(target) => {
492 // Middle of the chain (or orchestrator node forwarding
493 // to a remote target). Let the orchestrator update its
494 // own phase state and emit the forward.
495 let forward = self.orchestrator.on_snapshot_ready(
496 daemon_origin,
497 snapshot_bytes,
498 seq_through,
499 chunk_index,
500 total_chunks,
501 )?;
502 if let MigrationMessage::SnapshotReady { .. } = &forward {
503 outbound.push(OutboundMigrationMessage {
504 dest_node: target,
505 payload: wire::encode(&forward)?,
506 });
507 }
508 }
509 None => {
510 // No local migration record — this node may be a
511 // target that has no orchestrator-side state (the
512 // orchestrator lives on a different node). Try to
513 // restore anyway; the factory registry is the
514 // authority on whether this node should accept.
515 if let Some(out) = self.restore_on_target(
516 daemon_origin,
517 snapshot_bytes,
518 seq_through,
519 chunk_index,
520 total_chunks,
521 from_node,
522 )? {
523 outbound.extend(out);
524 }
525 }
526 }
527 }
528
529 MigrationMessage::RestoreComplete {
530 daemon_origin,
531 restored_seq,
532 } => {
533 // Target has restored — orchestrator may send buffered events.
534 // If there are no buffered events, send an empty BufferedEvents
535 // anyway: the target's reply (ReplayComplete) is what drives
536 // the chain forward into Cutover. Dropping the message here
537 // would stall any migration whose source never buffered.
538 let buffered_msg = self
539 .orchestrator
540 .on_restore_complete(daemon_origin, restored_seq)?
541 .unwrap_or(MigrationMessage::BufferedEvents {
542 daemon_origin,
543 events: Vec::new(),
544 });
545 outbound.push(OutboundMigrationMessage {
546 dest_node: from_node, // send back to target
547 payload: wire::encode(&buffered_msg)?,
548 });
549 }
550
551 MigrationMessage::ReplayComplete {
552 daemon_origin,
553 replayed_seq,
554 target_head,
555 } => {
556 // Target finished replay — orchestrator initiates cutover
557 let cutover_msg = self.orchestrator.on_replay_complete(
558 daemon_origin,
559 replayed_seq,
560 target_head,
561 )?;
562
563 // Send CutoverNotify to source (from_node is the target that reported)
564 if let MigrationMessage::CutoverNotify { .. } = &cutover_msg {
565 let source_node = self
566 .orchestrator
567 .source_node(daemon_origin)
568 .unwrap_or(from_node);
569
570 outbound.push(OutboundMigrationMessage {
571 dest_node: source_node,
572 payload: wire::encode(&cutover_msg)?,
573 });
574 }
575 }
576
577 MigrationMessage::CutoverNotify {
578 daemon_origin,
579 target_node,
580 } => {
581 // We are the source — stop accepting writes.
582 //
583 // `on_cutover` returns `DaemonNotFound` if this node didn't
584 // handle a `TakeSnapshot` (the orchestrator took the snapshot
585 // locally and never involved `source_handler`). Treat that as
586 // "no buffered events to drain" rather than a hard error so
587 // local-source migrations can still reach cleanup.
588 let final_events = match self.source_handler.on_cutover(daemon_origin) {
589 Ok(events) => events,
590 Err(MigrationError::DaemonNotFound(_)) => Vec::new(),
591 Err(e) => return Err(e),
592 };
593
594 // If there are last-moment events, send them to target
595 if !final_events.is_empty() {
596 let events_msg = MigrationMessage::BufferedEvents {
597 daemon_origin,
598 events: final_events,
599 };
600 outbound.push(OutboundMigrationMessage {
601 dest_node: target_node,
602 payload: wire::encode(&events_msg)?,
603 });
604 }
605
606 // Acknowledge cutover to the local orchestrator. When the
607 // orchestrator lives on a different node, this local call
608 // has no record to advance; the remote orchestrator learns
609 // about cutover from `CleanupComplete`, which does the same
610 // phase advance there.
611 match self.orchestrator.on_cutover_acknowledged(daemon_origin) {
612 Ok(()) => {}
613 Err(MigrationError::DaemonNotFound(_)) => {}
614 Err(e) => return Err(e),
615 }
616
617 // Capture the orchestrator BEFORE `cleanup()` clears the
618 // source-side migration record — once it's gone,
619 // `orchestrator_node()` returns None and we'd silently
620 // fall back to `from_node`, defeating the whole point of
621 // recording the orchestrator at `start_snapshot` time.
622 let dest = self
623 .source_handler
624 .orchestrator_node(daemon_origin)
625 .unwrap_or(from_node);
626
627 // Fire the pre-cleanup callback BEFORE unregistering
628 // the daemon — the host still holds the subscription
629 // ledger, which the SDK's hook snapshots here so it
630 // can send `Unsubscribe` messages to every publisher
631 // after cleanup. This is Stage 4 of the channel
632 // re-bind plan: without it, the publishers' rosters
633 // keep pointing at the source until their session
634 // timeout (~30 s), causing duplicate deliveries to
635 // a now-gone daemon and unnecessary bandwidth.
636 if let Some(cb) = &self.pre_cleanup_callback {
637 cb(daemon_origin);
638 }
639
640 // Cleanup source. No-op if this node never authored the
641 // migration (e.g. a replayed CutoverNotify after the
642 // record was already cleared); only an authored
643 // migration in Cutover phase actually unregisters the
644 // local daemon. A forged CutoverNotify for an origin
645 // we never migrated leaves the local daemon untouched.
646 let _ = self.source_handler.cleanup(daemon_origin);
647
648 let cleanup_msg = MigrationMessage::CleanupComplete { daemon_origin };
649 outbound.push(OutboundMigrationMessage {
650 dest_node: dest,
651 payload: wire::encode(&cleanup_msg)?,
652 });
653 }
654
655 MigrationMessage::CleanupComplete { daemon_origin } => {
656 // Peer-auth gate. CleanupComplete is source→
657 // orchestrator. Without binding, a forged
658 // CleanupComplete from any peer makes the
659 // orchestrator emit ActivateTarget to a target that
660 // hasn't necessarily finished restore.
661 if let Some(expected) = self.orchestrator.source_node(daemon_origin) {
662 if expected != from_node {
663 return Err(MigrationError::WrongPeer {
664 daemon_origin,
665 from: from_node,
666 expected,
667 });
668 }
669 }
670 // Source reports its cleanup done. The orchestrator now
671 // tells the target to activate.
672 let activate = self.orchestrator.on_cleanup_complete(daemon_origin)?;
673 // Route the ActivateTarget to whichever node is the target.
674 let target = self
675 .orchestrator
676 .target_node(daemon_origin)
677 .unwrap_or(from_node);
678 outbound.push(OutboundMigrationMessage {
679 dest_node: target,
680 payload: wire::encode(&activate)?,
681 });
682 }
683
684 MigrationMessage::ActivateTarget { daemon_origin } => {
685 // Peer-auth gate. ActivateTarget is orchestrator→
686 // target. Without binding, any peer with subprotocol
687 // reach forces the target to flip live while the
688 // source still believes it owns the daemon —
689 // divergent chain heads. The target_handler records
690 // the orchestrator at restore_snapshot time; reject
691 // unless from_node matches.
692 if let Some(expected) = self.target_handler.orchestrator_node(daemon_origin) {
693 if expected != from_node {
694 return Err(MigrationError::WrongPeer {
695 daemon_origin,
696 from: from_node,
697 expected,
698 });
699 }
700 }
701 // We are the target — drain remaining events and go live.
702 // Retry-safe: `activate()` is idempotent once the migration
703 // has been completed, and we route the ack to the recorded
704 // orchestrator BEFORE `complete()` transitions state to the
705 // idempotency index. If the ack packet is lost, a retried
706 // ActivateTarget will find the completed record, return the
707 // same replayed_seq, and re-send the ack. The orchestrator
708 // therefore can't get wedged waiting for a completion that
709 // already happened.
710 let replayed_seq = self.target_handler.activate(daemon_origin)?;
711 let ack_dest = self
712 .target_handler
713 .orchestrator_node(daemon_origin)
714 .unwrap_or(from_node);
715 let ack = MigrationMessage::ActivateAck {
716 daemon_origin,
717 replayed_seq,
718 };
719 outbound.push(OutboundMigrationMessage {
720 dest_node: ack_dest,
721 payload: wire::encode(&ack)?,
722 });
723 // `complete()` is idempotent: a retried ActivateTarget
724 // after a lost ack re-runs `activate()` (idempotent) and
725 // `complete()` (no-op once Complete).
726 let _ = self.target_handler.complete(daemon_origin);
727 }
728
729 MigrationMessage::ActivateAck {
730 daemon_origin,
731 replayed_seq,
732 } => {
733 // Target acknowledged — migration terminus on the
734 // orchestrator.
735 self.orchestrator
736 .on_activate_ack(daemon_origin, replayed_seq)?;
737 }
738
739 MigrationMessage::MigrationFailed {
740 daemon_origin,
741 reason,
742 } => {
743 // Peer-auth gate. MigrationFailed can come from any
744 // participant (orchestrator, source, or target).
745 // Without binding, a forged MigrationFailed from any
746 // peer drives a rollback after a legitimate
747 // cutover. Accept only when from_node matches a
748 // recorded principal on at least one of the local
749 // handler views; if no record exists at all (e.g.,
750 // late-arriving for a migration already cleaned up),
751 // drop silently rather than abort phantom state.
752 let recorded = [
753 self.orchestrator.source_node(daemon_origin),
754 self.orchestrator.target_node(daemon_origin),
755 self.source_handler.orchestrator_node(daemon_origin),
756 self.target_handler.orchestrator_node(daemon_origin),
757 ];
758 let known = recorded.iter().any(|p| p.is_some());
759 if known && !recorded.contains(&Some(from_node)) {
760 return Err(MigrationError::WrongPeer {
761 daemon_origin,
762 from: from_node,
763 expected: recorded.iter().find_map(|p| *p).unwrap_or(0),
764 });
765 }
766 // Fire the SDK's observer BEFORE abort, so the
767 // observer sees the structured reason while the
768 // migration record is still alive — the SDK uses
769 // this to surface NotReady vs terminal to the
770 // caller of `MigrationHandle::wait`.
771 if let Some(cb) = &self.failure_callback {
772 cb(daemon_origin, reason.clone());
773 }
774 // Abort on all local handlers. This is correct for
775 // terminal reasons; for `NotReady` the SDK may
776 // elect to retry, which will re-`start_migration`
777 // from scratch (re-snapshotting on local source,
778 // re-sending TakeSnapshot on remote source).
779 let _ = self.source_handler.abort(daemon_origin);
780 let _ = self.target_handler.abort(daemon_origin);
781 let _ = self
782 .orchestrator
783 .abort_migration_with_reason(daemon_origin, reason);
784 // Also drop any partial reassembler we accumulated
785 // as the target. The local-source-failure path
786 // (`fail_migration_with_reason`, line ~1061)
787 // already clears this; an inbound `MigrationFailed`
788 // after the target had received some snapshot
789 // chunks would otherwise leave ~`chunk_size *
790 // chunks_received` bytes pinned in the DashMap
791 // forever (or until the same origin migrated again
792 // with a higher `seq_through`). With many ephemeral
793 // daemons this would be an unbounded leak.
794 self.reassemblers.remove(&daemon_origin);
795
796 // Cleanup completeness: neither `StandbyGroup` nor
797 // `CapabilityIndex` holds per-daemon
798 // migration-coupled state today.
799 //
800 // * `StandbyGroup::promote` is synchronous —
801 // either succeeds or rolls back atomically.
802 // There is no "promotion in flight across
803 // migration phases" state.
804 // * `CapabilityIndex` indexes by `node_id`, not
805 // by `daemon_origin` (verified by `grep -rn
806 // daemon_origin src/adapter/net/behavior/
807 // capability.rs` returning no matches).
808 // Capabilities are node-level; failure of a
809 // specific daemon's migration doesn't change
810 // what the source node is advertising.
811 //
812 // So no additional teardown is needed today. If a
813 // future change adds per-daemon coupling in either
814 // subsystem, the regression test for this invariant
815 // fires loudly and the maintainer must wire
816 // teardown HERE.
817 }
818
819 MigrationMessage::BufferedEvents {
820 daemon_origin,
821 events,
822 } => {
823 // We are the target — replay events
824 let replayed_seq = self.target_handler.replay_events(daemon_origin, events)?;
825
826 // Ship the freshly-replayed daemon's chain head to
827 // the orchestrator so its `SuperpositionState`'s
828 // continuity-proof anchor is the real cryptographic
829 // head, not a synthetic placeholder the orchestrator
830 // would have to fabricate when it lives on a third
831 // node and has no local registry entry to read from.
832 let target_head = self.target_handler.host_head_link(daemon_origin)?;
833
834 // Tell orchestrator we're done replaying
835 let reply = MigrationMessage::ReplayComplete {
836 daemon_origin,
837 replayed_seq,
838 target_head,
839 };
840 let dest = self
841 .target_handler
842 .orchestrator_node(daemon_origin)
843 .unwrap_or(from_node);
844 outbound.push(OutboundMigrationMessage {
845 dest_node: dest,
846 payload: wire::encode(&reply)?,
847 });
848 }
849 }
850
851 Ok(outbound)
852 }
853
    /// Feed one snapshot chunk for `daemon_origin` into the target-side
    /// reassembler. Once the full snapshot is assembled: parse it, resolve
    /// a factory and keypair for the daemon, call `restore_snapshot`, and
    /// emit `RestoreComplete` to the orchestrator recorded by the target
    /// handler (falling back to `from_node` when no record exists). This
    /// is NOT necessarily the daemon's source node.
    ///
    /// If the target handler already reports the origin as migrating
    /// (`is_migrating`), the restore is skipped. `RestoreComplete` is
    /// simply re-emitted so a retrying sender can advance.
    ///
    /// Returns:
    /// - `Ok(None)` while more chunks are still expected.
    /// - `Ok(Some(outbound))` once restore has been attempted. The message
    ///   is either a single `RestoreComplete`, or a single
    ///   `MigrationFailed` addressed to `from_node`. The failure cases are:
    ///   parse failure, runtime not ready, missing factory, keypair
    ///   resolution failure, or a `restore_snapshot` error.
    ///
    /// # Errors
    ///
    /// - `MigrationError::StateFailed` when the reassembler rejects the
    ///   chunk.
    /// - Any error propagated from `wire::encode` while building the reply.
    fn restore_on_target(
        &self,
        daemon_origin: u64,
        snapshot_bytes: Vec<u8>,
        seq_through: u64,
        chunk_index: u32,
        total_chunks: u32,
        from_node: u64,
    ) -> Result<Option<Vec<OutboundMigrationMessage>>, MigrationError> {
        let reassembler_entry = self
            .reassemblers
            .entry(daemon_origin)
            .or_insert_with(|| Mutex::new(SnapshotReassembler::new()));

        // NOTE(review): if `feed` errors, the `?` below returns straight
        // away. The reassembler entry stays in `self.reassemblers`, and no
        // `MigrationFailed` reply is built here (unlike the parse-failure
        // path further down). Confirm the caller handles this case.
        let assembled = {
            let mut reassembler = reassembler_entry.lock();
            reassembler
                .feed(
                    daemon_origin,
                    snapshot_bytes,
                    seq_through,
                    chunk_index,
                    total_chunks,
                )
                .map_err(|e| {
                    MigrationError::StateFailed(format!("snapshot reassembly failed: {:?}", e))
                })?
        };
        // Release the DashMap shard guard (a `RefMut` from `entry`) before
        // the `remove` below. Removing the same key while holding it would
        // deadlock.
        drop(reassembler_entry);

        let assembled_bytes = match assembled {
            Some(bytes) => bytes,
            None => return Ok(None), // still waiting for more chunks
        };

        // Drop the reassembler entry now that we've completed.
        self.reassemblers.remove(&daemon_origin);

        // Parse the snapshot. A parse failure is a hard migration failure.
        let snapshot = match StateSnapshot::from_bytes(&assembled_bytes) {
            Some(s) => s,
            None => {
                return Ok(Some(self.fail_migration(
                    daemon_origin,
                    from_node,
                    "failed to parse snapshot bytes on target",
                )?));
            }
        };

        // `source_node` is the daemon's pre-migration host — tracked here
        // only for the target-handler's internal bookkeeping. It is NOT
        // where `RestoreComplete` gets sent (see below).
        let source_node = self
            .orchestrator
            .source_node(daemon_origin)
            .unwrap_or(from_node);

        // If this origin is already under migration here, the source is
        // retrying because our earlier `RestoreComplete` didn't make it
        // back. Don't touch the already-restored daemon; just re-emit
        // `RestoreComplete` so the orchestrator can advance. This also
        // means we DO NOT consume the factory on the retry — the factory
        // registration must survive until the migration is truly complete
        // (`ActivateAck`), not just until the first locally-successful
        // restore.
        if !self.target_handler.is_migrating(daemon_origin) {
            // Readiness check first: if the runtime is still in
            // `Registering`, respond `NotReady` so the source can
            // retry with backoff rather than burning the attempt
            // on a target that's still booting.
            if let Some(readiness) = &self.readiness_callback {
                if !readiness() {
                    return Ok(Some(self.fail_migration_with_reason(
                        daemon_origin,
                        from_node,
                        crate::adapter::net::compute::MigrationFailureReason::NotReady,
                    )?));
                }
            }

            let inputs = match self.target_handler.factories().construct(daemon_origin) {
                Some(i) => i,
                None => {
                    return Ok(Some(self.fail_migration_with_reason(
                        daemon_origin,
                        from_node,
                        crate::adapter::net::compute::MigrationFailureReason::FactoryNotFound,
                    )?));
                }
            };

            // Identity envelope: if the snapshot carries one and an
            // identity context is installed (it owns the X25519 private
            // key needed to unseal), the envelope supplies the real
            // daemon keypair. Otherwise we fall back to whatever keypair
            // the factory was registered with. For public-identity
            // migrations or pre-Stage-5b callers, that is either a
            // placeholder or a manually-shared keypair.
            //
            // With a context installed, a present-but-invalid envelope is
            // a hard failure, not a fallback. Otherwise an attacker who
            // tampers with the envelope could silently downgrade identity
            // transport. Any resolution error (including "no envelope and
            // no fallback") is reported under the "identity envelope open
            // failed" prefix below.
            let keypair = match self.resolve_restore_keypair(&snapshot, inputs.keypair.as_ref()) {
                Ok(kp) => kp,
                Err(e) => {
                    return Ok(Some(self.fail_migration(
                        daemon_origin,
                        from_node,
                        &format!("identity envelope open failed: {e}"),
                    )?));
                }
            };

            let daemon = inputs.daemon;
            if let Err(e) = self.target_handler.restore_snapshot(
                RestoreContext {
                    daemon_origin,
                    snapshot: &snapshot,
                    source_node,
                    // orchestrator: whoever forwarded SnapshotReady to us
                    orchestrator_node: from_node,
                },
                keypair,
                move || daemon,
                inputs.config,
            ) {
                // Factory is still registered — next `SnapshotReady` for
                // this origin (e.g., from an orchestrator retry) can try
                // again. On successful completion (`complete()`), the
                // factory is auto-removed so a stale or replayed
                // SnapshotReady can't re-trigger restore against what is
                // already the authoritative copy.
                return Ok(Some(self.fail_migration(
                    daemon_origin,
                    from_node,
                    &format!("restore_snapshot failed: {:?}", e),
                )?));
            }

            // Fire the post-restore callback. The SDK-supplied hook
            // drives channel re-bind replay (Stage 3 of the channel
            // re-bind plan): walks the restored daemon's ledger and
            // spawns async `subscribe_channel` calls so publishers
            // start fanning out to the target before the source
            // tears down. Sync callback; the hook itself should
            // `tokio::spawn` the actual work.
            if let Some(cb) = &self.post_restore_callback {
                cb(daemon_origin);
            }
        }

        // Route `RestoreComplete` to the recorded orchestrator. Only the
        // orchestrator holds the migration record; sending to a relay
        // would stall the state machine. `from_node` is used as a
        // fallback when the target-side record has been lost (e.g. a
        // very late chunk after the migration record timed out).
        let reply = MigrationMessage::RestoreComplete {
            daemon_origin,
            restored_seq: seq_through,
        };
        let dest = self
            .target_handler
            .orchestrator_node(daemon_origin)
            .unwrap_or(from_node);
        Ok(Some(vec![OutboundMigrationMessage {
            dest_node: dest,
            payload: wire::encode(&reply)?,
        }]))
    }
1031
1032 /// Source-side helper: if we have an identity-transport context
1033 /// and the target's X25519 pubkey is available, seal the
1034 /// daemon's keypair into the snapshot.
1035 ///
1036 /// Resolution:
1037 /// - **Prerequisite missing** (no context, target key not known,
1038 /// or daemon keypair absent from the local registry): return
1039 /// the snapshot unchanged. This is the legitimate public-
1040 /// identity / NKpsk0-responder fallback — the target is
1041 /// expected to have a pre-registered keypair.
1042 /// - **Prerequisites met, seal crypto succeeds**: return the
1043 /// sealed snapshot.
1044 /// - **Prerequisites met, seal crypto fails**: fail the
1045 /// migration. Silently downgrading to unsealed transport would
1046 /// break the identity-transport guarantee the caller installed
1047 /// `identity_context` to obtain — the target would restore
1048 /// using whatever fallback keypair the factory registry carries
1049 /// (possibly nothing, possibly a stale mismatch), and any later
1050 /// signature the daemon produces on the target would be bound
1051 /// to the wrong identity. The only honest response is to abort.
1052 fn maybe_seal_envelope(
1053 &self,
1054 snapshot: StateSnapshot,
1055 daemon_origin: u64,
1056 target_node: u64,
1057 ) -> Result<StateSnapshot, MigrationError> {
1058 let Some(ctx) = &self.identity_context else {
1059 return Ok(snapshot);
1060 };
1061 // Skip if the snapshot already carries an envelope (e.g. the
1062 // SDK pre-sealed at `start_migration` time for a local-source
1063 // case).
1064 if snapshot.identity_envelope.is_some() {
1065 return Ok(snapshot);
1066 }
1067 let Some(target_pub) = (ctx.peer_static_lookup)(target_node) else {
1068 return Ok(snapshot);
1069 };
1070 // Find the daemon's keypair in the local registry. The
1071 // orchestrator + source_handler + target_handler share one
1072 // registry, so whichever owns this daemon, we see it.
1073 let kp = match self
1074 .source_handler_registry_keypair(daemon_origin)
1075 .or_else(|| self.target_handler_registry_keypair(daemon_origin))
1076 {
1077 Some(kp) => kp,
1078 None => return Ok(snapshot),
1079 };
1080 snapshot
1081 .with_identity_envelope(&kp, target_pub)
1082 .map_err(|e| {
1083 MigrationError::StateFailed(format!(
1084 "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
1085 ))
1086 })
1087 }
1088
1089 /// Read-only keypair fetch from the shared daemon registry
1090 /// reachable via `source_handler`. `source_handler` and
1091 /// `target_handler` hold `Arc` clones of the same registry in
1092 /// typical wiring, so checking via source is sufficient; the
1093 /// `target_handler_registry_keypair` fallback exists for
1094 /// asymmetric setups where they diverge.
1095 fn source_handler_registry_keypair(&self, daemon_origin: u64) -> Option<EntityKeypair> {
1096 let _ = daemon_origin;
1097 // `MigrationSourceHandler` doesn't expose the registry
1098 // publicly, so reach through the orchestrator which shares
1099 // the same `Arc<DaemonRegistry>`.
1100 self.orchestrator
1101 .daemon_registry()
1102 .daemon_keypair(daemon_origin)
1103 }
1104
1105 fn target_handler_registry_keypair(&self, daemon_origin: u64) -> Option<EntityKeypair> {
1106 // Parallel path — the target-side registry may in some
1107 // configurations be distinct. Today it's the same `Arc`, so
1108 // this returns the same value as the source path; kept as a
1109 // seam.
1110 self.orchestrator
1111 .daemon_registry()
1112 .daemon_keypair(daemon_origin)
1113 }
1114
1115 /// Target-side helper: pick the keypair to hand to
1116 /// `restore_snapshot`. Resolution order:
1117 ///
1118 /// 1. If the snapshot carries an identity envelope AND we have
1119 /// the X25519 private key to unseal it → use the envelope's
1120 /// keypair. (Non-envelope cases fall through.)
1121 /// 2. Otherwise, if `fallback` was provided — the factory was
1122 /// registered via `DaemonFactoryRegistry::register` with a
1123 /// pre-provisioned keypair — use that.
1124 /// 3. If neither is available (placeholder registration +
1125 /// no envelope in the snapshot), fail: a placeholder factory
1126 /// expects the envelope to supply the keypair, and its
1127 /// absence means the source skipped identity transport
1128 /// without the target being prepared for that.
1129 ///
1130 /// Once an envelope is **present** on the snapshot, envelope
1131 /// transport is mandatory — the fallback keypair is NEVER used
1132 /// on that path. Present-but-invalid envelopes are terminal
1133 /// (propagating the envelope error rather than falling back
1134 /// prevents an attacker from downgrading identity transport by
1135 /// tampering with the envelope bytes), and a misbehaving
1136 /// `unseal_snapshot` that returns `Ok(None)` despite an
1137 /// envelope being present is treated as a terminal error for
1138 /// the same reason: silently falling through to the
1139 /// pre-provisioned keypair would defeat the identity-transport
1140 /// guarantee callers installed the context to obtain.
1141 fn resolve_restore_keypair(
1142 &self,
1143 snapshot: &StateSnapshot,
1144 fallback: Option<&EntityKeypair>,
1145 ) -> Result<EntityKeypair, String> {
1146 if let (Some(ctx), Some(_)) = (&self.identity_context, &snapshot.identity_envelope) {
1147 // The private key stays inside the closure owned by
1148 // `ctx.unseal_snapshot` — the dispatcher never sees it.
1149 //
1150 // Both `Ok(None)` and `Err` terminate resolution. The
1151 // envelope has been attached to the snapshot, so we've
1152 // already committed to envelope transport; falling back
1153 // to the pre-provisioned keypair here would silently
1154 // downgrade a migration the caller asked to carry
1155 // identity. A conforming `unseal_snapshot` returns
1156 // `Ok(Some(_))` or `Err(_)` when handed a snapshot with
1157 // a present envelope — `Ok(None)` indicates a broken
1158 // unsealer and must not mask the breakage.
1159 return match (ctx.unseal_snapshot)(snapshot) {
1160 Ok(Some(kp)) => Ok(kp),
1161 Ok(None) => Err("identity envelope present on snapshot but \
1162 `unseal_snapshot` returned Ok(None) — refusing to \
1163 fall back to the pre-provisioned keypair; a \
1164 present envelope mandates envelope-sourced \
1165 identity transport"
1166 .to_string()),
1167 Err(e) => Err(format!("{e}")),
1168 };
1169 }
1170 fallback.cloned().ok_or_else(|| {
1171 "placeholder factory registered but snapshot has no \
1172 identity envelope (and no local fallback keypair available)"
1173 .to_string()
1174 })
1175 }
1176
1177 /// Build a `MigrationFailed` outbound message and clean up local state.
1178 /// Convenience wrapper that wraps `reason` in
1179 /// [`MigrationFailureReason::StateFailed`] for generic failures;
1180 /// callers that need a specific reason code (e.g. `NotReady`,
1181 /// `FactoryNotFound`) should use
1182 /// [`Self::fail_migration_with_reason`].
1183 fn fail_migration(
1184 &self,
1185 daemon_origin: u64,
1186 from_node: u64,
1187 reason: &str,
1188 ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
1189 self.fail_migration_with_reason(
1190 daemon_origin,
1191 from_node,
1192 crate::adapter::net::compute::MigrationFailureReason::StateFailed(reason.to_string()),
1193 )
1194 }
1195
1196 /// Build a `MigrationFailed` outbound message with a structured
1197 /// reason. Clean-up is the same as [`Self::fail_migration`]: the
1198 /// reassembler entry + target-handler state are dropped so a
1199 /// retry from the source can start fresh (unless the reason is
1200 /// `FactoryNotFound` — a retry won't find what isn't there).
1201 fn fail_migration_with_reason(
1202 &self,
1203 daemon_origin: u64,
1204 from_node: u64,
1205 reason: crate::adapter::net::compute::MigrationFailureReason,
1206 ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
1207 tracing::warn!(
1208 daemon_origin = format!("{:#x}", daemon_origin),
1209 reason = %reason,
1210 "migration failed on target",
1211 );
1212 self.reassemblers.remove(&daemon_origin);
1213 let _ = self.target_handler.abort(daemon_origin);
1214 let msg = MigrationMessage::MigrationFailed {
1215 daemon_origin,
1216 reason,
1217 };
1218 Ok(vec![OutboundMigrationMessage {
1219 dest_node: from_node,
1220 payload: wire::encode(&msg)?,
1221 }])
1222 }
1223
1224 /// Get a reference to the orchestrator.
1225 pub fn orchestrator(&self) -> &Arc<MigrationOrchestrator> {
1226 &self.orchestrator
1227 }
1228
1229 /// Get a reference to the source handler.
1230 pub fn source_handler(&self) -> &Arc<MigrationSourceHandler> {
1231 &self.source_handler
1232 }
1233
1234 /// Get a reference to the target handler.
1235 pub fn target_handler(&self) -> &Arc<MigrationTargetHandler> {
1236 &self.target_handler
1237 }
1238}
1239
1240impl std::fmt::Debug for MigrationSubprotocolHandler {
1241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1242 f.debug_struct("MigrationSubprotocolHandler")
1243 .field("local_node_id", &format!("{:#x}", self.local_node_id))
1244 .finish()
1245 }
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250 use super::*;
1251 use crate::adapter::net::behavior::capability::CapabilityFilter;
1252 use crate::adapter::net::compute::orchestrator::wire;
1253 use crate::adapter::net::compute::{
1254 DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
1255 MigrationOrchestrator, MigrationSourceHandler, MigrationTargetHandler,
1256 };
1257 use crate::adapter::net::identity::EntityKeypair;
1258 use crate::adapter::net::state::causal::CausalEvent;
1259 use bytes::Bytes;
1260
1261 /// Regression (Cubic-AI P1: leaking Noise static private key):
1262 /// `MigrationIdentityContext` previously exposed
1263 /// `pub local_x25519_priv: [u8; 32]`, which meant any SDK caller
1264 /// holding a context (or calling the now-removed
1265 /// `MeshNode::static_x25519_priv`) could copy the node's
1266 /// long-term identity key out.
1267 ///
1268 /// The fix moves the key into an `unseal_snapshot` closure owned
1269 /// by the context, with the raw bytes never reachable as a
1270 /// readable field. This test pins the struct's size so a
1271 /// blind re-add of a `[u8; 32]` or similar secret-bearing field
1272 /// trips the canary.
1273 ///
1274 /// Two `Arc<dyn Fn ...>` are fat pointers — two `usize`s each —
1275 /// so the context is `4 * size_of::<usize>()`. If someone adds a
1276 /// 32-byte key field, size jumps to that + 32 and this assertion
1277 /// fails. Not a true API-surface guard (PR review is the real
1278 /// guard) but an honest canary for the specific regression.
1279 #[test]
1280 fn migration_identity_context_has_no_plaintext_secret_field_regression() {
1281 use std::mem::size_of;
1282 let fat_ptr = 2 * size_of::<usize>();
1283 assert_eq!(
1284 size_of::<MigrationIdentityContext>(),
1285 2 * fat_ptr,
1286 "MigrationIdentityContext must stay two Arc<dyn Fn ...> and \
1287 nothing else — a size change means a new field was added, \
1288 most likely re-exposing the Noise static private key the \
1289 way `local_x25519_priv: [u8; 32]` used to.",
1290 );
1291 }
1292
    /// Minimal `MeshDaemon` fixture whose whole state is one counter.
    struct TestDaemon {
        // Bumped once per processed event; `snapshot` serializes it as
        // 8 little-endian bytes and `restore` reads it back.
        value: u64,
    }
1296
1297 impl MeshDaemon for TestDaemon {
1298 fn name(&self) -> &str {
1299 "test"
1300 }
1301 fn requirements(&self) -> CapabilityFilter {
1302 CapabilityFilter::default()
1303 }
1304 fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
1305 self.value += 1;
1306 Ok(vec![])
1307 }
1308 fn snapshot(&self) -> Option<Bytes> {
1309 Some(Bytes::from(self.value.to_le_bytes().to_vec()))
1310 }
1311 fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
1312 self.value = u64::from_le_bytes(state[..8].try_into().unwrap());
1313 Ok(())
1314 }
1315 }
1316
1317 fn setup() -> (MigrationSubprotocolHandler, Arc<DaemonRegistry>, u64) {
1318 let reg = Arc::new(DaemonRegistry::new());
1319 let kp = EntityKeypair::generate();
1320 let origin = kp.origin_hash();
1321 let host = DaemonHost::new(
1322 Box::new(TestDaemon { value: 100 }),
1323 kp,
1324 DaemonHostConfig::default(),
1325 );
1326 reg.register(host).unwrap();
1327
1328 let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
1329 let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
1330 let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
1331
1332 let handler = MigrationSubprotocolHandler::new(orch, source, target, 0x1111);
1333 (handler, reg, origin)
1334 }
1335
1336 #[test]
1337 fn test_handle_take_snapshot() {
1338 let (handler, _reg, origin) = setup();
1339
1340 let msg = MigrationMessage::TakeSnapshot {
1341 daemon_origin: origin,
1342 target_node: 0x2222,
1343 };
1344 let encoded = wire::encode(&msg).unwrap();
1345
1346 let outbound = handler.handle_message(&encoded, 0x3333).unwrap();
1347 assert!(!outbound.is_empty());
1348
1349 // Should get SnapshotReady back
1350 let reply = wire::decode(&outbound[0].payload).unwrap();
1351 match reply {
1352 MigrationMessage::SnapshotReady { daemon_origin, .. } => {
1353 assert_eq!(daemon_origin, origin);
1354 }
1355 _ => panic!("expected SnapshotReady"),
1356 }
1357 }
1358
1359 /// Regression (Cubic-AI P2): `maybe_seal_envelope` used to
1360 /// swallow seal-crypto errors (e.g. a public-only source
1361 /// keypair) and return the **unsealed** snapshot to the caller,
1362 /// downgrading identity transport silently. Any target that
1363 /// relied on the envelope to supply the daemon keypair
1364 /// (`expect_migration` placeholder + no out-of-band keypair)
1365 /// would then fail to restore, or — worse — restore under a
1366 /// stale fallback keypair and produce mis-signed outputs.
1367 ///
1368 /// The fix makes the helper return `Result` and propagate seal
1369 /// failures as `MigrationError::StateFailed`. Callers abort
1370 /// rather than ship an unsealed snapshot they didn't ask for.
1371 ///
1372 /// This test stages the exact failure mode: register a daemon
1373 /// with a public-only `EntityKeypair`, install an identity
1374 /// context that successfully resolves the target's static, then
1375 /// ask `maybe_seal_envelope` to seal. The underlying crypto
1376 /// rejects (public-only can't sign the attestation) and the
1377 /// helper must surface it.
1378 #[test]
1379 fn maybe_seal_envelope_propagates_seal_failures() {
1380 use crate::adapter::net::identity::IdentityEnvelope;
1381 use crate::adapter::net::state::snapshot::StateSnapshot;
1382 use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
1383
1384 // Target's X25519 static — arbitrary fresh key.
1385 let mut seed = [0u8; 32];
1386 getrandom::fill(&mut seed).unwrap();
1387 let target_priv = X25519Secret::from(seed);
1388 let target_pub = *X25519Pub::from(&target_priv).as_bytes();
1389 let target_node_id: u64 = 0x2222;
1390
1391 // Daemon keypair: generate a real one for the entity_id,
1392 // then strip the signing half via `public_only`. The
1393 // registry will hand this out; the seal will try to sign
1394 // the attestation transcript and fail.
1395 let real_kp = EntityKeypair::generate();
1396 let origin_hash = real_kp.origin_hash();
1397 let public_only_kp = EntityKeypair::public_only(real_kp.entity_id().clone());
1398 assert!(
1399 public_only_kp.is_read_only(),
1400 "fixture: must be public-only",
1401 );
1402
1403 // Register a DaemonHost using the public-only keypair so
1404 // `source_handler_registry_keypair` → `daemon_keypair`
1405 // returns it. The daemon body is irrelevant.
1406 let reg = Arc::new(DaemonRegistry::new());
1407 let host = DaemonHost::new(
1408 Box::new(TestDaemon { value: 0 }),
1409 public_only_kp,
1410 DaemonHostConfig::default(),
1411 );
1412 reg.register(host).unwrap();
1413
1414 // Build a matching snapshot the normal way — same origin,
1415 // same entity_id.
1416 let snapshot = StateSnapshot {
1417 version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
1418 entity_id: real_kp.entity_id().clone(),
1419 chain_link: crate::adapter::net::state::causal::CausalLink {
1420 origin_hash,
1421 horizon_encoded: 0,
1422 sequence: 0,
1423 parent_hash: 0,
1424 },
1425 through_seq: 0,
1426 state: Bytes::from_static(&[0u8; 8]),
1427 horizon: Default::default(),
1428 created_at: 0,
1429 bindings_bytes: Vec::new(),
1430 identity_envelope: None,
1431 head_payload: None,
1432 };
1433
1434 // Wire the identity context so `peer_static_lookup` returns
1435 // the target pub — i.e., every prerequisite is satisfied.
1436 // The unseal closure isn't exercised on the source path.
1437 let unseal_snapshot: EnvelopeUnsealFn =
1438 Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&target_priv));
1439 let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
1440 Arc::new(move |nid| {
1441 if nid == target_node_id {
1442 Some(target_pub)
1443 } else {
1444 None
1445 }
1446 });
1447 let ctx = MigrationIdentityContext {
1448 unseal_snapshot,
1449 peer_static_lookup,
1450 };
1451
1452 let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
1453 let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
1454 let target = Arc::new(MigrationTargetHandler::new(reg));
1455 let handler = MigrationSubprotocolHandler::with_hooks(
1456 orch,
1457 source,
1458 target,
1459 0x1111,
1460 MigrationHandlerHooks {
1461 identity: Some(ctx),
1462 ..Default::default()
1463 },
1464 );
1465
1466 // With all prerequisites satisfied but crypto guaranteed to
1467 // fail (public-only keypair can't attest), the helper must
1468 // surface an error — not return the unsealed snapshot.
1469 let err = handler
1470 .maybe_seal_envelope(snapshot, origin_hash, target_node_id)
1471 .expect_err(
1472 "public-only daemon keypair must fail to seal; silently returning the \
1473 unsealed snapshot breaks the identity-transport guarantee",
1474 );
1475 match err {
1476 MigrationError::StateFailed(ref msg) => {
1477 assert!(
1478 msg.contains("envelope seal failed"),
1479 "expected 'envelope seal failed' in message, got: {msg}",
1480 );
1481 assert!(
1482 msg.contains(&format!("{origin_hash:#x}")),
1483 "expected origin_hash in message, got: {msg}",
1484 );
1485 }
1486 other => panic!("expected StateFailed, got {other:?}"),
1487 }
1488
1489 // Belt-and-braces: the unsealed-fallback path (no context)
1490 // still works — proves this test isn't accidentally
1491 // asserting `maybe_seal_envelope` always errors.
1492 let handler_no_ctx = MigrationSubprotocolHandler::new(
1493 Arc::new(MigrationOrchestrator::new(
1494 Arc::new(DaemonRegistry::new()),
1495 0x1111,
1496 )),
1497 Arc::new(MigrationSourceHandler::new(Arc::new(DaemonRegistry::new()))),
1498 Arc::new(MigrationTargetHandler::new(Arc::new(DaemonRegistry::new()))),
1499 0x1111,
1500 );
1501 let snapshot2 = StateSnapshot {
1502 version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
1503 entity_id: real_kp.entity_id().clone(),
1504 chain_link: crate::adapter::net::state::causal::CausalLink {
1505 origin_hash,
1506 horizon_encoded: 0,
1507 sequence: 0,
1508 parent_hash: 0,
1509 },
1510 through_seq: 0,
1511 state: Bytes::from_static(&[0u8; 8]),
1512 horizon: Default::default(),
1513 created_at: 0,
1514 bindings_bytes: Vec::new(),
1515 identity_envelope: None,
1516 head_payload: None,
1517 };
1518 let passthrough = handler_no_ctx
1519 .maybe_seal_envelope(snapshot2, origin_hash, target_node_id)
1520 .expect("no ctx = ok unchanged");
1521 assert!(passthrough.identity_envelope.is_none());
1522 let _ = IdentityEnvelope::new; // silence unused import
1523 }
1524
    /// Regression (Cubic-AI P1): a seal failure inside the `TakeSnapshot`
    /// dispatcher path used to be propagated as a dispatcher error via
    /// `?`. That left the source's `start_snapshot` record in place AND
    /// starved the remote orchestrator, which waits for a `SnapshotReady`
    /// that never arrives.
    ///
    /// The fix:
    /// - converts seal failures into a `MigrationFailed` wire reply back
    ///   to the orchestrator;
    /// - aborts the local source-handler record;
    /// - returns the single-message outbound so the caller dispatches it
    ///   normally.
    ///
    /// Test setup: construct a public-only daemon keypair (the seal fails
    /// at attestation), wire an identity context that surfaces the
    /// target's static, and drive the handler with a `TakeSnapshot`
    /// message. Assertions:
    /// 1. `handle_message` returns `Ok(outbound)` — no bubble-up.
    /// 2. The outbound contains exactly one `MigrationFailed` addressed to
    ///    the originator (`from_node`).
    /// 3. The `source_handler` no longer tracks this daemon (abort ran).
    #[test]
    fn take_snapshot_seal_failure_emits_migration_failed_reply() {
        use crate::adapter::net::state::snapshot::StateSnapshot;
        use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};

        // Target static for the context's peer lookup. The value
        // isn't exercised by the seal (it fails at attestation
        // first due to public-only keypair), but the context needs
        // a non-None for the lookup or it'd short-circuit before
        // hitting the seal at all.
        let mut x25519_seed = [0u8; 32];
        getrandom::fill(&mut x25519_seed).unwrap();
        let target_priv = X25519Secret::from(x25519_seed);
        let target_pub = *X25519Pub::from(&target_priv).as_bytes();
        let target_node_id: u64 = 0x2222;
        // Plays the remote orchestrator: `TakeSnapshot` arrives from it,
        // so the failure reply must be addressed back to it.
        let orchestrator_node_id: u64 = 0x3333;

        // Daemon registered with a public-only keypair — seal's
        // attestation step needs the signing half, so this guarantees
        // `maybe_seal_envelope` returns Err once the seal runs.
        let real_kp = EntityKeypair::generate();
        let origin = real_kp.origin_hash();
        let public_only_kp = EntityKeypair::public_only(real_kp.entity_id().clone());

        let reg = Arc::new(DaemonRegistry::new());
        let host = DaemonHost::new(
            Box::new(TestDaemon { value: 7 }),
            public_only_kp,
            DaemonHostConfig::default(),
        );
        reg.register(host).unwrap();

        // `target_priv` moves into the unseal closure; only the
        // peer-lookup half of the context matters on this source path.
        let unseal: EnvelopeUnsealFn =
            Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&target_priv));
        let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
            Arc::new(move |nid| {
                if nid == target_node_id {
                    Some(target_pub)
                } else {
                    None
                }
            });
        let ctx = MigrationIdentityContext {
            unseal_snapshot: unseal,
            peer_static_lookup,
        };

        // Keep a clone of `source` so its phase can be inspected after the
        // handler has run.
        let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
        let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
        let target = Arc::new(MigrationTargetHandler::new(reg));
        let handler = MigrationSubprotocolHandler::with_hooks(
            orch,
            source.clone(),
            target,
            0x1111,
            MigrationHandlerHooks {
                identity: Some(ctx),
                ..Default::default()
            },
        );

        // Drive a `TakeSnapshot` from the fictional orchestrator.
        let msg = MigrationMessage::TakeSnapshot {
            daemon_origin: origin,
            target_node: target_node_id,
        };
        let encoded = wire::encode(&msg).unwrap();
        let outbound = handler
            .handle_message(&encoded, orchestrator_node_id)
            .expect("seal failure must not bubble up as dispatcher error");

        // Exactly one message back, addressed to the orchestrator
        // that sent TakeSnapshot.
        assert_eq!(
            outbound.len(),
            1,
            "expected single MigrationFailed reply, got {} messages",
            outbound.len(),
        );
        assert_eq!(outbound[0].dest_node, orchestrator_node_id);

        let reply = wire::decode(&outbound[0].payload).unwrap();
        match reply {
            MigrationMessage::MigrationFailed {
                daemon_origin,
                reason,
            } => {
                assert_eq!(daemon_origin, origin);
                let reason_msg = format!("{reason}");
                assert!(
                    reason_msg.contains("identity envelope seal failed"),
                    "expected seal-failure reason, got: {reason_msg}",
                );
            }
            other => panic!("expected MigrationFailed, got {other:?}"),
        }

        // Source-handler record was aborted — the pre-fix code
        // left this in place indefinitely.
        assert!(
            source.phase(origin).is_none(),
            "source_handler must have cleared its record for {origin:#x} after a failed TakeSnapshot",
        );
    }
1650
1651 #[test]
1652 fn test_handle_migration_failed() {
1653 let (handler, _reg, origin) = setup();
1654
1655 let msg = MigrationMessage::MigrationFailed {
1656 daemon_origin: origin,
1657 reason: crate::adapter::net::compute::MigrationFailureReason::StateFailed(
1658 "test failure".into(),
1659 ),
1660 };
1661 let encoded = wire::encode(&msg).unwrap();
1662
1663 // Should not error — just cleans up
1664 let outbound = handler.handle_message(&encoded, 0x3333).unwrap();
1665 assert!(outbound.is_empty());
1666 }
1667
1668 /// Regression for a test that the SDK-level suite could not
1669 /// honestly exercise: when the factory registry carries a
1670 /// pre-provisioned **fallback keypair** AND the snapshot carries
1671 /// a **valid identity envelope**, the envelope's keypair must
1672 /// win. The SDK test that used to assert this could only register
1673 /// a fallback keyed by the envelope's own `origin_hash`, because
1674 /// `origin_hash` is derived from the keypair bytes — there's no
1675 /// way for a user-level API to supply a "wrong" keypair at a
1676 /// given `origin_hash`.
1677 ///
1678 /// This unit test reaches directly into `resolve_restore_keypair`
1679 /// with two genuinely-distinct keypairs and asserts the envelope
1680 /// overrides. If someone later flips the resolution order (e.g.
1681 /// preferring the fallback for some misguided "backward-
1682 /// compatibility" reason), this test trips.
1683 #[test]
1684 fn envelope_keypair_overrides_fallback_placeholder() {
1685 use crate::adapter::net::identity::IdentityEnvelope;
1686 use crate::adapter::net::state::causal::CausalLink;
1687 use crate::adapter::net::state::snapshot::StateSnapshot;
1688 use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
1689
1690 // Target's Noise static X25519 keypair — used to seal and
1691 // then unseal the envelope.
1692 let mut seed = [0u8; 32];
1693 getrandom::fill(&mut seed).unwrap();
1694 let target_priv = X25519Secret::from(seed);
1695 let target_pub = *X25519Pub::from(&target_priv).as_bytes();
1696
1697 // Real source-side daemon keypair: the one that should end
1698 // up being used for restore.
1699 let real_kp = EntityKeypair::generate();
1700 // Wrong fallback keypair: the one that would be used if
1701 // someone flipped the resolution order.
1702 let wrong_fallback = EntityKeypair::generate();
1703 assert_ne!(
1704 real_kp.entity_id(),
1705 wrong_fallback.entity_id(),
1706 "fixture: real and fallback must differ",
1707 );
1708
1709 let chain_link = CausalLink {
1710 origin_hash: real_kp.origin_hash(),
1711 horizon_encoded: 0,
1712 sequence: 0,
1713 parent_hash: 0,
1714 };
1715 let envelope =
1716 IdentityEnvelope::new(&real_kp, target_pub, &chain_link).expect("seal envelope");
1717
1718 // Snapshot carrying the envelope. The envelope's origin_hash
1719 // matches `real_kp`; the test doesn't need the rest of the
1720 // snapshot to validate, only the envelope-open path.
1721 let snapshot = StateSnapshot {
1722 version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
1723 entity_id: real_kp.entity_id().clone(),
1724 chain_link,
1725 through_seq: 0,
1726 state: Bytes::new(),
1727 horizon: Default::default(),
1728 created_at: 0,
1729 bindings_bytes: Vec::new(),
1730 identity_envelope: Some(envelope),
1731 head_payload: None,
1732 };
1733
1734 // Build a handler with an identity_context whose unseal
1735 // closure holds the target's private key. This mirrors what
1736 // `MeshNode::migration_identity_context` produces.
1737 let priv_for_closure = target_priv.clone();
1738 let unseal_snapshot: EnvelopeUnsealFn =
1739 Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&priv_for_closure));
1740 let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
1741 Arc::new(|_| None);
1742 let ctx = MigrationIdentityContext {
1743 unseal_snapshot,
1744 peer_static_lookup,
1745 };
1746
1747 let reg = Arc::new(DaemonRegistry::new());
1748 let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
1749 let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
1750 let target = Arc::new(MigrationTargetHandler::new(reg));
1751 let handler = MigrationSubprotocolHandler::with_hooks(
1752 orch,
1753 source,
1754 target,
1755 0x1111,
1756 MigrationHandlerHooks {
1757 identity: Some(ctx),
1758 ..Default::default()
1759 },
1760 );
1761
1762 // Both envelope and fallback present — envelope wins.
1763 let resolved = handler
1764 .resolve_restore_keypair(&snapshot, Some(&wrong_fallback))
1765 .expect("resolve");
1766 assert_eq!(
1767 resolved.entity_id(),
1768 real_kp.entity_id(),
1769 "envelope's keypair must win over the pre-provisioned fallback — \
1770 flipping this order silently downgrades identity transport to \
1771 whatever the factory registry was pre-populated with",
1772 );
1773 assert_ne!(
1774 resolved.entity_id(),
1775 wrong_fallback.entity_id(),
1776 "fallback must NOT leak through when the envelope is valid",
1777 );
1778
1779 // Sanity: with no envelope on the snapshot, fallback is
1780 // returned verbatim. Proves the `Some(envelope) → envelope`
1781 // branch above wasn't passing by coincidence.
1782 let snapshot_no_envelope = StateSnapshot {
1783 identity_envelope: None,
1784 head_payload: None,
1785 ..snapshot.clone()
1786 };
1787 let resolved_fallback = handler
1788 .resolve_restore_keypair(&snapshot_no_envelope, Some(&wrong_fallback))
1789 .expect("resolve with fallback only");
1790 assert_eq!(resolved_fallback.entity_id(), wrong_fallback.entity_id());
1791 }
1792
1793 /// Regression (Cubic-AI P2): once an identity envelope is
1794 /// present on the snapshot, resolution must commit to
1795 /// envelope transport. A misbehaving `unseal_snapshot` that
1796 /// returns `Ok(None)` — e.g., a partially-implemented or
1797 /// buggy custom closure — previously made the dispatcher
1798 /// fall through to the pre-provisioned fallback keypair,
1799 /// silently downgrading a migration the caller had opted
1800 /// into envelope transport for.
1801 ///
1802 /// The fix treats `Ok(None)` from a present-envelope snapshot
1803 /// as a terminal error, matching the policy for an explicit
1804 /// `Err(...)` from unseal.
1805 ///
1806 /// Test: construct a snapshot that carries an envelope, wire
1807 /// an identity context whose `unseal_snapshot` ignores the
1808 /// snapshot entirely and returns `Ok(None)`. Provide a
1809 /// (wrong) fallback keypair. The resolver must refuse,
1810 /// returning `Err(...)` — not the fallback.
1811 #[test]
1812 fn envelope_present_but_unseal_returns_none_fails_rather_than_fallback() {
1813 use crate::adapter::net::identity::IdentityEnvelope;
1814 use crate::adapter::net::state::snapshot::StateSnapshot;
1815 use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};
1816
1817 // Fresh X25519 keypair — seal recipient.
1818 let mut seed = [0u8; 32];
1819 getrandom::fill(&mut seed).unwrap();
1820 let target_priv = X25519Secret::from(seed);
1821 let target_pub = *X25519Pub::from(&target_priv).as_bytes();
1822
1823 // Real daemon keypair: builds a valid envelope so the
1824 // snapshot is well-formed from the wire's perspective.
1825 let real_kp = EntityKeypair::generate();
1826 let chain_link = crate::adapter::net::state::causal::CausalLink {
1827 origin_hash: real_kp.origin_hash(),
1828 horizon_encoded: 0,
1829 sequence: 0,
1830 parent_hash: 0,
1831 };
1832 let envelope =
1833 IdentityEnvelope::new(&real_kp, target_pub, &chain_link).expect("seal envelope");
1834
1835 let snapshot = StateSnapshot {
1836 version: crate::adapter::net::state::snapshot::SNAPSHOT_VERSION,
1837 entity_id: real_kp.entity_id().clone(),
1838 chain_link,
1839 through_seq: 0,
1840 state: Bytes::new(),
1841 horizon: Default::default(),
1842 created_at: 0,
1843 bindings_bytes: Vec::new(),
1844 identity_envelope: Some(envelope),
1845 head_payload: None,
1846 };
1847
1848 // Misbehaving unsealer: always returns `Ok(None)`, even
1849 // when handed a snapshot with a real envelope. Simulates
1850 // a partial implementation or a bug that would have
1851 // triggered the silent-downgrade previously.
1852 let unseal_snapshot: EnvelopeUnsealFn = Arc::new(|_snap: &StateSnapshot| Ok(None));
1853 let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
1854 Arc::new(|_| None);
1855 let ctx = MigrationIdentityContext {
1856 unseal_snapshot,
1857 peer_static_lookup,
1858 };
1859
1860 let reg = Arc::new(DaemonRegistry::new());
1861 let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
1862 let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
1863 let target = Arc::new(MigrationTargetHandler::new(reg));
1864 let handler = MigrationSubprotocolHandler::with_hooks(
1865 orch,
1866 source,
1867 target,
1868 0x1111,
1869 MigrationHandlerHooks {
1870 identity: Some(ctx),
1871 ..Default::default()
1872 },
1873 );
1874
1875 // Fallback would have "succeeded" (wrong keypair, but
1876 // syntactically present) pre-fix. Post-fix the resolver
1877 // rejects because the envelope-present invariant commits
1878 // us to envelope transport.
1879 let wrong_fallback = EntityKeypair::generate();
1880 let err = handler
1881 .resolve_restore_keypair(&snapshot, Some(&wrong_fallback))
1882 .expect_err(
1883 "envelope present + unseal Ok(None) must fail; silently \
1884 returning the fallback downgrades identity transport",
1885 );
1886 assert!(
1887 err.contains("refusing to fall back"),
1888 "expected 'refusing to fall back' in error message, got: {err}",
1889 );
1890 }
1891
1892 /// CR-24: pin the no-per-daemon-coupling invariant for
1893 /// `StandbyGroup` and `CapabilityIndex`. The audit suggested
1894 /// the `MigrationFailed` arm needed teardown for both
1895 /// subsystems; investigation showed neither holds
1896 /// per-daemon migration-coupled state today (see the comment
1897 /// block at the MigrationFailed arm). This test fires loudly
1898 /// if a future change introduces such coupling, signalling
1899 /// that the maintainer MUST wire teardown into the arm.
1900 ///
1901 /// Mechanism: scan the source files for the canonical coupling
1902 /// shapes — `daemon_origin` field on `StandbyGroup` /
1903 /// `CapabilityIndex`, or migration-handler import of either
1904 /// type. Any match indicates the contract has changed and
1905 /// `migration_handler.rs:MigrationFailed` likely needs to
1906 /// call cleanup on the new state.
1907 #[test]
1908 fn cr24_no_per_daemon_migration_coupling_in_standby_or_capability() {
1909 let standby_src = include_str!("../compute/standby_group.rs");
1910 let capability_src = include_str!("../behavior/capability.rs");
1911
1912 // Cubic P2: strip both line comments (`// ...`) AND block
1913 // comments (`/* ... */`, including `/** ... */` doc
1914 // comments) before scanning. The earlier filter only
1915 // skipped `//` lines, so a token mention inside
1916 // `/** ... */` would falsely trip the regression. We
1917 // strip block-comment ranges first; the per-line filter
1918 // then handles the line-comment case.
1919 fn strip_comments(src: &str) -> String {
1920 let bytes = src.as_bytes();
1921 let mut out = Vec::with_capacity(bytes.len());
1922 let mut i = 0;
1923 while i < bytes.len() {
1924 // Skip block comment.
1925 if i + 1 < bytes.len() && bytes[i] == b'/' && bytes[i + 1] == b'*' {
1926 i += 2;
1927 while i + 1 < bytes.len() && !(bytes[i] == b'*' && bytes[i + 1] == b'/') {
1928 // Preserve newlines so per-line scanning still aligns.
1929 if bytes[i] == b'\n' {
1930 out.push(b'\n');
1931 }
1932 i += 1;
1933 }
1934 if i + 1 < bytes.len() {
1935 i += 2; // skip closing */
1936 }
1937 continue;
1938 }
1939 out.push(bytes[i]);
1940 i += 1;
1941 }
1942 String::from_utf8_lossy(&out).into_owned()
1943 }
1944
1945 let capability_clean = strip_comments(capability_src);
1946 let standby_clean = strip_comments(standby_src);
1947
1948 // CapabilityIndex must NOT index by daemon_origin. Pinned
1949 // separately because it's the audit's specific claim.
1950 let capability_uses_daemon_origin = capability_clean.lines().any(|line| {
1951 let trimmed = line.trim_start();
1952 !trimmed.starts_with("//") && trimmed.contains("daemon_origin")
1953 });
1954 assert!(
1955 !capability_uses_daemon_origin,
1956 "CR-24 regression: CapabilityIndex now references `daemon_origin` in \
1957 non-comment source. The audit's CR-24 concern was that capabilities \
1958 tied to a migrating daemon need teardown on MigrationFailed. With \
1959 this new coupling the migration_handler MUST call \
1960 `capability_index.cleanup_origin(daemon_origin)` (or equivalent) \
1961 in the MigrationFailed arm. Add the call AND update this test."
1962 );
1963
1964 // StandbyGroup must NOT have an "in-flight migration
1965 // promotion" field. The audit's scenario was "promotion
1966 // mid-flight" — for that to be a real concern, there
1967 // would need to be a state field (e.g. `pending_promotion:
1968 // Option<...>`) that survives across multiple migration-
1969 // handler dispatches.
1970 let standby_has_pending = standby_clean.lines().any(|line| {
1971 let trimmed = line.trim_start();
1972 if trimmed.starts_with("//") {
1973 return false;
1974 }
1975 trimmed.contains("pending_promotion")
1976 || trimmed.contains("migration_in_flight")
1977 || trimmed.contains("in_migration:")
1978 });
1979 assert!(
1980 !standby_has_pending,
1981 "CR-24 regression: StandbyGroup now has a pending-promotion or \
1982 in-migration field. The audit's CR-24 concern was that a mid- \
1983 flight standby promotion needs teardown on MigrationFailed. With \
1984 this new coupling the migration_handler MUST call rollback on \
1985 StandbyGroup in the MigrationFailed arm. Add the rollback call \
1986 AND update this test."
1987 );
1988 }
1989}