Skip to main content

net/adapter/net/subprotocol/
migration_handler.rs

1//! Migration subprotocol message handler.
2//!
3//! Dispatches inbound migration messages (subprotocol 0x0500) to the
4//! appropriate handler: orchestrator, source, or target.
5
6use std::sync::Arc;
7
8use dashmap::DashMap;
9use parking_lot::Mutex;
10
11use crate::adapter::net::compute::migration_target::RestoreContext;
12use crate::adapter::net::compute::orchestrator::wire;
13use crate::adapter::net::compute::{
14    MigrationError, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
15    MigrationTargetHandler, SnapshotReassembler,
16};
17use crate::adapter::net::identity::EntityKeypair;
18use crate::adapter::net::state::snapshot::StateSnapshot;
19
20/// Identity-transport context for automatic envelope seal/open in
21/// the migration dispatcher.
22///
23/// When populated, the handler:
24/// - **Source path**: after taking a snapshot, if the target's
25///   X25519 static pub is known (via `peer_static_lookup`) AND the
26///   local daemon's keypair is available, seal the envelope into
27///   the snapshot before chunking.
28/// - **Target path**: on `SnapshotReady` with an attached envelope,
29///   call `unseal_snapshot` to recover the daemon's keypair, and
30///   pass that into `restore_snapshot` instead of whatever the
31///   factory registry has pre-registered.
32///
33/// A `None` context means the dispatcher ignores envelopes — the
34/// pre-identity-envelope fallback path where both nodes pre-register
35/// matching keypairs.
36///
37/// # Key hygiene
38///
39/// This struct used to carry the local Noise static private key as a
40/// `pub [u8; 32]` field. Any SDK caller with access to a
41/// `MigrationIdentityContext` could copy the node's long-term secret
42/// out, which is unacceptable — the Noise static is what backs the
43/// node's identity in the mesh, not just the envelope-open path.
44///
45/// The private key is now captured inside the `unseal_snapshot`
46/// closure (built by [`MeshNode::migration_identity_context`](crate::adapter::net::MeshNode::migration_identity_context)) and
47/// never surfaced as a struct field. Callers can still hand the
48/// context to the dispatcher, but they cannot extract the key from
49/// it. This matches how `peer_static_lookup` already worked.
50#[derive(Clone)]
51pub struct MigrationIdentityContext {
52    /// Open an identity envelope attached to `snapshot`, if present,
53    /// using the local static X25519 private key captured at
54    /// construction time. Returns `Ok(None)` when the snapshot has
55    /// no envelope, `Ok(Some(kp))` on a successful open, and an
56    /// error string on seal-open / attestation failure.
57    ///
58    /// Built by [`MeshNode::migration_identity_context`](crate::adapter::net::MeshNode::migration_identity_context); the
59    /// closure owns a `zeroize`-on-drop `StaticSecret` so the key
60    /// material is wiped when the context is dropped.
61    pub unseal_snapshot: EnvelopeUnsealFn,
62    /// Callback: given a peer node_id, return its X25519 static
63    /// public key if we have an active session with it. Used by the
64    /// source path to pick the seal recipient.
65    pub peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync>,
66}
67
68impl std::fmt::Debug for MigrationIdentityContext {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("MigrationIdentityContext")
71            .field("unseal_snapshot", &"<fn>")
72            .field("peer_static_lookup", &"<fn>")
73            .finish()
74    }
75}
76
/// A wire-encoded migration message paired with the node it must be
/// sent to.
#[derive(Debug)]
pub struct OutboundMigrationMessage {
    /// ID of the node this message is addressed to.
    pub dest_node: u64,
    /// The message in its encoded wire form.
    pub payload: Vec<u8>,
}
85
/// Callback fired after the target-side dispatcher successfully
/// restores a daemon from a migration snapshot.
///
/// It is invoked with the daemon's `origin_hash`. The SDK uses it to
/// drive channel re-bind replay (`DAEMON_CHANNEL_REBIND_PLAN.md`
/// Stage 3): the callback walks the restored daemon's subscription
/// ledger and spawns asynchronous `subscribe_channel` calls so
/// publishers start fanning out to the target before the source
/// tears down.
///
/// The callback runs synchronously on the dispatcher thread; it
/// should hand any async work to `tokio::spawn` rather than block
/// the dispatch loop.
pub type PostRestoreCallback = Arc<dyn Fn(u64) + Send + Sync>;
102
/// Source-side callback invoked while handling `CutoverNotify`,
/// immediately before `source_handler.cleanup` unregisters the
/// daemon.
///
/// This is Stage 4 of `DAEMON_CHANNEL_REBIND_PLAN.md`: the SDK's
/// hook snapshots the daemon's subscription ledger at this point and
/// spawns async `unsubscribe_channel` calls to each publisher, so
/// rosters drop the source immediately instead of aging out over
/// the session-timeout window.
///
/// Runs synchronously on the dispatcher thread; any async work must
/// go through `tokio::spawn`.
pub type PreCleanupCallback = Arc<dyn Fn(u64) + Send + Sync>;
113
/// Readiness predicate for the target path: yields `true` once the
/// local runtime is prepared to accept inbound migrations.
///
/// When a predicate is installed and it yields `false`, the
/// dispatcher answers `SnapshotReady` with
/// [`MigrationFailureReason::NotReady`](crate::adapter::net::compute::MigrationFailureReason::NotReady),
/// letting the source retry with backoff instead of surfacing a
/// terminal error while the target is still booting.
///
/// The predicate is consulted synchronously on the dispatcher
/// thread, so it must return quickly.
pub type ReadinessCallback = Arc<dyn Fn() -> bool + Send + Sync>;
125
126/// Callback fired on the source side whenever the dispatcher
127/// observes an inbound `MigrationFailed` message. The SDK uses this
128/// to surface the structured reason code to the
129/// [`MigrationHandle::wait`](crate) caller so they can distinguish
130/// retriable from terminal failures.
131///
132/// Sync on the dispatcher thread.
133pub type FailureCallback =
134    Arc<dyn Fn(u64, crate::adapter::net::compute::MigrationFailureReason) + Send + Sync>;
135
136/// Callback that opens an identity envelope carried on a
137/// [`crate::adapter::net::state::snapshot::StateSnapshot`], using a
138/// local private key captured inside the closure. See
139/// [`MigrationIdentityContext`] for key-hygiene rationale.
140pub type EnvelopeUnsealFn = Arc<
141    dyn Fn(
142            &crate::adapter::net::state::snapshot::StateSnapshot,
143        ) -> Result<Option<EntityKeypair>, crate::adapter::net::identity::EnvelopeError>
144        + Send
145        + Sync,
146>;
147
148/// Optional cross-cutting hooks consumed by
149/// [`MigrationSubprotocolHandler::with_hooks`]. Each field is
150/// independently opt-in so a test that cares about one hook doesn't
151/// have to fabricate the others; `Default` = nothing wired.
152///
153/// | Field             | Side     | Purpose                                    |
154/// |-------------------|----------|--------------------------------------------|
155/// | `identity`        | both     | auto-seal / auto-open identity envelopes   |
156/// | `post_restore`    | target   | kick off channel re-bind replay            |
157/// | `pre_cleanup`     | source   | source-side Unsubscribe teardown           |
158/// | `readiness`       | target   | gate inbound migrations on runtime state   |
159/// | `failure`         | source   | surface structured reason to SDK caller    |
160#[derive(Default, Clone)]
161pub struct MigrationHandlerHooks {
162    /// Identity-transport context. `None` = envelopes ignored
163    /// (pre-identity-envelope fallback).
164    pub identity: Option<MigrationIdentityContext>,
165    /// Target-side post-restore callback — drives subscription
166    /// replay.
167    pub post_restore: Option<PostRestoreCallback>,
168    /// Source-side pre-cleanup callback — drives Unsubscribe
169    /// teardown.
170    pub pre_cleanup: Option<PreCleanupCallback>,
171    /// Target-side readiness predicate — returns `false` to reply
172    /// `NotReady` instead of attempting restore.
173    pub readiness: Option<ReadinessCallback>,
174    /// Source-side failure observer — surfaces structured reason
175    /// codes to the SDK.
176    pub failure: Option<FailureCallback>,
177}
178
179/// Dispatcher for migration subprotocol (`0x0500`) messages.
180///
181/// Wraps the three handler halves — orchestrator, source, target —
182/// plus the optional cross-cutting hooks that let the SDK drive
183/// identity-envelope seal/open, channel-re-bind replay,
184/// source-side Unsubscribe teardown, runtime-readiness gating, and
185/// source-side failure observation. Constructed by the SDK's
186/// `DaemonRuntime::start` via [`Self::with_hooks`]; tests use
187/// [`Self::new`] when they don't need any hooks.
188///
189/// Install onto a `MeshNode` via `MeshNode::set_migration_handler`.
190pub struct MigrationSubprotocolHandler {
191    orchestrator: Arc<MigrationOrchestrator>,
192    source_handler: Arc<MigrationSourceHandler>,
193    target_handler: Arc<MigrationTargetHandler>,
194    local_node_id: u64,
195    /// Per-target reassemblers for incoming snapshot chunks. One entry
196    /// per in-flight inbound migration. Lazily created on first chunk,
197    /// torn down after successful restore or failure.
198    ///
199    /// Wrapped in `Mutex` because `SnapshotReassembler::feed` requires
200    /// `&mut self`.
201    reassemblers: DashMap<u64, Mutex<SnapshotReassembler>>,
202    /// Identity-transport context. `None` = envelopes ignored
203    /// (pre-identity-envelope fallback).
204    identity_context: Option<MigrationIdentityContext>,
205    /// Post-restore callback, fired on the target side after
206    /// `restore_snapshot` succeeds. Used by the SDK to drive
207    /// subscription replay. `None` = no hook (used by tests and
208    /// pre-Stage-3 callers).
209    post_restore_callback: Option<PostRestoreCallback>,
210    /// Pre-cleanup callback, fired on the source side at
211    /// `CutoverNotify` handling just before the daemon is
212    /// unregistered. Drives source-side Unsubscribe teardown
213    /// (Stage 4 of the channel re-bind plan). `None` = no hook.
214    pre_cleanup_callback: Option<PreCleanupCallback>,
215    /// Target-side readiness predicate. When `Some` and returns
216    /// `false`, the dispatcher responds to inbound migration
217    /// attempts with `NotReady` instead of attempting restore.
218    /// Drives the runtime-readiness retry path
219    /// (`DAEMON_RUNTIME_READINESS_PLAN.md` Stage 3). `None` = always
220    /// treat target as ready (pre-Stage-3 behaviour).
221    readiness_callback: Option<ReadinessCallback>,
222    /// Source-side failure observer — fired when the dispatcher
223    /// receives an inbound `MigrationFailed` message. Lets the SDK
224    /// hand the structured reason to the caller via
225    /// [`MigrationHandle::wait`] rather than swallowing it at the
226    /// dispatcher layer. `None` = no hook; failures are still
227    /// processed (orchestrator aborted) but the SDK can't
228    /// distinguish retriable (NotReady) from terminal.
229    failure_callback: Option<FailureCallback>,
230}
231
232impl MigrationSubprotocolHandler {
233    /// Create a new handler with no hooks wired. Envelopes on
234    /// inbound snapshots are ignored; outbound snapshots don't
235    /// carry an envelope; readiness is treated as always-ready;
236    /// no failure observer; no channel-re-bind callbacks. Matches
237    /// the pre-Stage-5b behaviour and is the right shape for tests
238    /// that only need the bare three-handler dispatcher.
239    pub fn new(
240        orchestrator: Arc<MigrationOrchestrator>,
241        source_handler: Arc<MigrationSourceHandler>,
242        target_handler: Arc<MigrationTargetHandler>,
243        local_node_id: u64,
244    ) -> Self {
245        Self::with_hooks(
246            orchestrator,
247            source_handler,
248            target_handler,
249            local_node_id,
250            MigrationHandlerHooks::default(),
251        )
252    }
253
254    /// Create a handler with an explicit [`MigrationHandlerHooks`]
255    /// bundle. Each hook field is independently optional; the SDK
256    /// supplies all five at once from `DaemonRuntime::start`, tests
257    /// populate only the subset they need.
258    pub fn with_hooks(
259        orchestrator: Arc<MigrationOrchestrator>,
260        source_handler: Arc<MigrationSourceHandler>,
261        target_handler: Arc<MigrationTargetHandler>,
262        local_node_id: u64,
263        hooks: MigrationHandlerHooks,
264    ) -> Self {
265        Self {
266            orchestrator,
267            source_handler,
268            target_handler,
269            local_node_id,
270            reassemblers: DashMap::new(),
271            identity_context: hooks.identity,
272            post_restore_callback: hooks.post_restore,
273            pre_cleanup_callback: hooks.pre_cleanup,
274            readiness_callback: hooks.readiness,
275            failure_callback: hooks.failure,
276        }
277    }
278
279    /// Handle an inbound migration message.
280    ///
281    /// Returns zero or more outbound messages to send to other nodes.
282    pub fn handle_message(
283        &self,
284        data: &[u8],
285        from_node: u64,
286    ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
287        let msg = wire::decode(data)?;
288        self.dispatch(msg, from_node)
289    }
290
291    /// Dispatch a decoded message to the appropriate handler.
292    fn dispatch(
293        &self,
294        msg: MigrationMessage,
295        from_node: u64,
296    ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
297        let mut outbound = Vec::new();
298
299        match msg {
300            MigrationMessage::TakeSnapshot {
301                daemon_origin,
302                target_node,
303            } => {
304                // We are the source — take snapshot and reply.
305                // Record `from_node` as the orchestrator: it's the node
306                // that sent us TakeSnapshot, and replies (SnapshotReady,
307                // CleanupComplete) must reach it. The source-side handler
308                // stores this so subsequent replies don't drift if a
309                // future forwarding layer rewrites `from_node`.
310                let mut snapshot =
311                    self.source_handler
312                        .start_snapshot(daemon_origin, target_node, from_node)?;
313
314                // Identity envelope: if we have a transport context
315                // and can find the target's X25519 pubkey, seal the
316                // daemon's keypair into the snapshot before chunking
317                // so the target can reconstruct identity without an
318                // out-of-band pre-registration.
319                //
320                // Seal failure needs a wire reply, not just `?`. The
321                // orchestrator (remote) is blocked waiting for this
322                // node's `SnapshotReady`; bailing with a dispatcher
323                // error would leave it waiting forever and leave our
324                // own `source_handler.start_snapshot` state dangling
325                // for `daemon_origin`. Convert to a `MigrationFailed`
326                // reply — the orchestrator consumes it, the SDK
327                // surfaces the reason, retry semantics kick in if
328                // applicable.
329                snapshot = match self.maybe_seal_envelope(snapshot, daemon_origin, target_node) {
330                    Ok(s) => s,
331                    Err(e) => {
332                        let _ = self.source_handler.abort(daemon_origin);
333                        let reason =
334                            crate::adapter::net::compute::MigrationFailureReason::StateFailed(
335                                format!("identity envelope seal failed: {e}"),
336                            );
337                        outbound.push(OutboundMigrationMessage {
338                            dest_node: from_node,
339                            payload: wire::encode(&MigrationMessage::MigrationFailed {
340                                daemon_origin,
341                                reason,
342                            })?,
343                        });
344                        return Ok(outbound);
345                    }
346                };
347
348                // Chunk the snapshot for transport
349                //
350                // Oversized state/bindings surfaces as a
351                // `MigrationFailed` reply (StateFailed) rather than
352                // a panic in the dispatch task. The orchestrator
353                // consumes the reply and retry semantics kick in,
354                // the same shape as `maybe_seal_envelope` failure
355                // above.
356                let snapshot_bytes = match snapshot.try_to_bytes() {
357                    Ok(b) => b,
358                    Err(e) => {
359                        let _ = self.source_handler.abort(daemon_origin);
360                        let reason =
361                            crate::adapter::net::compute::MigrationFailureReason::StateFailed(
362                                format!("snapshot serialization failed: {e}"),
363                            );
364                        outbound.push(OutboundMigrationMessage {
365                            dest_node: from_node,
366                            payload: wire::encode(&MigrationMessage::MigrationFailed {
367                                daemon_origin,
368                                reason,
369                            })?,
370                        });
371                        return Ok(outbound);
372                    }
373                };
374                let chunks = crate::adapter::net::compute::orchestrator::chunk_snapshot(
375                    daemon_origin,
376                    snapshot_bytes,
377                    snapshot.through_seq,
378                )?;
379                let orch = self
380                    .source_handler
381                    .orchestrator_node(daemon_origin)
382                    .unwrap_or(from_node);
383                for chunk in chunks {
384                    outbound.push(OutboundMigrationMessage {
385                        dest_node: orch,
386                        payload: wire::encode(&chunk)?,
387                    });
388                }
389            }
390
391            MigrationMessage::SnapshotReady {
392                daemon_origin,
393                snapshot_bytes,
394                seq_through,
395                chunk_index,
396                total_chunks,
397            } => {
398                // Peer-auth gate. SnapshotReady is always source→
399                // {orchestrator,target}. The orchestrator records
400                // source_node at start_migration time; the target's
401                // local record (if any) keeps the orchestrator
402                // binding from its own start path. Reject if the
403                // sender doesn't match the recorded principal for
404                // whichever role we are in.
405                //
406                // Third-tier fallback closes a TOFU window: when
407                // neither orchestrator-side nor target-side state
408                // has a record (the orchestrator lives on a remote
409                // node and we've not yet received any messages for
410                // this origin), the first `SnapshotReady` would
411                // otherwise bind whoever sent it as the orchestrator
412                // inside `restore_on_target`. Operators who know
413                // the orchestrator out-of-band can pre-bind it via
414                // `DaemonFactoryRegistry::bind_expected_orchestrator`;
415                // when bound, a mismatching sender is rejected
416                // here, before `restore_on_target` records them.
417                if let Some(expected) = self.orchestrator.source_node(daemon_origin) {
418                    if expected != from_node {
419                        return Err(MigrationError::WrongPeer {
420                            daemon_origin,
421                            from: from_node,
422                            expected,
423                        });
424                    }
425                } else if let Some(expected) = self.target_handler.orchestrator_node(daemon_origin)
426                {
427                    if expected != from_node {
428                        return Err(MigrationError::WrongPeer {
429                            daemon_origin,
430                            from: from_node,
431                            expected,
432                        });
433                    }
434                } else if let Some(expected) = self
435                    .target_handler
436                    .factories()
437                    .expected_orchestrator(daemon_origin)
438                {
439                    if expected != from_node {
440                        return Err(MigrationError::WrongPeer {
441                            daemon_origin,
442                            from: from_node,
443                            expected,
444                        });
445                    }
446                }
447                // If the orchestrator is local, let it record this chunk and
448                // forward to target. `target_node` identifies where the
449                // snapshot should be restored; if that's us, we reassemble
450                // and restore instead of forwarding.
451                let orch_target = self.orchestrator.target_node(daemon_origin);
452
453                match orch_target {
454                    Some(target) if target == self.local_node_id => {
455                        // We are the target — advance orchestrator state
456                        // (safe: on_snapshot_ready is idempotent on the
457                        // target side because it just re-derives the
458                        // forward message we ignore), then reassemble.
459                        //
460                        // Errors are informational: the restore path below
461                        // is the authoritative check for whether the
462                        // snapshot is usable. We log at `debug` rather
463                        // than ignore so non-idempotent failures (e.g.,
464                        // unexpected phase transitions on a stale record)
465                        // are observable during triage.
466                        if let Err(e) = self.orchestrator.on_snapshot_ready(
467                            daemon_origin,
468                            snapshot_bytes.clone(),
469                            seq_through,
470                            chunk_index,
471                            total_chunks,
472                        ) {
473                            tracing::debug!(
474                                ?e,
475                                origin = format!("{:#x}", daemon_origin),
476                                "on_snapshot_ready (local target): ignored"
477                            );
478                        }
479
480                        if let Some(out) = self.restore_on_target(
481                            daemon_origin,
482                            snapshot_bytes,
483                            seq_through,
484                            chunk_index,
485                            total_chunks,
486                            from_node,
487                        )? {
488                            outbound.extend(out);
489                        }
490                    }
491                    Some(target) => {
492                        // Middle of the chain (or orchestrator node forwarding
493                        // to a remote target). Let the orchestrator update its
494                        // own phase state and emit the forward.
495                        let forward = self.orchestrator.on_snapshot_ready(
496                            daemon_origin,
497                            snapshot_bytes,
498                            seq_through,
499                            chunk_index,
500                            total_chunks,
501                        )?;
502                        if let MigrationMessage::SnapshotReady { .. } = &forward {
503                            outbound.push(OutboundMigrationMessage {
504                                dest_node: target,
505                                payload: wire::encode(&forward)?,
506                            });
507                        }
508                    }
509                    None => {
510                        // No local migration record — this node may be a
511                        // target that has no orchestrator-side state (the
512                        // orchestrator lives on a different node). Try to
513                        // restore anyway; the factory registry is the
514                        // authority on whether this node should accept.
515                        if let Some(out) = self.restore_on_target(
516                            daemon_origin,
517                            snapshot_bytes,
518                            seq_through,
519                            chunk_index,
520                            total_chunks,
521                            from_node,
522                        )? {
523                            outbound.extend(out);
524                        }
525                    }
526                }
527            }
528
529            MigrationMessage::RestoreComplete {
530                daemon_origin,
531                restored_seq,
532            } => {
533                // Target has restored — orchestrator may send buffered events.
534                // If there are no buffered events, send an empty BufferedEvents
535                // anyway: the target's reply (ReplayComplete) is what drives
536                // the chain forward into Cutover. Dropping the message here
537                // would stall any migration whose source never buffered.
538                let buffered_msg = self
539                    .orchestrator
540                    .on_restore_complete(daemon_origin, restored_seq)?
541                    .unwrap_or(MigrationMessage::BufferedEvents {
542                        daemon_origin,
543                        events: Vec::new(),
544                    });
545                outbound.push(OutboundMigrationMessage {
546                    dest_node: from_node, // send back to target
547                    payload: wire::encode(&buffered_msg)?,
548                });
549            }
550
551            MigrationMessage::ReplayComplete {
552                daemon_origin,
553                replayed_seq,
554                target_head,
555            } => {
556                // Target finished replay — orchestrator initiates cutover
557                let cutover_msg = self.orchestrator.on_replay_complete(
558                    daemon_origin,
559                    replayed_seq,
560                    target_head,
561                )?;
562
563                // Send CutoverNotify to source (from_node is the target that reported)
564                if let MigrationMessage::CutoverNotify { .. } = &cutover_msg {
565                    let source_node = self
566                        .orchestrator
567                        .source_node(daemon_origin)
568                        .unwrap_or(from_node);
569
570                    outbound.push(OutboundMigrationMessage {
571                        dest_node: source_node,
572                        payload: wire::encode(&cutover_msg)?,
573                    });
574                }
575            }
576
577            MigrationMessage::CutoverNotify {
578                daemon_origin,
579                target_node,
580            } => {
581                // We are the source — stop accepting writes.
582                //
583                // `on_cutover` returns `DaemonNotFound` if this node didn't
584                // handle a `TakeSnapshot` (the orchestrator took the snapshot
585                // locally and never involved `source_handler`). Treat that as
586                // "no buffered events to drain" rather than a hard error so
587                // local-source migrations can still reach cleanup.
588                let final_events = match self.source_handler.on_cutover(daemon_origin) {
589                    Ok(events) => events,
590                    Err(MigrationError::DaemonNotFound(_)) => Vec::new(),
591                    Err(e) => return Err(e),
592                };
593
594                // If there are last-moment events, send them to target
595                if !final_events.is_empty() {
596                    let events_msg = MigrationMessage::BufferedEvents {
597                        daemon_origin,
598                        events: final_events,
599                    };
600                    outbound.push(OutboundMigrationMessage {
601                        dest_node: target_node,
602                        payload: wire::encode(&events_msg)?,
603                    });
604                }
605
606                // Acknowledge cutover to the local orchestrator. When the
607                // orchestrator lives on a different node, this local call
608                // has no record to advance; the remote orchestrator learns
609                // about cutover from `CleanupComplete`, which does the same
610                // phase advance there.
611                match self.orchestrator.on_cutover_acknowledged(daemon_origin) {
612                    Ok(()) => {}
613                    Err(MigrationError::DaemonNotFound(_)) => {}
614                    Err(e) => return Err(e),
615                }
616
617                // Capture the orchestrator BEFORE `cleanup()` clears the
618                // source-side migration record — once it's gone,
619                // `orchestrator_node()` returns None and we'd silently
620                // fall back to `from_node`, defeating the whole point of
621                // recording the orchestrator at `start_snapshot` time.
622                let dest = self
623                    .source_handler
624                    .orchestrator_node(daemon_origin)
625                    .unwrap_or(from_node);
626
627                // Fire the pre-cleanup callback BEFORE unregistering
628                // the daemon — the host still holds the subscription
629                // ledger, which the SDK's hook snapshots here so it
630                // can send `Unsubscribe` messages to every publisher
631                // after cleanup. This is Stage 4 of the channel
632                // re-bind plan: without it, the publishers' rosters
633                // keep pointing at the source until their session
634                // timeout (~30 s), causing duplicate deliveries to
635                // a now-gone daemon and unnecessary bandwidth.
636                if let Some(cb) = &self.pre_cleanup_callback {
637                    cb(daemon_origin);
638                }
639
640                // Cleanup source. No-op if this node never authored the
641                // migration (e.g. a replayed CutoverNotify after the
642                // record was already cleared); only an authored
643                // migration in Cutover phase actually unregisters the
644                // local daemon. A forged CutoverNotify for an origin
645                // we never migrated leaves the local daemon untouched.
646                let _ = self.source_handler.cleanup(daemon_origin);
647
648                let cleanup_msg = MigrationMessage::CleanupComplete { daemon_origin };
649                outbound.push(OutboundMigrationMessage {
650                    dest_node: dest,
651                    payload: wire::encode(&cleanup_msg)?,
652                });
653            }
654
655            MigrationMessage::CleanupComplete { daemon_origin } => {
656                // Peer-auth gate. CleanupComplete is source→
657                // orchestrator. Without binding, a forged
658                // CleanupComplete from any peer makes the
659                // orchestrator emit ActivateTarget to a target that
660                // hasn't necessarily finished restore.
661                if let Some(expected) = self.orchestrator.source_node(daemon_origin) {
662                    if expected != from_node {
663                        return Err(MigrationError::WrongPeer {
664                            daemon_origin,
665                            from: from_node,
666                            expected,
667                        });
668                    }
669                }
670                // Source reports its cleanup done. The orchestrator now
671                // tells the target to activate.
672                let activate = self.orchestrator.on_cleanup_complete(daemon_origin)?;
673                // Route the ActivateTarget to whichever node is the target.
674                let target = self
675                    .orchestrator
676                    .target_node(daemon_origin)
677                    .unwrap_or(from_node);
678                outbound.push(OutboundMigrationMessage {
679                    dest_node: target,
680                    payload: wire::encode(&activate)?,
681                });
682            }
683
684            MigrationMessage::ActivateTarget { daemon_origin } => {
685                // Peer-auth gate. ActivateTarget is orchestrator→
686                // target. Without binding, any peer with subprotocol
687                // reach forces the target to flip live while the
688                // source still believes it owns the daemon —
689                // divergent chain heads. The target_handler records
690                // the orchestrator at restore_snapshot time; reject
691                // unless from_node matches.
692                if let Some(expected) = self.target_handler.orchestrator_node(daemon_origin) {
693                    if expected != from_node {
694                        return Err(MigrationError::WrongPeer {
695                            daemon_origin,
696                            from: from_node,
697                            expected,
698                        });
699                    }
700                }
701                // We are the target — drain remaining events and go live.
702                // Retry-safe: `activate()` is idempotent once the migration
703                // has been completed, and we route the ack to the recorded
704                // orchestrator BEFORE `complete()` transitions state to the
705                // idempotency index. If the ack packet is lost, a retried
706                // ActivateTarget will find the completed record, return the
707                // same replayed_seq, and re-send the ack. The orchestrator
708                // therefore can't get wedged waiting for a completion that
709                // already happened.
710                let replayed_seq = self.target_handler.activate(daemon_origin)?;
711                let ack_dest = self
712                    .target_handler
713                    .orchestrator_node(daemon_origin)
714                    .unwrap_or(from_node);
715                let ack = MigrationMessage::ActivateAck {
716                    daemon_origin,
717                    replayed_seq,
718                };
719                outbound.push(OutboundMigrationMessage {
720                    dest_node: ack_dest,
721                    payload: wire::encode(&ack)?,
722                });
723                // `complete()` is idempotent: a retried ActivateTarget
724                // after a lost ack re-runs `activate()` (idempotent) and
725                // `complete()` (no-op once Complete).
726                let _ = self.target_handler.complete(daemon_origin);
727            }
728
729            MigrationMessage::ActivateAck {
730                daemon_origin,
731                replayed_seq,
732            } => {
733                // Target acknowledged — migration terminus on the
734                // orchestrator.
735                self.orchestrator
736                    .on_activate_ack(daemon_origin, replayed_seq)?;
737            }
738
739            MigrationMessage::MigrationFailed {
740                daemon_origin,
741                reason,
742            } => {
743                // Peer-auth gate. MigrationFailed can come from any
744                // participant (orchestrator, source, or target).
745                // Without binding, a forged MigrationFailed from any
746                // peer drives a rollback after a legitimate
747                // cutover. Accept only when from_node matches a
748                // recorded principal on at least one of the local
749                // handler views; if no record exists at all (e.g.,
750                // late-arriving for a migration already cleaned up),
751                // drop silently rather than abort phantom state.
752                let recorded = [
753                    self.orchestrator.source_node(daemon_origin),
754                    self.orchestrator.target_node(daemon_origin),
755                    self.source_handler.orchestrator_node(daemon_origin),
756                    self.target_handler.orchestrator_node(daemon_origin),
757                ];
758                let known = recorded.iter().any(|p| p.is_some());
759                if known && !recorded.contains(&Some(from_node)) {
760                    return Err(MigrationError::WrongPeer {
761                        daemon_origin,
762                        from: from_node,
763                        expected: recorded.iter().find_map(|p| *p).unwrap_or(0),
764                    });
765                }
766                // Fire the SDK's observer BEFORE abort, so the
767                // observer sees the structured reason while the
768                // migration record is still alive — the SDK uses
769                // this to surface NotReady vs terminal to the
770                // caller of `MigrationHandle::wait`.
771                if let Some(cb) = &self.failure_callback {
772                    cb(daemon_origin, reason.clone());
773                }
774                // Abort on all local handlers. This is correct for
775                // terminal reasons; for `NotReady` the SDK may
776                // elect to retry, which will re-`start_migration`
777                // from scratch (re-snapshotting on local source,
778                // re-sending TakeSnapshot on remote source).
779                let _ = self.source_handler.abort(daemon_origin);
780                let _ = self.target_handler.abort(daemon_origin);
781                let _ = self
782                    .orchestrator
783                    .abort_migration_with_reason(daemon_origin, reason);
784                // Also drop any partial reassembler we accumulated
785                // as the target. The local-source-failure path
786                // (`fail_migration_with_reason`, line ~1061)
787                // already clears this; an inbound `MigrationFailed`
788                // after the target had received some snapshot
789                // chunks would otherwise leave ~`chunk_size *
790                // chunks_received` bytes pinned in the DashMap
791                // forever (or until the same origin migrated again
792                // with a higher `seq_through`). With many ephemeral
793                // daemons this would be an unbounded leak.
794                self.reassemblers.remove(&daemon_origin);
795
796                // Cleanup completeness: neither `StandbyGroup` nor
797                // `CapabilityIndex` holds per-daemon
798                // migration-coupled state today.
799                //
800                //   * `StandbyGroup::promote` is synchronous —
801                //     either succeeds or rolls back atomically.
802                //     There is no "promotion in flight across
803                //     migration phases" state.
804                //   * `CapabilityIndex` indexes by `node_id`, not
805                //     by `daemon_origin` (verified by `grep -rn
806                //     daemon_origin src/adapter/net/behavior/
807                //     capability.rs` returning no matches).
808                //     Capabilities are node-level; failure of a
809                //     specific daemon's migration doesn't change
810                //     what the source node is advertising.
811                //
812                // So no additional teardown is needed today. If a
813                // future change adds per-daemon coupling in either
814                // subsystem, the regression test for this invariant
815                // fires loudly and the maintainer must wire
816                // teardown HERE.
817            }
818
819            MigrationMessage::BufferedEvents {
820                daemon_origin,
821                events,
822            } => {
823                // We are the target — replay events
824                let replayed_seq = self.target_handler.replay_events(daemon_origin, events)?;
825
826                // Ship the freshly-replayed daemon's chain head to
827                // the orchestrator so its `SuperpositionState`'s
828                // continuity-proof anchor is the real cryptographic
829                // head, not a synthetic placeholder the orchestrator
830                // would have to fabricate when it lives on a third
831                // node and has no local registry entry to read from.
832                let target_head = self.target_handler.host_head_link(daemon_origin)?;
833
834                // Tell orchestrator we're done replaying
835                let reply = MigrationMessage::ReplayComplete {
836                    daemon_origin,
837                    replayed_seq,
838                    target_head,
839                };
840                let dest = self
841                    .target_handler
842                    .orchestrator_node(daemon_origin)
843                    .unwrap_or(from_node);
844                outbound.push(OutboundMigrationMessage {
845                    dest_node: dest,
846                    payload: wire::encode(&reply)?,
847                });
848            }
849        }
850
851        Ok(outbound)
852    }
853
854    /// Feed a snapshot chunk into the target-side reassembler. When the
855    /// full snapshot is assembled, resolve a factory for the daemon and
856    /// call `restore_snapshot`, then emit `RestoreComplete` back to the
857    /// source node (`from_node`).
858    ///
859    /// Returns `Ok(None)` while waiting for more chunks, `Ok(Some(outbound))`
860    /// with the `RestoreComplete` (or a `MigrationFailed`) once restore has
861    /// been attempted.
862    fn restore_on_target(
863        &self,
864        daemon_origin: u64,
865        snapshot_bytes: Vec<u8>,
866        seq_through: u64,
867        chunk_index: u32,
868        total_chunks: u32,
869        from_node: u64,
870    ) -> Result<Option<Vec<OutboundMigrationMessage>>, MigrationError> {
871        let reassembler_entry = self
872            .reassemblers
873            .entry(daemon_origin)
874            .or_insert_with(|| Mutex::new(SnapshotReassembler::new()));
875
876        let assembled = {
877            let mut reassembler = reassembler_entry.lock();
878            reassembler
879                .feed(
880                    daemon_origin,
881                    snapshot_bytes,
882                    seq_through,
883                    chunk_index,
884                    total_chunks,
885                )
886                .map_err(|e| {
887                    MigrationError::StateFailed(format!("snapshot reassembly failed: {:?}", e))
888                })?
889        };
890        drop(reassembler_entry); // release DashMap read lock
891
892        let assembled_bytes = match assembled {
893            Some(bytes) => bytes,
894            None => return Ok(None), // still waiting for more chunks
895        };
896
897        // Drop the reassembler entry now that we've completed.
898        self.reassemblers.remove(&daemon_origin);
899
900        // Parse the snapshot. A parse failure is a hard migration failure.
901        let snapshot = match StateSnapshot::from_bytes(&assembled_bytes) {
902            Some(s) => s,
903            None => {
904                return Ok(Some(self.fail_migration(
905                    daemon_origin,
906                    from_node,
907                    "failed to parse snapshot bytes on target",
908                )?));
909            }
910        };
911
912        // `source_node` is the daemon's pre-migration host — tracked here
913        // only for the target-handler's internal bookkeeping. It is NOT
914        // where `RestoreComplete` gets sent (see below).
915        let source_node = self
916            .orchestrator
917            .source_node(daemon_origin)
918            .unwrap_or(from_node);
919
920        // If this origin is already under migration here, the source is
921        // retrying because our earlier `RestoreComplete` didn't make it
922        // back. Don't touch the already-restored daemon; just re-emit
923        // `RestoreComplete` so the orchestrator can advance. This also
924        // means we DO NOT consume the factory on the retry — the factory
925        // registration must survive until the migration is truly complete
926        // (`ActivateAck`), not just until the first locally-successful
927        // restore.
928        if !self.target_handler.is_migrating(daemon_origin) {
929            // Readiness check first: if the runtime is still in
930            // `Registering`, respond `NotReady` so the source can
931            // retry with backoff rather than burning the attempt
932            // on a target that's still booting.
933            if let Some(readiness) = &self.readiness_callback {
934                if !readiness() {
935                    return Ok(Some(self.fail_migration_with_reason(
936                        daemon_origin,
937                        from_node,
938                        crate::adapter::net::compute::MigrationFailureReason::NotReady,
939                    )?));
940                }
941            }
942
943            let inputs = match self.target_handler.factories().construct(daemon_origin) {
944                Some(i) => i,
945                None => {
946                    return Ok(Some(self.fail_migration_with_reason(
947                        daemon_origin,
948                        from_node,
949                        crate::adapter::net::compute::MigrationFailureReason::FactoryNotFound,
950                    )?));
951                }
952            };
953
954            // Identity envelope: if the snapshot carries one and we
955            // have the X25519 private key to unseal it, the envelope
956            // supplies the real daemon keypair. Otherwise fall back
957            // to whatever keypair the factory was registered with —
958            // which, for public-identity migrations or pre-Stage-5b
959            // callers, is either a placeholder or a manually-shared
960            // keypair. A present-but-invalid envelope is a hard
961            // failure, not a fallback — otherwise an attacker who
962            // tampers with the envelope could downgrade identity
963            // transport silently.
964            let keypair = match self.resolve_restore_keypair(&snapshot, inputs.keypair.as_ref()) {
965                Ok(kp) => kp,
966                Err(e) => {
967                    return Ok(Some(self.fail_migration(
968                        daemon_origin,
969                        from_node,
970                        &format!("identity envelope open failed: {e}"),
971                    )?));
972                }
973            };
974
975            let daemon = inputs.daemon;
976            if let Err(e) = self.target_handler.restore_snapshot(
977                RestoreContext {
978                    daemon_origin,
979                    snapshot: &snapshot,
980                    source_node,
981                    // orchestrator: whoever forwarded SnapshotReady to us
982                    orchestrator_node: from_node,
983                },
984                keypair,
985                move || daemon,
986                inputs.config,
987            ) {
988                // Factory is still registered — next `SnapshotReady` for
989                // this origin (e.g., from an orchestrator retry) can try
990                // again. On successful completion (`complete()`), the
991                // factory is auto-removed so a stale or replayed
992                // SnapshotReady can't re-trigger restore against what is
993                // already the authoritative copy.
994                return Ok(Some(self.fail_migration(
995                    daemon_origin,
996                    from_node,
997                    &format!("restore_snapshot failed: {:?}", e),
998                )?));
999            }
1000
1001            // Fire the post-restore callback. The SDK-supplied hook
1002            // drives channel re-bind replay (Stage 3 of the channel
1003            // re-bind plan): walks the restored daemon's ledger and
1004            // spawns async `subscribe_channel` calls so publishers
1005            // start fanning out to the target before the source
1006            // tears down. Sync callback; the hook itself should
1007            // `tokio::spawn` the actual work.
1008            if let Some(cb) = &self.post_restore_callback {
1009                cb(daemon_origin);
1010            }
1011        }
1012
1013        // Route `RestoreComplete` to the recorded orchestrator. Only the
1014        // orchestrator holds the migration record; sending to a relay
1015        // would stall the state machine. `from_node` is used as a
1016        // fallback when the target-side record has been lost (e.g. a
1017        // very late chunk after the migration record timed out).
1018        let reply = MigrationMessage::RestoreComplete {
1019            daemon_origin,
1020            restored_seq: seq_through,
1021        };
1022        let dest = self
1023            .target_handler
1024            .orchestrator_node(daemon_origin)
1025            .unwrap_or(from_node);
1026        Ok(Some(vec![OutboundMigrationMessage {
1027            dest_node: dest,
1028            payload: wire::encode(&reply)?,
1029        }]))
1030    }
1031
1032    /// Source-side helper: if we have an identity-transport context
1033    /// and the target's X25519 pubkey is available, seal the
1034    /// daemon's keypair into the snapshot.
1035    ///
1036    /// Resolution:
1037    /// - **Prerequisite missing** (no context, target key not known,
1038    ///   or daemon keypair absent from the local registry): return
1039    ///   the snapshot unchanged. This is the legitimate public-
1040    ///   identity / NKpsk0-responder fallback — the target is
1041    ///   expected to have a pre-registered keypair.
1042    /// - **Prerequisites met, seal crypto succeeds**: return the
1043    ///   sealed snapshot.
1044    /// - **Prerequisites met, seal crypto fails**: fail the
1045    ///   migration. Silently downgrading to unsealed transport would
1046    ///   break the identity-transport guarantee the caller installed
1047    ///   `identity_context` to obtain — the target would restore
1048    ///   using whatever fallback keypair the factory registry carries
1049    ///   (possibly nothing, possibly a stale mismatch), and any later
1050    ///   signature the daemon produces on the target would be bound
1051    ///   to the wrong identity. The only honest response is to abort.
1052    fn maybe_seal_envelope(
1053        &self,
1054        snapshot: StateSnapshot,
1055        daemon_origin: u64,
1056        target_node: u64,
1057    ) -> Result<StateSnapshot, MigrationError> {
1058        let Some(ctx) = &self.identity_context else {
1059            return Ok(snapshot);
1060        };
1061        // Skip if the snapshot already carries an envelope (e.g. the
1062        // SDK pre-sealed at `start_migration` time for a local-source
1063        // case).
1064        if snapshot.identity_envelope.is_some() {
1065            return Ok(snapshot);
1066        }
1067        let Some(target_pub) = (ctx.peer_static_lookup)(target_node) else {
1068            return Ok(snapshot);
1069        };
1070        // Find the daemon's keypair in the local registry. The
1071        // orchestrator + source_handler + target_handler share one
1072        // registry, so whichever owns this daemon, we see it.
1073        let kp = match self
1074            .source_handler_registry_keypair(daemon_origin)
1075            .or_else(|| self.target_handler_registry_keypair(daemon_origin))
1076        {
1077            Some(kp) => kp,
1078            None => return Ok(snapshot),
1079        };
1080        snapshot
1081            .with_identity_envelope(&kp, target_pub)
1082            .map_err(|e| {
1083                MigrationError::StateFailed(format!(
1084                    "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
1085                ))
1086            })
1087    }
1088
1089    /// Read-only keypair fetch from the shared daemon registry
1090    /// reachable via `source_handler`. `source_handler` and
1091    /// `target_handler` hold `Arc` clones of the same registry in
1092    /// typical wiring, so checking via source is sufficient; the
1093    /// `target_handler_registry_keypair` fallback exists for
1094    /// asymmetric setups where they diverge.
1095    fn source_handler_registry_keypair(&self, daemon_origin: u64) -> Option<EntityKeypair> {
1096        let _ = daemon_origin;
1097        // `MigrationSourceHandler` doesn't expose the registry
1098        // publicly, so reach through the orchestrator which shares
1099        // the same `Arc<DaemonRegistry>`.
1100        self.orchestrator
1101            .daemon_registry()
1102            .daemon_keypair(daemon_origin)
1103    }
1104
1105    fn target_handler_registry_keypair(&self, daemon_origin: u64) -> Option<EntityKeypair> {
1106        // Parallel path — the target-side registry may in some
1107        // configurations be distinct. Today it's the same `Arc`, so
1108        // this returns the same value as the source path; kept as a
1109        // seam.
1110        self.orchestrator
1111            .daemon_registry()
1112            .daemon_keypair(daemon_origin)
1113    }
1114
1115    /// Target-side helper: pick the keypair to hand to
1116    /// `restore_snapshot`. Resolution order:
1117    ///
1118    /// 1. If the snapshot carries an identity envelope AND we have
1119    ///    the X25519 private key to unseal it → use the envelope's
1120    ///    keypair. (Non-envelope cases fall through.)
1121    /// 2. Otherwise, if `fallback` was provided — the factory was
1122    ///    registered via `DaemonFactoryRegistry::register` with a
1123    ///    pre-provisioned keypair — use that.
1124    /// 3. If neither is available (placeholder registration +
1125    ///    no envelope in the snapshot), fail: a placeholder factory
1126    ///    expects the envelope to supply the keypair, and its
1127    ///    absence means the source skipped identity transport
1128    ///    without the target being prepared for that.
1129    ///
1130    /// Once an envelope is **present** on the snapshot, envelope
1131    /// transport is mandatory — the fallback keypair is NEVER used
1132    /// on that path. Present-but-invalid envelopes are terminal
1133    /// (propagating the envelope error rather than falling back
1134    /// prevents an attacker from downgrading identity transport by
1135    /// tampering with the envelope bytes), and a misbehaving
1136    /// `unseal_snapshot` that returns `Ok(None)` despite an
1137    /// envelope being present is treated as a terminal error for
1138    /// the same reason: silently falling through to the
1139    /// pre-provisioned keypair would defeat the identity-transport
1140    /// guarantee callers installed the context to obtain.
1141    fn resolve_restore_keypair(
1142        &self,
1143        snapshot: &StateSnapshot,
1144        fallback: Option<&EntityKeypair>,
1145    ) -> Result<EntityKeypair, String> {
1146        if let (Some(ctx), Some(_)) = (&self.identity_context, &snapshot.identity_envelope) {
1147            // The private key stays inside the closure owned by
1148            // `ctx.unseal_snapshot` — the dispatcher never sees it.
1149            //
1150            // Both `Ok(None)` and `Err` terminate resolution. The
1151            // envelope has been attached to the snapshot, so we've
1152            // already committed to envelope transport; falling back
1153            // to the pre-provisioned keypair here would silently
1154            // downgrade a migration the caller asked to carry
1155            // identity. A conforming `unseal_snapshot` returns
1156            // `Ok(Some(_))` or `Err(_)` when handed a snapshot with
1157            // a present envelope — `Ok(None)` indicates a broken
1158            // unsealer and must not mask the breakage.
1159            return match (ctx.unseal_snapshot)(snapshot) {
1160                Ok(Some(kp)) => Ok(kp),
1161                Ok(None) => Err("identity envelope present on snapshot but \
1162                     `unseal_snapshot` returned Ok(None) — refusing to \
1163                     fall back to the pre-provisioned keypair; a \
1164                     present envelope mandates envelope-sourced \
1165                     identity transport"
1166                    .to_string()),
1167                Err(e) => Err(format!("{e}")),
1168            };
1169        }
1170        fallback.cloned().ok_or_else(|| {
1171            "placeholder factory registered but snapshot has no \
1172             identity envelope (and no local fallback keypair available)"
1173                .to_string()
1174        })
1175    }
1176
1177    /// Build a `MigrationFailed` outbound message and clean up local state.
1178    /// Convenience wrapper that wraps `reason` in
1179    /// [`MigrationFailureReason::StateFailed`] for generic failures;
1180    /// callers that need a specific reason code (e.g. `NotReady`,
1181    /// `FactoryNotFound`) should use
1182    /// [`Self::fail_migration_with_reason`].
1183    fn fail_migration(
1184        &self,
1185        daemon_origin: u64,
1186        from_node: u64,
1187        reason: &str,
1188    ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
1189        self.fail_migration_with_reason(
1190            daemon_origin,
1191            from_node,
1192            crate::adapter::net::compute::MigrationFailureReason::StateFailed(reason.to_string()),
1193        )
1194    }
1195
1196    /// Build a `MigrationFailed` outbound message with a structured
1197    /// reason. Clean-up is the same as [`Self::fail_migration`]: the
1198    /// reassembler entry + target-handler state are dropped so a
1199    /// retry from the source can start fresh (unless the reason is
1200    /// `FactoryNotFound` — a retry won't find what isn't there).
1201    fn fail_migration_with_reason(
1202        &self,
1203        daemon_origin: u64,
1204        from_node: u64,
1205        reason: crate::adapter::net::compute::MigrationFailureReason,
1206    ) -> Result<Vec<OutboundMigrationMessage>, MigrationError> {
1207        tracing::warn!(
1208            daemon_origin = format!("{:#x}", daemon_origin),
1209            reason = %reason,
1210            "migration failed on target",
1211        );
1212        self.reassemblers.remove(&daemon_origin);
1213        let _ = self.target_handler.abort(daemon_origin);
1214        let msg = MigrationMessage::MigrationFailed {
1215            daemon_origin,
1216            reason,
1217        };
1218        Ok(vec![OutboundMigrationMessage {
1219            dest_node: from_node,
1220            payload: wire::encode(&msg)?,
1221        }])
1222    }
1223
1224    /// Get a reference to the orchestrator.
1225    pub fn orchestrator(&self) -> &Arc<MigrationOrchestrator> {
1226        &self.orchestrator
1227    }
1228
1229    /// Get a reference to the source handler.
1230    pub fn source_handler(&self) -> &Arc<MigrationSourceHandler> {
1231        &self.source_handler
1232    }
1233
1234    /// Get a reference to the target handler.
1235    pub fn target_handler(&self) -> &Arc<MigrationTargetHandler> {
1236        &self.target_handler
1237    }
1238}
1239
1240impl std::fmt::Debug for MigrationSubprotocolHandler {
1241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1242        f.debug_struct("MigrationSubprotocolHandler")
1243            .field("local_node_id", &format!("{:#x}", self.local_node_id))
1244            .finish()
1245    }
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250    use super::*;
1251    use crate::adapter::net::behavior::capability::CapabilityFilter;
1252    use crate::adapter::net::compute::orchestrator::wire;
1253    use crate::adapter::net::compute::{
1254        DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
1255        MigrationOrchestrator, MigrationSourceHandler, MigrationTargetHandler,
1256    };
1257    use crate::adapter::net::identity::EntityKeypair;
1258    use crate::adapter::net::state::causal::CausalEvent;
1259    use bytes::Bytes;
1260
/// Size canary for the Noise-static key-leak regression (Cubic-AI P1).
///
/// `MigrationIdentityContext` used to expose
/// `pub local_x25519_priv: [u8; 32]`, so any SDK caller holding a
/// context (or calling the since-removed
/// `MeshNode::static_x25519_priv`) could copy the node's long-term
/// identity key out. The key now lives inside the `unseal_snapshot`
/// closure owned by the context and is never reachable as a
/// readable field.
///
/// The context is expected to be exactly two `Arc<dyn Fn ...>` fat
/// pointers — two `usize`s each — i.e. `4 * size_of::<usize>()`.
/// Blindly re-adding a `[u8; 32]` (or similar secret-bearing field)
/// changes that size and trips the assertion. This is an honest
/// canary for one specific regression, not a true API-surface
/// guard; PR review remains the real guard.
#[test]
fn migration_identity_context_has_no_plaintext_secret_field_regression() {
    // Each `Arc<dyn Fn ...>` occupies two machine words, and the
    // context is expected to hold exactly two of them.
    let word = std::mem::size_of::<usize>();
    let expected_size = 2 * (2 * word);
    let actual_size = std::mem::size_of::<MigrationIdentityContext>();

    assert_eq!(
        actual_size,
        expected_size,
        "MigrationIdentityContext must stay two Arc<dyn Fn ...> and \
         nothing else — a size change means a new field was added, \
         most likely re-exposing the Noise static private key the \
         way `local_x25519_priv: [u8; 32]` used to.",
    );
}
1292
1293    struct TestDaemon {
1294        value: u64,
1295    }
1296
1297    impl MeshDaemon for TestDaemon {
1298        fn name(&self) -> &str {
1299            "test"
1300        }
1301        fn requirements(&self) -> CapabilityFilter {
1302            CapabilityFilter::default()
1303        }
1304        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
1305            self.value += 1;
1306            Ok(vec![])
1307        }
1308        fn snapshot(&self) -> Option<Bytes> {
1309            Some(Bytes::from(self.value.to_le_bytes().to_vec()))
1310        }
1311        fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
1312            self.value = u64::from_le_bytes(state[..8].try_into().unwrap());
1313            Ok(())
1314        }
1315    }
1316
1317    fn setup() -> (MigrationSubprotocolHandler, Arc<DaemonRegistry>, u64) {
1318        let reg = Arc::new(DaemonRegistry::new());
1319        let kp = EntityKeypair::generate();
1320        let origin = kp.origin_hash();
1321        let host = DaemonHost::new(
1322            Box::new(TestDaemon { value: 100 }),
1323            kp,
1324            DaemonHostConfig::default(),
1325        );
1326        reg.register(host).unwrap();
1327
1328        let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
1329        let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
1330        let target = Arc::new(MigrationTargetHandler::new(reg.clone()));
1331
1332        let handler = MigrationSubprotocolHandler::new(orch, source, target, 0x1111);
1333        (handler, reg, origin)
1334    }
1335
/// A `TakeSnapshot` request for a registered daemon must yield at
/// least one outbound message, the first of which decodes to a
/// `SnapshotReady` for the same daemon origin.
#[test]
fn test_handle_take_snapshot() {
    let (handler, _registry, origin) = setup();

    let request = wire::encode(&MigrationMessage::TakeSnapshot {
        daemon_origin: origin,
        target_node: 0x2222,
    })
    .unwrap();

    let replies = handler.handle_message(&request, 0x3333).unwrap();
    assert!(!replies.is_empty());

    // The leading reply is expected to be the SnapshotReady notice.
    if let MigrationMessage::SnapshotReady { daemon_origin, .. } =
        wire::decode(&replies[0].payload).unwrap()
    {
        assert_eq!(daemon_origin, origin);
    } else {
        panic!("expected SnapshotReady");
    }
}
1358
/// Regression (Cubic-AI P2): `maybe_seal_envelope` used to swallow
/// seal-crypto errors (e.g. a public-only source keypair) and
/// return the **unsealed** snapshot to the caller, silently
/// downgrading identity transport. Any target that relied on the
/// envelope to supply the daemon keypair (`expect_migration`
/// placeholder + no out-of-band keypair) would then fail to
/// restore, or — worse — restore under a stale fallback keypair and
/// produce mis-signed outputs.
///
/// The fix makes the helper return `Result` and propagate seal
/// failures as `MigrationError::StateFailed`, so callers abort
/// rather than ship an unsealed snapshot they didn't ask for.
///
/// This test stages that exact failure mode: a daemon registered
/// with a public-only `EntityKeypair`, plus an identity context
/// whose `peer_static_lookup` resolves the target's static key.
/// Sealing cannot sign the attestation with a public-only keypair,
/// and the helper must surface that as an error. A second handler
/// without an identity context confirms the pass-through path still
/// returns the snapshot unchanged.
#[test]
fn maybe_seal_envelope_propagates_seal_failures() {
    use crate::adapter::net::state::causal::CausalLink;
    use crate::adapter::net::state::snapshot::{StateSnapshot, SNAPSHOT_VERSION};
    use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};

    // Target's X25519 static — arbitrary fresh key.
    let mut seed = [0u8; 32];
    getrandom::fill(&mut seed).unwrap();
    let target_priv = X25519Secret::from(seed);
    let target_pub = *X25519Pub::from(&target_priv).as_bytes();
    let target_node_id: u64 = 0x2222;

    // Daemon keypair: generate a real one for the entity id, then
    // strip the signing half via `public_only`. The registry hands
    // this out, and sealing fails when it tries to sign the
    // attestation transcript.
    let real_kp = EntityKeypair::generate();
    let origin_hash = real_kp.origin_hash();
    let entity_id = real_kp.entity_id().clone();
    let public_only_kp = EntityKeypair::public_only(entity_id.clone());
    assert!(
        public_only_kp.is_read_only(),
        "fixture: must be public-only",
    );

    // Register a host that uses the public-only keypair; the daemon
    // body itself is irrelevant to this test.
    let reg = Arc::new(DaemonRegistry::new());
    reg.register(DaemonHost::new(
        Box::new(TestDaemon { value: 0 }),
        public_only_kp,
        DaemonHostConfig::default(),
    ))
    .unwrap();

    // Both halves of the test need an identical, envelope-free
    // snapshot for the same origin / entity id; build it in one place.
    let unsealed_snapshot = || StateSnapshot {
        version: SNAPSHOT_VERSION,
        entity_id: entity_id.clone(),
        chain_link: CausalLink {
            origin_hash,
            horizon_encoded: 0,
            sequence: 0,
            parent_hash: 0,
        },
        through_seq: 0,
        state: Bytes::from_static(&[0u8; 8]),
        horizon: Default::default(),
        created_at: 0,
        bindings_bytes: Vec::new(),
        identity_envelope: None,
        head_payload: None,
    };

    // Identity context whose lookup resolves the target's static, so
    // every prerequisite for sealing is satisfied. The unseal closure
    // is not exercised on the source path.
    let unseal_snapshot: EnvelopeUnsealFn =
        Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&target_priv));
    let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
        Arc::new(move |nid| {
            if nid == target_node_id {
                Some(target_pub)
            } else {
                None
            }
        });
    let ctx = MigrationIdentityContext {
        unseal_snapshot,
        peer_static_lookup,
    };

    let orch = Arc::new(MigrationOrchestrator::new(reg.clone(), 0x1111));
    let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
    let target = Arc::new(MigrationTargetHandler::new(reg));
    let handler = MigrationSubprotocolHandler::with_hooks(
        orch,
        source,
        target,
        0x1111,
        MigrationHandlerHooks {
            identity: Some(ctx),
            ..Default::default()
        },
    );

    // All prerequisites satisfied but crypto guaranteed to fail
    // (a public-only keypair can't attest): the helper must surface
    // an error, not hand back the unsealed snapshot.
    let err = handler
        .maybe_seal_envelope(unsealed_snapshot(), origin_hash, target_node_id)
        .expect_err(
            "public-only daemon keypair must fail to seal; silently returning the \
             unsealed snapshot breaks the identity-transport guarantee",
        );
    match err {
        MigrationError::StateFailed(ref msg) => {
            assert!(
                msg.contains("envelope seal failed"),
                "expected 'envelope seal failed' in message, got: {msg}",
            );
            assert!(
                msg.contains(&format!("{origin_hash:#x}")),
                "expected origin_hash in message, got: {msg}",
            );
        }
        other => panic!("expected StateFailed, got {other:?}"),
    }

    // Belt-and-braces: the unsealed-fallback path (no context) still
    // works, proving this test isn't accidentally asserting that
    // `maybe_seal_envelope` always errors.
    let handler_no_ctx = MigrationSubprotocolHandler::new(
        Arc::new(MigrationOrchestrator::new(
            Arc::new(DaemonRegistry::new()),
            0x1111,
        )),
        Arc::new(MigrationSourceHandler::new(Arc::new(DaemonRegistry::new()))),
        Arc::new(MigrationTargetHandler::new(Arc::new(DaemonRegistry::new()))),
        0x1111,
    );
    let passthrough = handler_no_ctx
        .maybe_seal_envelope(unsealed_snapshot(), origin_hash, target_node_id)
        .expect("no ctx = ok unchanged");
    assert!(passthrough.identity_envelope.is_none());
}
1524
/// Regression (Cubic-AI P1): a seal failure inside the
/// `TakeSnapshot` dispatcher path used to be propagated with `?` as
/// a dispatcher error. That left the source's `start_snapshot`
/// record in place and starved the remote orchestrator, which kept
/// waiting for a `SnapshotReady` that would never arrive.
///
/// The fix turns a seal failure into a `MigrationFailed` wire reply
/// to the orchestrator, aborts the local source-handler record, and
/// returns that single message as the outbound so the caller
/// dispatches it normally.
///
/// The test registers a daemon with a public-only keypair (sealing
/// fails at attestation), installs an identity context that
/// resolves the target's static, and drives a `TakeSnapshot`
/// through the handler. It asserts that:
/// 1. `handle_message` returns `Ok(outbound)` — nothing bubbles up.
/// 2. The outbound is exactly one `MigrationFailed`, addressed to
///    the originator (`from_node`).
/// 3. The `source_handler` no longer tracks the daemon (abort ran).
#[test]
fn take_snapshot_seal_failure_emits_migration_failed_reply() {
    use crate::adapter::net::state::snapshot::StateSnapshot;
    use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};

    let target_node_id: u64 = 0x2222;
    let orchestrator_node_id: u64 = 0x3333;

    // Static key served by the context's peer lookup. Sealing fails
    // at attestation before the value matters, but the lookup has to
    // return `Some` or the seal would be skipped altogether.
    let mut key_material = [0u8; 32];
    getrandom::fill(&mut key_material).unwrap();
    let target_secret = X25519Secret::from(key_material);
    let target_static = *X25519Pub::from(&target_secret).as_bytes();

    // The attestation step needs the signing half, so a public-only
    // keypair guarantees `maybe_seal_envelope` errors once it runs.
    let signing_kp = EntityKeypair::generate();
    let origin = signing_kp.origin_hash();
    let read_only_kp = EntityKeypair::public_only(signing_kp.entity_id().clone());

    let registry = Arc::new(DaemonRegistry::new());
    registry
        .register(DaemonHost::new(
            Box::new(TestDaemon { value: 7 }),
            read_only_kp,
            DaemonHostConfig::default(),
        ))
        .unwrap();

    let unseal_snapshot: EnvelopeUnsealFn =
        Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&target_secret));
    let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
        Arc::new(move |nid| (nid == target_node_id).then_some(target_static));
    let identity = MigrationIdentityContext {
        unseal_snapshot,
        peer_static_lookup,
    };

    let orchestrator = Arc::new(MigrationOrchestrator::new(registry.clone(), 0x1111));
    let source = Arc::new(MigrationSourceHandler::new(registry.clone()));
    let target = Arc::new(MigrationTargetHandler::new(registry));
    let hooks = MigrationHandlerHooks {
        identity: Some(identity),
        ..Default::default()
    };
    let handler =
        MigrationSubprotocolHandler::with_hooks(orchestrator, source.clone(), target, 0x1111, hooks);

    // Play the part of a remote orchestrator asking for a snapshot.
    let request = wire::encode(&MigrationMessage::TakeSnapshot {
        daemon_origin: origin,
        target_node: target_node_id,
    })
    .unwrap();
    let outbound = handler
        .handle_message(&request, orchestrator_node_id)
        .expect("seal failure must not bubble up as dispatcher error");

    // One reply only, routed back to whoever sent TakeSnapshot.
    assert_eq!(
        outbound.len(),
        1,
        "expected single MigrationFailed reply, got {} messages",
        outbound.len(),
    );
    let only_reply = &outbound[0];
    assert_eq!(only_reply.dest_node, orchestrator_node_id);

    match wire::decode(&only_reply.payload).unwrap() {
        MigrationMessage::MigrationFailed {
            daemon_origin,
            reason,
        } => {
            assert_eq!(daemon_origin, origin);
            let reason_msg = reason.to_string();
            assert!(
                reason_msg.contains("identity envelope seal failed"),
                "expected seal-failure reason, got: {reason_msg}",
            );
        }
        other => panic!("expected MigrationFailed, got {other:?}"),
    }

    // The abort must have dropped the source-side record; before the
    // fix it lingered indefinitely.
    let still_tracked = source.phase(origin).is_some();
    assert!(
        !still_tracked,
        "source_handler must have cleared its record for {origin:#x} after a failed TakeSnapshot",
    );
}
1650
/// An inbound `MigrationFailed` must be accepted without error and
/// must not generate any outbound traffic.
#[test]
fn test_handle_migration_failed() {
    use crate::adapter::net::compute::MigrationFailureReason;

    let (handler, _registry, origin) = setup();

    let failure = MigrationMessage::MigrationFailed {
        daemon_origin: origin,
        reason: MigrationFailureReason::StateFailed("test failure".into()),
    };
    let request = wire::encode(&failure).unwrap();

    // Handling is cleanup only: no error, nothing to send back.
    let replies = handler.handle_message(&request, 0x3333).unwrap();
    assert!(replies.is_empty());
}
1667
/// Regression for a case the SDK-level suite could not honestly
/// exercise: when the factory registry carries a pre-provisioned
/// **fallback keypair** AND the snapshot carries a **valid identity
/// envelope**, the envelope's keypair must win.
///
/// The SDK test that used to assert this could only register a
/// fallback keyed by the envelope's own `origin_hash`, because
/// `origin_hash` is derived from the keypair bytes — a user-level
/// API has no way to supply a "wrong" keypair at a given
/// `origin_hash`.
///
/// This unit test calls `resolve_restore_keypair` directly with two
/// genuinely distinct keypairs and asserts that the envelope
/// overrides the fallback. If the resolution order is ever flipped
/// (say, preferring the fallback for a misguided
/// "backward-compatibility" reason), this test trips.
#[test]
fn envelope_keypair_overrides_fallback_placeholder() {
    use crate::adapter::net::identity::IdentityEnvelope;
    use crate::adapter::net::state::causal::CausalLink;
    use crate::adapter::net::state::snapshot::{StateSnapshot, SNAPSHOT_VERSION};
    use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};

    // Noise static X25519 pair of the target: the public half seals
    // the envelope, the private half opens it again.
    let mut key_material = [0u8; 32];
    getrandom::fill(&mut key_material).unwrap();
    let target_priv = X25519Secret::from(key_material);
    let target_pub = *X25519Pub::from(&target_priv).as_bytes();

    // `real_kp` is the source-side daemon identity that restore should
    // end up with; `wrong_fallback` is what a flipped resolution order
    // would hand back instead.
    let real_kp = EntityKeypair::generate();
    let wrong_fallback = EntityKeypair::generate();
    assert_ne!(
        real_kp.entity_id(),
        wrong_fallback.entity_id(),
        "fixture: real and fallback must differ",
    );

    let chain_link = CausalLink {
        origin_hash: real_kp.origin_hash(),
        horizon_encoded: 0,
        sequence: 0,
        parent_hash: 0,
    };
    let sealed = IdentityEnvelope::new(&real_kp, target_pub, &chain_link).expect("seal envelope");

    // Only the envelope-open path is under test, so the remaining
    // snapshot fields are placeholders.
    let snapshot = StateSnapshot {
        version: SNAPSHOT_VERSION,
        entity_id: real_kp.entity_id().clone(),
        chain_link,
        through_seq: 0,
        state: Bytes::new(),
        horizon: Default::default(),
        created_at: 0,
        bindings_bytes: Vec::new(),
        identity_envelope: Some(sealed),
        head_payload: None,
    };

    // The unseal closure owns the target's private key, mirroring
    // what `MeshNode::migration_identity_context` produces.
    let priv_for_closure = target_priv.clone();
    let unseal_snapshot: EnvelopeUnsealFn =
        Arc::new(move |snap: &StateSnapshot| snap.open_identity_envelope(&priv_for_closure));
    let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
        Arc::new(|_| None);
    let identity = MigrationIdentityContext {
        unseal_snapshot,
        peer_static_lookup,
    };

    let registry = Arc::new(DaemonRegistry::new());
    let orchestrator = Arc::new(MigrationOrchestrator::new(registry.clone(), 0x1111));
    let source = Arc::new(MigrationSourceHandler::new(registry.clone()));
    let target = Arc::new(MigrationTargetHandler::new(registry));
    let hooks = MigrationHandlerHooks {
        identity: Some(identity),
        ..Default::default()
    };
    let handler =
        MigrationSubprotocolHandler::with_hooks(orchestrator, source, target, 0x1111, hooks);

    // Envelope and fallback both available: the envelope must win.
    let resolved = handler
        .resolve_restore_keypair(&snapshot, Some(&wrong_fallback))
        .expect("resolve");
    assert_eq!(
        resolved.entity_id(),
        real_kp.entity_id(),
        "envelope's keypair must win over the pre-provisioned fallback — \
         flipping this order silently downgrades identity transport to \
         whatever the factory registry was pre-populated with",
    );
    assert_ne!(
        resolved.entity_id(),
        wrong_fallback.entity_id(),
        "fallback must NOT leak through when the envelope is valid",
    );

    // Control: strip the envelope and the fallback comes back as-is,
    // so the assertions above did not pass by coincidence.
    let snapshot_no_envelope = StateSnapshot {
        identity_envelope: None,
        head_payload: None,
        ..snapshot.clone()
    };
    let resolved_fallback = handler
        .resolve_restore_keypair(&snapshot_no_envelope, Some(&wrong_fallback))
        .expect("resolve with fallback only");
    assert_eq!(resolved_fallback.entity_id(), wrong_fallback.entity_id());
}
1792
/// Regression (Cubic-AI P2): once a snapshot carries an identity
/// envelope, resolution must commit to envelope transport. A
/// misbehaving `unseal_snapshot` that returns `Ok(None)` — for
/// instance a partially implemented or buggy custom closure —
/// previously let the dispatcher fall through to the
/// pre-provisioned fallback keypair, silently downgrading a
/// migration the caller had opted into envelope transport for.
///
/// The fix treats `Ok(None)` on an envelope-bearing snapshot as a
/// terminal error, the same policy as an explicit `Err(...)` from
/// unseal.
///
/// The test builds a snapshot with a real envelope, installs an
/// unseal closure that ignores its input and returns `Ok(None)`,
/// and offers a (wrong) fallback keypair. The resolver must return
/// `Err(...)` rather than the fallback.
#[test]
fn envelope_present_but_unseal_returns_none_fails_rather_than_fallback() {
    use crate::adapter::net::identity::IdentityEnvelope;
    use crate::adapter::net::state::causal::CausalLink;
    use crate::adapter::net::state::snapshot::{StateSnapshot, SNAPSHOT_VERSION};
    use x25519_dalek::{PublicKey as X25519Pub, StaticSecret as X25519Secret};

    // Fresh X25519 pair for the recipient of the seal.
    let mut key_material = [0u8; 32];
    getrandom::fill(&mut key_material).unwrap();
    let target_priv = X25519Secret::from(key_material);
    let target_pub = *X25519Pub::from(&target_priv).as_bytes();

    // A genuine daemon keypair seals a valid envelope, so the
    // snapshot is well-formed from the wire's perspective.
    let real_kp = EntityKeypair::generate();
    let chain_link = CausalLink {
        origin_hash: real_kp.origin_hash(),
        horizon_encoded: 0,
        sequence: 0,
        parent_hash: 0,
    };
    let sealed = IdentityEnvelope::new(&real_kp, target_pub, &chain_link).expect("seal envelope");
    let snapshot = StateSnapshot {
        version: SNAPSHOT_VERSION,
        entity_id: real_kp.entity_id().clone(),
        chain_link,
        through_seq: 0,
        state: Bytes::new(),
        horizon: Default::default(),
        created_at: 0,
        bindings_bytes: Vec::new(),
        identity_envelope: Some(sealed),
        head_payload: None,
    };

    // The faulty unsealer: `Ok(None)` regardless of input, even for
    // a snapshot with a real envelope. This is the shape that used
    // to trigger the silent downgrade.
    let unseal_snapshot: EnvelopeUnsealFn = Arc::new(|_snap: &StateSnapshot| Ok(None));
    let peer_static_lookup: Arc<dyn Fn(u64) -> Option<[u8; 32]> + Send + Sync> =
        Arc::new(|_| None);
    let identity = MigrationIdentityContext {
        unseal_snapshot,
        peer_static_lookup,
    };

    let registry = Arc::new(DaemonRegistry::new());
    let orchestrator = Arc::new(MigrationOrchestrator::new(registry.clone(), 0x1111));
    let source = Arc::new(MigrationSourceHandler::new(registry.clone()));
    let target = Arc::new(MigrationTargetHandler::new(registry));
    let hooks = MigrationHandlerHooks {
        identity: Some(identity),
        ..Default::default()
    };
    let handler =
        MigrationSubprotocolHandler::with_hooks(orchestrator, source, target, 0x1111, hooks);

    // Before the fix the (wrong but syntactically present) fallback
    // would have been returned. Now the envelope-present invariant
    // commits resolution to envelope transport, so it must refuse.
    let wrong_fallback = EntityKeypair::generate();
    let outcome = handler.resolve_restore_keypair(&snapshot, Some(&wrong_fallback));
    let err = outcome.expect_err(
        "envelope present + unseal Ok(None) must fail; silently \
         returning the fallback downgrades identity transport",
    );
    assert!(
        err.contains("refusing to fall back"),
        "expected 'refusing to fall back' in error message, got: {err}",
    );
}
1891
/// CR-24: pin the no-per-daemon-coupling invariant for
/// `StandbyGroup` and `CapabilityIndex`. The audit suggested the
/// `MigrationFailed` arm needed teardown for both subsystems;
/// investigation showed neither holds per-daemon migration-coupled
/// state today (see the comment block at the MigrationFailed arm).
/// This test fires loudly if a future change introduces such
/// coupling, signalling that the maintainer MUST wire teardown
/// into the arm.
///
/// Mechanism: the sources of `standby_group.rs` and `capability.rs`
/// are embedded with `include_str!`, comments are stripped, and the
/// remaining text is searched for the canonical coupling shapes —
/// `daemon_origin` in the capability source, and
/// `pending_promotion` / `migration_in_flight` / `in_migration:` in
/// the standby source. Any match means the contract has changed and
/// `migration_handler.rs:MigrationFailed` likely needs to call
/// cleanup on the new state.
#[test]
fn cr24_no_per_daemon_migration_coupling_in_standby_or_capability() {
    /// Returns `src` with Rust comments removed.
    ///
    /// Handles `//` line comments (including `///` and `//!`, and
    /// trailing comments after code), and `/* ... */` block comments
    /// with nesting, as Rust's lexer does. String literals (plain,
    /// byte and raw) are copied verbatim so that `//` or `/*` inside
    /// a string is not mistaken for a comment opener. Newlines inside
    /// removed comments are preserved so line structure is unchanged.
    fn strip_comments(src: &str) -> String {
        let bytes = src.as_bytes();
        let mut out = Vec::with_capacity(bytes.len());
        let mut i = 0;
        while i < bytes.len() {
            let rest = &bytes[i..];
            if rest.starts_with(b"//") {
                // Line comment: drop everything up to (not including)
                // the newline, which the next iteration copies.
                i += rest.iter().position(|&b| b == b'\n').unwrap_or(rest.len());
            } else if rest.starts_with(b"/*") {
                // Block comment: track depth because Rust block
                // comments nest. An unterminated comment runs to EOF.
                let mut depth = 1usize;
                i += 2;
                while depth > 0 && i < bytes.len() {
                    let inner = &bytes[i..];
                    if inner.starts_with(b"/*") {
                        depth += 1;
                        i += 2;
                    } else if inner.starts_with(b"*/") {
                        depth -= 1;
                        i += 2;
                    } else {
                        if bytes[i] == b'\n' {
                            out.push(b'\n');
                        }
                        i += 1;
                    }
                }
            } else if rest.starts_with(b"'\"'") || rest.starts_with(b"'\\\"'") {
                // Char literal holding a double quote (`'"'` or
                // `'\"'`): copy it whole so it does not look like the
                // start of a string literal.
                let len = if rest[1] == b'\\' { 4 } else { 3 };
                out.extend_from_slice(&rest[..len]);
                i += len;
            } else if bytes[i] == b'"' {
                // String literal. A run of `#` preceded by `r` right
                // before the quote marks a raw string, which ends at
                // `"` followed by the same number of `#` and has no
                // backslash escapes.
                let hashes = bytes[..i].iter().rev().take_while(|&&b| b == b'#').count();
                let is_raw = bytes[..i - hashes].last() == Some(&b'r');
                let start = i;
                i += 1;
                if is_raw {
                    while i < bytes.len() {
                        let closes = bytes[i] == b'"'
                            && bytes[i + 1..].iter().take_while(|&&b| b == b'#').count() >= hashes;
                        if closes {
                            i += 1 + hashes;
                            break;
                        }
                        i += 1;
                    }
                } else {
                    while i < bytes.len() {
                        match bytes[i] {
                            // Skip the escaped byte so `\"` does not
                            // terminate the literal.
                            b'\\' => i += 2,
                            b'"' => {
                                i += 1;
                                break;
                            }
                            _ => i += 1,
                        }
                    }
                }
                let end = i.min(bytes.len());
                out.extend_from_slice(&bytes[start..end]);
                i = end;
            } else {
                out.push(bytes[i]);
                i += 1;
            }
        }
        // Only ASCII-delimited ranges are removed, so the remainder is
        // still valid UTF-8; the lossy conversion is purely defensive.
        String::from_utf8_lossy(&out).into_owned()
    }

    // Sanity-check the stripper itself so a bug in the helper cannot
    // silently blind the canary below.
    assert_eq!(strip_comments("a // x /* y\nb"), "a \nb");
    assert_eq!(strip_comments("a /* x /* y */ z */ b"), "a  b");
    assert_eq!(strip_comments("x /* a\nb */ y"), "x \n y");
    assert_eq!(
        strip_comments("let u = \"http://x\"; f(); // gone"),
        "let u = \"http://x\"; f(); ",
    );

    let standby_clean = strip_comments(include_str!("../compute/standby_group.rs"));
    let capability_clean = strip_comments(include_str!("../behavior/capability.rs"));

    // CapabilityIndex must NOT index by daemon_origin. Pinned
    // separately because it's the audit's specific claim.
    let capability_uses_daemon_origin = capability_clean.contains("daemon_origin");
    assert!(
        !capability_uses_daemon_origin,
        "CR-24 regression: CapabilityIndex now references `daemon_origin` in \
         non-comment source. The audit's CR-24 concern was that capabilities \
         tied to a migrating daemon need teardown on MigrationFailed. With \
         this new coupling the migration_handler MUST call \
         `capability_index.cleanup_origin(daemon_origin)` (or equivalent) \
         in the MigrationFailed arm. Add the call AND update this test."
    );

    // StandbyGroup must NOT have an "in-flight migration promotion"
    // field. The audit's scenario was "promotion mid-flight" — for
    // that to be a real concern there would need to be a state field
    // (e.g. `pending_promotion: Option<...>`) that survives across
    // multiple migration-handler dispatches.
    let pending_markers = ["pending_promotion", "migration_in_flight", "in_migration:"];
    let standby_has_pending = pending_markers
        .iter()
        .any(|marker| standby_clean.contains(*marker));
    assert!(
        !standby_has_pending,
        "CR-24 regression: StandbyGroup now has a pending-promotion or \
         in-migration field. The audit's CR-24 concern was that a \
         mid-flight standby promotion needs teardown on MigrationFailed. With \
         this new coupling the migration_handler MUST call rollback on \
         StandbyGroup in the MigrationFailed arm. Add the rollback call \
         AND update this test."
    );
}
1989}