Skip to main content

net/adapter/net/compute/
orchestrator.rs

1//! Migration orchestrator — coordinates all 6 phases of daemon migration.
2//!
3//! The orchestrator runs on the controller node (which may be the source,
4//! target, or a third-party coordinator). It sequences phase transitions,
5//! routes migration messages, and handles failures/timeouts.
6
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use bytes::Buf;
11use dashmap::DashMap;
12use parking_lot::Mutex;
13
14use super::migration::{MigrationError, MigrationFailureReason, MigrationPhase, MigrationState};
15use super::migration_source::MigrationSourceHandler;
16use super::registry::DaemonRegistry;
17use crate::adapter::net::continuity::superposition::SuperpositionState;
18use crate::adapter::net::identity::EntityId;
19use crate::adapter::net::state::causal::{CausalEvent, CausalLink};
20use crate::adapter::net::state::snapshot::{StateSnapshot, SNAPSHOT_VERSION};
21
/// Magic bytes that open every serialized `StateSnapshot` (ASCII `"CDS1"`).
///
/// Kept as a local copy because the canonical `V1_MAGIC` in
/// `crate::adapter::net::state::snapshot` is private to that module; the two
/// must stay byte-for-byte identical.
const SNAPSHOT_WIRE_MAGIC: &[u8; 4] = &[b'C', b'D', b'S', b'1'];
26
27/// Validate a chunk's wire envelope without forcing the
28/// orchestrator to reassemble the full snapshot.
29///
30/// On `chunk_index == 0` (the chunk that carries the snapshot
31/// header — single-chunk OR first piece of a multi-chunk send),
32/// confirm:
33/// - The first 4 bytes match the snapshot wire magic.
34/// - Byte 4 is the expected SNAPSHOT_VERSION.
35/// - Bytes 5..37 carry an `entity_id` whose `origin_hash()` matches
36///   the migration's recorded `daemon_origin`.
37///
38/// On `chunk_index > 0`, the chunk is a raw payload fragment with
39/// no envelope — return Ok and let the target's reassembler verify
40/// the assembled snapshot.
41///
42/// Strictly weaker than `StateSnapshot::from_bytes` for the
43/// single-chunk path (from_bytes parses everything past byte 37
44/// too), but matters for multi-chunk where from_bytes can't run
45/// at the orchestrator. Closes the asymmetry where single-chunk
46/// corruption caught at the orchestrator while multi-chunk
47/// corruption deferred entirely to the target.
48#[expect(
49    clippy::expect_used,
50    reason = "snapshot_bytes.len() >= 37 checked above; slicing [5..37] yields exactly 32 bytes"
51)]
52fn validate_chunk_header(
53    chunk_index: u32,
54    snapshot_bytes: &[u8],
55    expected_daemon_origin: u64,
56) -> Result<(), MigrationError> {
57    if chunk_index != 0 {
58        return Ok(());
59    }
60    // Magic (4) + version (1) + entity_id (32) = 37 bytes minimum
61    // for the validator to do anything useful.
62    if snapshot_bytes.len() < 37 {
63        return Err(MigrationError::StateFailed(format!(
64            "SnapshotReady chunk 0 is {} bytes — too short for snapshot envelope (need >= 37)",
65            snapshot_bytes.len()
66        )));
67    }
68    if &snapshot_bytes[..4] != SNAPSHOT_WIRE_MAGIC {
69        return Err(MigrationError::StateFailed(format!(
70            "SnapshotReady chunk 0 has wrong magic bytes: {:?}",
71            &snapshot_bytes[..4]
72        )));
73    }
74    let version = snapshot_bytes[4];
75    if version != SNAPSHOT_VERSION {
76        return Err(MigrationError::StateFailed(format!(
77            "SnapshotReady chunk 0 carries snapshot version {} (expected {})",
78            version, SNAPSHOT_VERSION
79        )));
80    }
81    let entity_bytes: [u8; 32] = snapshot_bytes[5..37].try_into().expect("range is 32 bytes");
82    let claimed_origin = EntityId::from_bytes(entity_bytes).origin_hash();
83    if claimed_origin != expected_daemon_origin {
84        return Err(MigrationError::StateFailed(format!(
85            "SnapshotReady chunk 0 entity_id origin {:#x} does not match daemon_origin {:#x}",
86            claimed_origin, expected_daemon_origin
87        )));
88    }
89    Ok(())
90}
91
92// ── Migration message protocol ──────────────────────────────────────────────
93
/// Wire message types for the migration subprotocol (0x0500).
///
/// Serialized by [`wire::encode`] and parsed by [`wire::decode`]. Each variant
/// is tagged on the wire by the one-byte `wire::MSG_*` constant named in its
/// documentation.
#[derive(Debug, Clone)]
pub enum MigrationMessage {
    /// Phase 0→1: Request snapshot on source.
    ///
    /// Wire tag: [`wire::MSG_TAKE_SNAPSHOT`].
    TakeSnapshot {
        /// Origin hash of daemon to migrate.
        daemon_origin: u64,
        /// Destination node ID.
        target_node: u64,
    },

    /// Phase 1→2: Snapshot taken, payload included.
    ///
    /// Large snapshots are chunked across multiple `SnapshotReady` messages.
    /// The receiver must reassemble all chunks (0..total_chunks) before
    /// deserializing the snapshot. Single-chunk snapshots have
    /// `chunk_index = 0, total_chunks = 1`.
    ///
    /// [`wire::decode`] rejects `total_chunks == 0`,
    /// `total_chunks > MAX_TOTAL_CHUNKS`, `chunk_index >= total_chunks`, and
    /// chunks longer than `MAX_SNAPSHOT_CHUNK_SIZE` bytes.
    ///
    /// Wire tag: [`wire::MSG_SNAPSHOT_READY`].
    SnapshotReady {
        /// Origin hash of daemon being migrated.
        daemon_origin: u64,
        /// Serialized `StateSnapshot` bytes (or chunk thereof). Chunk 0 starts
        /// with the snapshot envelope (magic, version, entity id); later
        /// chunks are raw continuation bytes.
        snapshot_bytes: Vec<u8>,
        /// Sequence number the snapshot covers through.
        seq_through: u64,
        /// Zero-based index of this chunk; must be `< total_chunks`.
        chunk_index: u32,
        /// Total number of chunks for this snapshot (at least 1).
        total_chunks: u32,
    },

    /// Phase 2→3: Target restored daemon from snapshot.
    ///
    /// Wire tag: [`wire::MSG_RESTORE_COMPLETE`].
    RestoreComplete {
        /// Origin hash of daemon being migrated.
        daemon_origin: u64,
        /// Sequence number restored through.
        restored_seq: u64,
    },

    /// Phase 3→4: Target finished replaying buffered events.
    ///
    /// Wire tag: [`wire::MSG_REPLAY_COMPLETE`].
    ReplayComplete {
        /// Origin hash of daemon being migrated.
        daemon_origin: u64,
        /// Sequence number replayed through.
        replayed_seq: u64,
        /// Target's chain head after replay. The orchestrator stamps
        /// this into its `SuperpositionState` so continuity proofs
        /// carry the real cryptographic anchor even when the
        /// orchestrator lives on a third node and has no local
        /// daemon registry entry to read from.
        target_head: CausalLink,
    },

    /// Phase 4: Source stops accepting writes, routing switches.
    ///
    /// Wire tag: [`wire::MSG_CUTOVER_NOTIFY`].
    CutoverNotify {
        /// Origin hash of daemon being migrated.
        daemon_origin: u64,
        /// Target node that is now authoritative.
        target_node: u64,
    },

    /// Phase 5: Source cleaned up.
    ///
    /// Wire tag: [`wire::MSG_CLEANUP_COMPLETE`].
    CleanupComplete {
        /// Origin hash of daemon whose migration is complete.
        daemon_origin: u64,
    },

    /// Any phase: Abort migration.
    ///
    /// Wire tag: [`wire::MSG_FAILED`].
    MigrationFailed {
        /// Origin hash of daemon whose migration failed.
        daemon_origin: u64,
        /// Structured reason code — source dispatches on this to
        /// decide whether the migration is retriable. See
        /// [`MigrationFailureReason`]. String-bearing reasons are sent
        /// with a `u16` length prefix, so [`wire::encode`] fails if the
        /// message exceeds `u16::MAX` bytes.
        reason: MigrationFailureReason,
    },

    /// Buffered events from source for replay on target.
    ///
    /// [`wire::decode`] caps a batch at 1,000,000 events and each event
    /// payload at `MAX_SNAPSHOT_CHUNK_SIZE` bytes.
    ///
    /// Wire tag: [`wire::MSG_BUFFERED_EVENTS`].
    BufferedEvents {
        /// Origin hash of daemon being migrated.
        daemon_origin: u64,
        /// Events to replay, in causal order.
        events: Vec<CausalEvent>,
    },

    /// Phase 5→6: Source has cleaned up; target should now go live.
    ///
    /// Emitted by the orchestrator once it observes `CleanupComplete`. The
    /// target calls `MigrationTargetHandler::activate` in response and
    /// replies with `ActivateAck`.
    ///
    /// Wire tag: [`wire::MSG_ACTIVATE_TARGET`].
    ActivateTarget {
        /// Origin hash of daemon whose target should activate.
        daemon_origin: u64,
    },

    /// Phase 6: Target has activated and is now authoritative.
    ///
    /// Wire tag: [`wire::MSG_ACTIVATE_ACK`].
    ActivateAck {
        /// Origin hash of daemon whose migration is complete.
        daemon_origin: u64,
        /// Sequence number the target is authoritative through.
        replayed_seq: u64,
    },
}
196
197// ── Wire format ─────────────────────────────────────────────────────────────
198
199/// Wire format encode/decode for migration messages.
200pub mod wire {
201    use super::*;
202    use bytes::{Buf, BufMut};
203
204    /// Wire type: request snapshot on source.
205    pub const MSG_TAKE_SNAPSHOT: u8 = 0;
206    /// Wire type: snapshot taken, payload included.
207    pub const MSG_SNAPSHOT_READY: u8 = 1;
208    /// Wire type: target restored daemon from snapshot.
209    pub const MSG_RESTORE_COMPLETE: u8 = 2;
210    /// Wire type: target finished replaying buffered events.
211    pub const MSG_REPLAY_COMPLETE: u8 = 3;
212    /// Wire type: source stops writes, routing switches.
213    pub const MSG_CUTOVER_NOTIFY: u8 = 4;
214    /// Wire type: source cleaned up.
215    pub const MSG_CLEANUP_COMPLETE: u8 = 5;
216    /// Wire type: migration failed/aborted.
217    pub const MSG_FAILED: u8 = 6;
218    /// Wire type: buffered events for replay.
219    pub const MSG_BUFFERED_EVENTS: u8 = 7;
220    /// Wire type: orchestrator tells target to activate.
221    pub const MSG_ACTIVATE_TARGET: u8 = 8;
222    /// Wire type: target acknowledges activation.
223    pub const MSG_ACTIVATE_ACK: u8 = 9;
224
225    /// Encode a migration message to bytes.
226    ///
227    /// Returns `MigrationError::StateFailed` when a length-prefixed field
228    /// would not fit in its on-wire width. Length prefixes are `u32` for
229    /// payloads and counts and `u16` for the failure reason string; silently
230    /// truncating to fit would corrupt the stream and confuse the decoder.
231    pub fn encode(msg: &MigrationMessage) -> Result<Vec<u8>, MigrationError> {
232        // Helper: convert a usize length to u32 with an error on overflow.
233        fn len_u32(field: &str, n: usize) -> Result<u32, MigrationError> {
234            u32::try_from(n).map_err(|_| {
235                MigrationError::StateFailed(format!("{} length {} exceeds u32::MAX", field, n))
236            })
237        }
238
239        let mut buf = Vec::with_capacity(128);
240
241        match msg {
242            MigrationMessage::TakeSnapshot {
243                daemon_origin,
244                target_node,
245            } => {
246                buf.put_u8(MSG_TAKE_SNAPSHOT);
247                buf.put_u64_le(*daemon_origin);
248                buf.put_u64_le(*target_node);
249            }
250            MigrationMessage::SnapshotReady {
251                daemon_origin,
252                snapshot_bytes,
253                seq_through,
254                chunk_index,
255                total_chunks,
256            } => {
257                let payload_len = len_u32("snapshot_bytes", snapshot_bytes.len())?;
258                buf.put_u8(MSG_SNAPSHOT_READY);
259                buf.put_u64_le(*daemon_origin);
260                buf.put_u64_le(*seq_through);
261                buf.put_u32_le(*chunk_index);
262                buf.put_u32_le(*total_chunks);
263                buf.put_u32_le(payload_len);
264                buf.extend_from_slice(snapshot_bytes);
265            }
266            MigrationMessage::RestoreComplete {
267                daemon_origin,
268                restored_seq,
269            } => {
270                buf.put_u8(MSG_RESTORE_COMPLETE);
271                buf.put_u64_le(*daemon_origin);
272                buf.put_u64_le(*restored_seq);
273            }
274            MigrationMessage::ReplayComplete {
275                daemon_origin,
276                replayed_seq,
277                target_head,
278            } => {
279                buf.put_u8(MSG_REPLAY_COMPLETE);
280                buf.put_u64_le(*daemon_origin);
281                buf.put_u64_le(*replayed_seq);
282                buf.extend_from_slice(&target_head.to_bytes());
283            }
284            MigrationMessage::CutoverNotify {
285                daemon_origin,
286                target_node,
287            } => {
288                buf.put_u8(MSG_CUTOVER_NOTIFY);
289                buf.put_u64_le(*daemon_origin);
290                buf.put_u64_le(*target_node);
291            }
292            MigrationMessage::CleanupComplete { daemon_origin } => {
293                buf.put_u8(MSG_CLEANUP_COMPLETE);
294                buf.put_u64_le(*daemon_origin);
295            }
296            MigrationMessage::MigrationFailed {
297                daemon_origin,
298                reason,
299            } => {
300                buf.put_u8(MSG_FAILED);
301                buf.put_u64_le(*daemon_origin);
302                // Wire layout:
303                //   code:  u16 le
304                //   then variant-specific payload (0 bytes for
305                //   zero-payload variants; `u16 le + bytes` for
306                //   string-bearing variants; `u8` for NotReadyTimeout).
307                buf.put_u16_le(reason.code());
308                match reason {
309                    MigrationFailureReason::NotReady
310                    | MigrationFailureReason::FactoryNotFound
311                    | MigrationFailureReason::ComputeNotSupported
312                    | MigrationFailureReason::AlreadyMigrating => {}
313                    MigrationFailureReason::StateFailed(msg)
314                    | MigrationFailureReason::IdentityTransportFailed(msg) => {
315                        let len = u16::try_from(msg.len()).map_err(|_| {
316                            MigrationError::StateFailed(format!(
317                                "failure reason message length {} exceeds u16::MAX",
318                                msg.len()
319                            ))
320                        })?;
321                        buf.put_u16_le(len);
322                        buf.extend_from_slice(msg.as_bytes());
323                    }
324                    MigrationFailureReason::NotReadyTimeout { attempts } => {
325                        buf.put_u8(*attempts);
326                    }
327                }
328            }
329            MigrationMessage::BufferedEvents {
330                daemon_origin,
331                events,
332            } => {
333                let event_count = len_u32("events", events.len())?;
334                buf.put_u8(MSG_BUFFERED_EVENTS);
335                buf.put_u64_le(*daemon_origin);
336                buf.put_u32_le(event_count);
337                for event in events {
338                    let payload_len = len_u32("event payload", event.payload.len())?;
339                    let link_bytes = event.link.to_bytes();
340                    buf.extend_from_slice(&link_bytes);
341                    buf.put_u32_le(payload_len);
342                    buf.extend_from_slice(&event.payload);
343                    buf.put_u64_le(event.received_at);
344                }
345            }
346            MigrationMessage::ActivateTarget { daemon_origin } => {
347                buf.put_u8(MSG_ACTIVATE_TARGET);
348                buf.put_u64_le(*daemon_origin);
349            }
350            MigrationMessage::ActivateAck {
351                daemon_origin,
352                replayed_seq,
353            } => {
354                buf.put_u8(MSG_ACTIVATE_ACK);
355                buf.put_u64_le(*daemon_origin);
356                buf.put_u64_le(*replayed_seq);
357            }
358        }
359
360        Ok(buf)
361    }
362
363    /// Decode a migration message from bytes.
364    pub fn decode(data: &[u8]) -> Result<MigrationMessage, MigrationError> {
365        if data.is_empty() {
366            return Err(MigrationError::StateFailed("empty message".into()));
367        }
368
369        let mut cur = std::io::Cursor::new(data);
370
371        let msg_type = cur.get_u8();
372
373        match msg_type {
374            MSG_TAKE_SNAPSHOT => {
375                if cur.remaining() < 16 {
376                    return Err(MigrationError::StateFailed("truncated TakeSnapshot".into()));
377                }
378                Ok(MigrationMessage::TakeSnapshot {
379                    daemon_origin: cur.get_u64_le(),
380                    target_node: cur.get_u64_le(),
381                })
382            }
383            MSG_SNAPSHOT_READY => {
384                // daemon_origin(8) + seq_through(8) + chunk_index(4) + total_chunks(4) + len(4) = 28
385                if cur.remaining() < 28 {
386                    return Err(MigrationError::StateFailed(
387                        "truncated SnapshotReady".into(),
388                    ));
389                }
390                let daemon_origin = cur.get_u64_le();
391                let seq_through = cur.get_u64_le();
392                let chunk_index = cur.get_u32_le();
393                let total_chunks = cur.get_u32_le();
394                let len = cur.get_u32_le() as usize;
395                // Reject structurally invalid chunks at the wire boundary so
396                // malformed messages never even reach the reassembler. The
397                // reassembler enforces the same invariants defensively.
398                if total_chunks == 0 {
399                    return Err(MigrationError::StateFailed(
400                        "SnapshotReady: total_chunks must be >= 1".into(),
401                    ));
402                }
403                if total_chunks > MAX_TOTAL_CHUNKS {
404                    return Err(MigrationError::StateFailed(format!(
405                        "SnapshotReady: total_chunks {} exceeds MAX_TOTAL_CHUNKS ({})",
406                        total_chunks, MAX_TOTAL_CHUNKS
407                    )));
408                }
409                if chunk_index >= total_chunks {
410                    return Err(MigrationError::StateFailed(format!(
411                        "SnapshotReady: chunk_index {} out of range for total_chunks {}",
412                        chunk_index, total_chunks
413                    )));
414                }
415                if len > MAX_SNAPSHOT_CHUNK_SIZE {
416                    return Err(MigrationError::StateFailed(format!(
417                        "SnapshotReady: chunk len {} exceeds MAX_SNAPSHOT_CHUNK_SIZE ({})",
418                        len, MAX_SNAPSHOT_CHUNK_SIZE
419                    )));
420                }
421                if cur.remaining() < len {
422                    return Err(MigrationError::StateFailed(
423                        "truncated snapshot payload".into(),
424                    ));
425                }
426                let mut snapshot_bytes = vec![0u8; len];
427                cur.copy_to_slice(&mut snapshot_bytes);
428                Ok(MigrationMessage::SnapshotReady {
429                    daemon_origin,
430                    snapshot_bytes,
431                    seq_through,
432                    chunk_index,
433                    total_chunks,
434                })
435            }
436            MSG_RESTORE_COMPLETE => {
437                if cur.remaining() < 16 {
438                    return Err(MigrationError::StateFailed(
439                        "truncated RestoreComplete".into(),
440                    ));
441                }
442                Ok(MigrationMessage::RestoreComplete {
443                    daemon_origin: cur.get_u64_le(),
444                    restored_seq: cur.get_u64_le(),
445                })
446            }
447            MSG_REPLAY_COMPLETE => {
448                // 8 (origin) + 8 (seq) + 32 (target_head) = 48
449                if cur.remaining() < 48 {
450                    return Err(MigrationError::StateFailed(
451                        "truncated ReplayComplete".into(),
452                    ));
453                }
454                let daemon_origin = cur.get_u64_le();
455                let replayed_seq = cur.get_u64_le();
456                let mut head_bytes = [0u8; 32];
457                cur.copy_to_slice(&mut head_bytes);
458                let target_head = CausalLink::from_bytes(&head_bytes).ok_or_else(|| {
459                    MigrationError::StateFailed(
460                        "ReplayComplete: malformed target_head bytes".into(),
461                    )
462                })?;
463                Ok(MigrationMessage::ReplayComplete {
464                    daemon_origin,
465                    replayed_seq,
466                    target_head,
467                })
468            }
469            MSG_CUTOVER_NOTIFY => {
470                if cur.remaining() < 16 {
471                    return Err(MigrationError::StateFailed(
472                        "truncated CutoverNotify".into(),
473                    ));
474                }
475                Ok(MigrationMessage::CutoverNotify {
476                    daemon_origin: cur.get_u64_le(),
477                    target_node: cur.get_u64_le(),
478                })
479            }
480            MSG_CLEANUP_COMPLETE => {
481                if cur.remaining() < 8 {
482                    return Err(MigrationError::StateFailed(
483                        "truncated CleanupComplete".into(),
484                    ));
485                }
486                Ok(MigrationMessage::CleanupComplete {
487                    daemon_origin: cur.get_u64_le(),
488                })
489            }
490            MSG_FAILED => {
491                if cur.remaining() < 8 + 2 {
492                    return Err(MigrationError::StateFailed(
493                        "truncated MigrationFailed header".into(),
494                    ));
495                }
496                let daemon_origin = cur.get_u64_le();
497                let code = cur.get_u16_le();
498                let reason = decode_failure_reason(&mut cur, code)?;
499                Ok(MigrationMessage::MigrationFailed {
500                    daemon_origin,
501                    reason,
502                })
503            }
504            MSG_BUFFERED_EVENTS => {
505                if cur.remaining() < 12 {
506                    return Err(MigrationError::StateFailed(
507                        "truncated BufferedEvents".into(),
508                    ));
509                }
510                let daemon_origin = cur.get_u64_le();
511                let count = cur.get_u32_le() as usize;
512                // Bound `count` against the remaining wire bytes before
513                // allocating. Each event on the wire is at least
514                // CAUSAL_LINK_SIZE(24) + u32 payload_len(4) + u64
515                // received_at(8) = 36 bytes (empty payload). Without this
516                // check, a malformed packet could claim `count = u32::MAX`
517                // and force the decoder to allocate ~4G Vec slots before
518                // the per-event bound checks fire — a cheap DoS against
519                // the migration subprotocol.
520                use crate::adapter::net::state::causal::CAUSAL_LINK_SIZE;
521                const MIN_EVENT_WIRE_SIZE: usize = CAUSAL_LINK_SIZE + 4 + 8;
522                // Hard cap as defense-in-depth. Well above any realistic
523                // buffered-event batch (the orchestrator sends one
524                // per-daemon batch at restore-complete; millions of
525                // events per daemon per migration is already pathological).
526                const MAX_BUFFERED_EVENTS: usize = 1_000_000;
527                let max_possible = cur.remaining() / MIN_EVENT_WIRE_SIZE;
528                if count > max_possible || count > MAX_BUFFERED_EVENTS {
529                    return Err(MigrationError::StateFailed(format!(
530                        "BufferedEvents: count {} exceeds bound (remaining={}, \
531                         min_event_size={}, max_possible={}, hard_cap={})",
532                        count,
533                        cur.remaining(),
534                        MIN_EVENT_WIRE_SIZE,
535                        max_possible,
536                        MAX_BUFFERED_EVENTS,
537                    )));
538                }
539                let mut events = Vec::with_capacity(count);
540                for _ in 0..count {
541                    if cur.remaining() < CAUSAL_LINK_SIZE + 4 {
542                        return Err(MigrationError::StateFailed(
543                            "truncated buffered event".into(),
544                        ));
545                    }
546                    let mut link_bytes = [0u8; CAUSAL_LINK_SIZE];
547                    cur.copy_to_slice(&mut link_bytes);
548                    let link = CausalLink::from_bytes(&link_bytes)
549                        .ok_or_else(|| MigrationError::StateFailed("invalid causal link".into()))?;
550                    let payload_len = cur.get_u32_le() as usize;
551                    // Per-event payload cap. Defence-in-depth against
552                    // a peer that ships a buffered-events message
553                    // declaring a max-u32 (4 GiB) payload — without
554                    // the cap, `Vec::with_capacity(payload_len)` 30
555                    // lines below would attempt the allocation. Cap
556                    // at MAX_SNAPSHOT_CHUNK_SIZE, the same byte limit
557                    // every other per-event wire surface uses; a real
558                    // BufferedEvents stream never carries payloads
559                    // larger than the snapshot chunk size.
560                    if payload_len > MAX_SNAPSHOT_CHUNK_SIZE {
561                        return Err(MigrationError::StateFailed(format!(
562                            "buffered event payload {} exceeds per-event cap {}",
563                            payload_len, MAX_SNAPSHOT_CHUNK_SIZE
564                        )));
565                    }
566                    // Saturate-add so `payload_len + 8` can't wrap on
567                    // 32-bit targets and cause the `<` check below to
568                    // pass against an attacker-shaped length. The
569                    // crate's primary deployment is 64-bit, but the
570                    // type is `usize` and a 32-bit cdylib build would
571                    // expose the wrap.
572                    let need = payload_len.saturating_add(8);
573                    if cur.remaining() < need {
574                        return Err(MigrationError::StateFailed(
575                            "truncated event payload".into(),
576                        ));
577                    }
578                    let mut payload = vec![0u8; payload_len];
579                    cur.copy_to_slice(&mut payload);
580                    let received_at = cur.get_u64_le();
581                    events.push(CausalEvent {
582                        link,
583                        payload: bytes::Bytes::from(payload),
584                        received_at,
585                    });
586                }
587                Ok(MigrationMessage::BufferedEvents {
588                    daemon_origin,
589                    events,
590                })
591            }
592            MSG_ACTIVATE_TARGET => {
593                if cur.remaining() < 8 {
594                    return Err(MigrationError::StateFailed(
595                        "truncated ActivateTarget".into(),
596                    ));
597                }
598                Ok(MigrationMessage::ActivateTarget {
599                    daemon_origin: cur.get_u64_le(),
600                })
601            }
602            MSG_ACTIVATE_ACK => {
603                if cur.remaining() < 16 {
604                    return Err(MigrationError::StateFailed("truncated ActivateAck".into()));
605                }
606                Ok(MigrationMessage::ActivateAck {
607                    daemon_origin: cur.get_u64_le(),
608                    replayed_seq: cur.get_u64_le(),
609                })
610            }
611            _ => Err(MigrationError::StateFailed(format!(
612                "unknown message type: {}",
613                msg_type
614            ))),
615        }
616    }
617}
618
619/// Decode a `MigrationFailureReason` from the `MSG_FAILED` variant
620/// payload. The 16-bit tag already consumed by the caller selects
621/// the variant; unknown tags are rejected so forward-compat is
622/// explicit rather than silent-ignore.
623fn decode_failure_reason(
624    cur: &mut std::io::Cursor<&[u8]>,
625    code: u16,
626) -> Result<MigrationFailureReason, MigrationError> {
627    match code {
628        0 => Ok(MigrationFailureReason::NotReady),
629        1 => Ok(MigrationFailureReason::FactoryNotFound),
630        2 => Ok(MigrationFailureReason::ComputeNotSupported),
631        3 => {
632            let msg = read_u16_string(cur, "StateFailed message")?;
633            Ok(MigrationFailureReason::StateFailed(msg))
634        }
635        4 => Ok(MigrationFailureReason::AlreadyMigrating),
636        5 => {
637            let msg = read_u16_string(cur, "IdentityTransportFailed message")?;
638            Ok(MigrationFailureReason::IdentityTransportFailed(msg))
639        }
640        6 => {
641            if cur.remaining() < 1 {
642                return Err(MigrationError::StateFailed(
643                    "truncated NotReadyTimeout attempts field".into(),
644                ));
645            }
646            Ok(MigrationFailureReason::NotReadyTimeout {
647                attempts: cur.get_u8(),
648            })
649        }
650        other => Err(MigrationError::StateFailed(format!(
651            "unknown MigrationFailureReason code {other}",
652        ))),
653    }
654}
655
656fn read_u16_string(cur: &mut std::io::Cursor<&[u8]>, ctx: &str) -> Result<String, MigrationError> {
657    if cur.remaining() < 2 {
658        return Err(MigrationError::StateFailed(format!(
659            "truncated {ctx} length prefix",
660        )));
661    }
662    let len = cur.get_u16_le() as usize;
663    if cur.remaining() < len {
664        return Err(MigrationError::StateFailed(format!("truncated {ctx} body")));
665    }
666    let mut bytes = vec![0u8; len];
667    cur.copy_to_slice(&mut bytes);
668    String::from_utf8(bytes)
669        .map_err(|e| MigrationError::StateFailed(format!("{ctx} is not valid UTF-8: {e}")))
670}
671
672// ── Snapshot chunking ────────────────────────────────────────────────────────
673
/// Maximum snapshot chunk size in bytes.
///
/// Sized to fit within `MAX_PAYLOAD_SIZE` after accounting for the
/// `SnapshotReady` wire header overhead (msg_type 1 + daemon_origin 8 +
/// seq_through 8 + chunk_index 4 + total_chunks 4 + len 4 = 29 bytes) and
/// leaving headroom for the outer transport framing.
pub const MAX_SNAPSHOT_CHUNK_SIZE: usize = 7000;

/// Maximum transferable snapshot size: `u32::MAX` chunks × `MAX_SNAPSHOT_CHUNK_SIZE`
/// bytes per chunk.
///
/// On 64-bit targets this is about 30 TB (~27.3 TiB), which is effectively
/// unlimited for daemon state. The `StateSnapshot` wire format itself caps at
/// ~4 GB (`state_len: u32`), so in practice the snapshot serialization limit
/// is reached first.
///
/// The product does not fit in a 32-bit `usize`. A plain multiplication would
/// be a const-evaluation overflow (a compile error) there, so the value
/// saturates to `usize::MAX` instead.
pub const MAX_SNAPSHOT_SIZE: usize = (u32::MAX as usize).saturating_mul(MAX_SNAPSHOT_CHUNK_SIZE);

/// Maximum `total_chunks` the reassembler will accept per reassembly.
///
/// The `StateSnapshot` wire format caps the payload at ~4 GB
/// (`state_len: u32`), so `ceil(u32::MAX / MAX_SNAPSHOT_CHUNK_SIZE)` (613,567
/// at the current chunk size) is the largest legitimate value. We cap above
/// that with headroom. Anything higher is an attacker declaring a fake
/// `total_chunks`, either to flood us with BTreeMap insertions or to stall the
/// reassembler forever waiting for chunks that will never arrive.
pub const MAX_TOTAL_CHUNKS: u32 = 700_000;

// Compile-time guard: the chunk-count cap must still admit a `u32::MAX`-byte
// snapshot. Without it, shrinking `MAX_SNAPSHOT_CHUNK_SIZE` but leaving
// `MAX_TOTAL_CHUNKS` unchanged would silently reject large legitimate snapshots.
const _: () = assert!(
    MAX_TOTAL_CHUNKS as u64 * MAX_SNAPSHOT_CHUNK_SIZE as u64 >= u32::MAX as u64,
    "MAX_TOTAL_CHUNKS * MAX_SNAPSHOT_CHUNK_SIZE must cover a u32::MAX-byte snapshot"
);
696
/// Hard upper bound on bytes buffered for a SINGLE in-flight reassembly.
///
/// `MAX_TOTAL_CHUNKS × MAX_SNAPSHOT_CHUNK_SIZE` is 700,000 × 7,000 =
/// 4.9 GB (~4.56 GiB). Because `seq_through == latest` doesn't trigger
/// eviction, an attacker could park up to that much memory per
/// `(daemon_origin, seq_through)` and refresh it forever without ever
/// completing the snapshot.
///
/// This cap is a hard ceiling on the per-entry buffer regardless of the
/// declared `total_chunks`. Real daemon snapshots run in the megabytes, not
/// gigabytes, so 64 MiB leaves plenty of headroom while bounding the
/// flood-amplification a malicious peer can produce.
pub const MAX_PENDING_REASSEMBLY_BYTES: usize = 64 * 1024 * 1024;
708
709/// Maximum age of a pending reassembly entry before it is swept.
710///
711/// Even with the per-entry byte cap, a peer can park up to
712/// `MAX_PENDING_REASSEMBLY_BYTES` indefinitely under a single
713/// `(daemon_origin, seq_through)` key: the cap refuses *additional*
714/// chunks once buffered bytes reach the ceiling, but it doesn't
715/// evict what's already there, and the `seq_through > latest`
716/// eviction never fires while the peer re-uses the same
717/// `seq_through`. Across many distinct `daemon_origin` values that
718/// produces unbounded growth. The age sweep closes that hole by
719/// removing entries whose last-progress timestamp is older than
720/// this duration. Real snapshots complete in seconds; 5 minutes
721/// leaves headroom for a slow legitimate peer while bounding the
722/// persistence of attacker-shaped state.
723pub const MAX_PENDING_REASSEMBLY_AGE: Duration = Duration::from_secs(300);
724
725/// Split a snapshot into chunked `SnapshotReady` messages.
726///
727/// Small snapshots (<= `MAX_SNAPSHOT_CHUNK_SIZE`) produce a single message
728/// with `chunk_index = 0, total_chunks = 1`. Larger snapshots are split
729/// into multiple messages that the receiver must reassemble.
730///
731/// Returns `MigrationError::SnapshotTooLarge` if the snapshot exceeds
732/// `MAX_SNAPSHOT_SIZE` (~28 TB).
733pub fn chunk_snapshot(
734    daemon_origin: u64,
735    snapshot_bytes: Vec<u8>,
736    seq_through: u64,
737) -> Result<Vec<MigrationMessage>, MigrationError> {
738    if snapshot_bytes.len() <= MAX_SNAPSHOT_CHUNK_SIZE {
739        return Ok(vec![MigrationMessage::SnapshotReady {
740            daemon_origin,
741            snapshot_bytes,
742            seq_through,
743            chunk_index: 0,
744            total_chunks: 1,
745        }]);
746    }
747
748    let total_chunks = snapshot_bytes.len().div_ceil(MAX_SNAPSHOT_CHUNK_SIZE);
749    let total_chunks =
750        u32::try_from(total_chunks).map_err(|_| MigrationError::SnapshotTooLarge {
751            size: snapshot_bytes.len(),
752            max: MAX_SNAPSHOT_SIZE,
753        })?;
754
755    Ok(snapshot_bytes
756        .chunks(MAX_SNAPSHOT_CHUNK_SIZE)
757        .enumerate()
758        .map(|(i, chunk)| MigrationMessage::SnapshotReady {
759            daemon_origin,
760            snapshot_bytes: chunk.to_vec(),
761            seq_through,
762            chunk_index: i as u32,
763            total_chunks,
764        })
765        .collect())
766}
767
768/// Why a chunk was rejected by [`SnapshotReassembler::feed`].
769#[derive(Debug, Clone, Copy, PartialEq, Eq)]
770pub enum ReassemblyError {
771    /// `total_chunks == 0` — a well-formed message always declares at least one.
772    ZeroTotalChunks,
773    /// `chunk_index >= total_chunks` — attacker trying to smuggle out-of-range
774    /// indices past the "all chunks received" count check.
775    ChunkIndexOutOfRange {
776        /// The chunk index declared by the peer.
777        chunk_index: u32,
778        /// The `total_chunks` declared by the peer.
779        total_chunks: u32,
780    },
781    /// `total_chunks > MAX_TOTAL_CHUNKS` — peer declared more chunks than any
782    /// legitimate snapshot could produce.
783    TotalChunksTooLarge {
784        /// The `total_chunks` declared by the peer.
785        total_chunks: u32,
786    },
787    /// An individual chunk exceeds `MAX_SNAPSHOT_CHUNK_SIZE`.
788    ChunkTooLarge {
789        /// The chunk length observed.
790        len: usize,
791    },
792    /// A later chunk declared a different `total_chunks` than the first chunk
793    /// for the same `(daemon_origin, seq_through)`. Peer is either buggy or
794    /// trying to resize an in-flight reassembly to force extra allocations.
795    TotalChunksMismatch {
796        /// The value declared by the current chunk.
797        got: u32,
798        /// The value locked in by the first chunk.
799        expected: u32,
800    },
801    /// Peer sent a chunk for an older `seq_through` after we already
802    /// accepted a newer one for the same daemon.
803    StaleSeqThrough {
804        /// The `seq_through` on the incoming chunk.
805        got: u64,
806        /// The newest `seq_through` we've accepted for this daemon.
807        latest: u64,
808    },
809    /// Buffered bytes for this `(daemon_origin, seq_through)` would
810    /// exceed `MAX_PENDING_REASSEMBLY_BYTES`. Refusing the chunk
811    /// bounds the memory amplification a peer can drive by sending
812    /// only some of the chunks for an outsized declared snapshot.
813    TooManyPendingBytes {
814        /// Bytes already buffered for this entry.
815        buffered: usize,
816        /// Length of the chunk being rejected.
817        incoming: usize,
818        /// Per-entry cap.
819        cap: usize,
820    },
821}
822
823impl std::fmt::Display for ReassemblyError {
824    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825        match self {
826            Self::ZeroTotalChunks => write!(f, "total_chunks == 0"),
827            Self::ChunkIndexOutOfRange {
828                chunk_index,
829                total_chunks,
830            } => write!(
831                f,
832                "chunk_index {} out of range for total_chunks {}",
833                chunk_index, total_chunks
834            ),
835            Self::TotalChunksTooLarge { total_chunks } => write!(
836                f,
837                "total_chunks {} exceeds MAX_TOTAL_CHUNKS ({})",
838                total_chunks, MAX_TOTAL_CHUNKS
839            ),
840            Self::ChunkTooLarge { len } => write!(
841                f,
842                "chunk length {} exceeds MAX_SNAPSHOT_CHUNK_SIZE ({})",
843                len, MAX_SNAPSHOT_CHUNK_SIZE
844            ),
845            Self::TotalChunksMismatch { got, expected } => write!(
846                f,
847                "total_chunks {} does not match first chunk's declared {}",
848                got, expected
849            ),
850            Self::StaleSeqThrough { got, latest } => write!(
851                f,
852                "seq_through {} is older than latest accepted {} for this daemon",
853                got, latest
854            ),
855            Self::TooManyPendingBytes {
856                buffered,
857                incoming,
858                cap,
859            } => write!(
860                f,
861                "buffered {} + incoming {} would exceed per-entry cap {}",
862                buffered, incoming, cap
863            ),
864        }
865    }
866}
867
868impl std::error::Error for ReassemblyError {}
869
870/// Reassembles chunked `SnapshotReady` messages into a complete snapshot.
871///
872/// Keyed by `(daemon_origin, seq_through)` so chunks from different snapshot
873/// generations cannot be mixed. At most one in-flight reassembly is kept
874/// per daemon — a chunk for a newer `seq_through` evicts any older pending
875/// state for that daemon, and chunks for older `seq_through` are rejected.
876pub struct SnapshotReassembler {
877    /// Pending reassemblies: (daemon_origin, seq_through) → chunks.
878    pending: std::collections::HashMap<(u64, u64), ReassemblyState>,
879    /// Latest `seq_through` accepted per daemon, for stale-chunk rejection
880    /// even after a reassembly completes and is evicted from `pending`.
881    latest_seq: std::collections::HashMap<u64, u64>,
882    /// Max age applied by the opportunistic sweep at the head of
883    /// every `feed`. Defaults to `MAX_PENDING_REASSEMBLY_AGE`. The
884    /// public `sweep_stale` accepts its own `max_age` and ignores
885    /// this; this field exists so tests can drive the implicit
886    /// sweep without waiting wall-clock minutes.
887    max_pending_age: Duration,
888}
889
890struct ReassemblyState {
891    total_chunks: u32,
892    chunks: std::collections::BTreeMap<u32, Vec<u8>>,
893    /// Sum of `chunks` values' lengths. Maintained explicitly (rather
894    /// than recomputed via `chunks.values().map(Vec::len).sum()` per
895    /// feed) so the `MAX_PENDING_REASSEMBLY_BYTES` gate is O(1) per
896    /// chunk instead of O(chunks).
897    bytes_buffered: usize,
898    /// Time of the most recent chunk arrival for this entry. Resets
899    /// on every accepted chunk so a slow-but-progressing peer never
900    /// trips the age sweep; a stalled entry that hasn't received a
901    /// chunk in `MAX_PENDING_REASSEMBLY_AGE` is dropped by either
902    /// `sweep_stale` or the opportunistic sweep at the head of
903    /// `feed`.
904    last_progress_at: Instant,
905}
906
907impl SnapshotReassembler {
908    /// Create a new reassembler with the default
909    /// `MAX_PENDING_REASSEMBLY_AGE` opportunistic-sweep age.
910    pub fn new() -> Self {
911        Self::with_max_pending_age(MAX_PENDING_REASSEMBLY_AGE)
912    }
913
914    /// Create a reassembler with a custom opportunistic-sweep age.
915    /// Production callers should use `new()`; this exists primarily
916    /// for tests that need to exercise the in-`feed` sweep without
917    /// waiting wall-clock minutes.
918    pub fn with_max_pending_age(max_pending_age: Duration) -> Self {
919        Self {
920            pending: std::collections::HashMap::new(),
921            latest_seq: std::collections::HashMap::new(),
922            max_pending_age,
923        }
924    }
925
926    /// Feed a snapshot chunk.
927    ///
928    /// Returns `Ok(Some(bytes))` when all chunks for the current
929    /// `(daemon_origin, seq_through)` have been received, `Ok(None)` while
930    /// still waiting, and `Err(ReassemblyError)` if the chunk is malformed or
931    /// part of an attacker-shaped sequence. Rejected chunks never mutate
932    /// in-flight state.
933    pub fn feed(
934        &mut self,
935        daemon_origin: u64,
936        snapshot_bytes: Vec<u8>,
937        seq_through: u64,
938        chunk_index: u32,
939        total_chunks: u32,
940    ) -> Result<Option<Vec<u8>>, ReassemblyError> {
941        // ---- Per-chunk validation (no mutation until we've passed these) ----
942        // Pre-fix the opportunistic age sweep ran BEFORE these
943        // structural checks, so a peer that shipped a torrent of
944        // malformed chunks (total_chunks=0, oversize, etc.) forced
945        // an O(n) sweep on every reject — amplifying the malformed-
946        // chunk DoS. Move the sweep after validation so attacker
947        // junk is rejected for O(1) instead.
948        if total_chunks == 0 {
949            return Err(ReassemblyError::ZeroTotalChunks);
950        }
951        if total_chunks > MAX_TOTAL_CHUNKS {
952            return Err(ReassemblyError::TotalChunksTooLarge { total_chunks });
953        }
954        if chunk_index >= total_chunks {
955            return Err(ReassemblyError::ChunkIndexOutOfRange {
956                chunk_index,
957                total_chunks,
958            });
959        }
960
961        // Opportunistic age sweep. Even without an external scheduler
962        // driving `sweep_stale`, the pending map self-heals as new
963        // traffic arrives, so a hostile peer who parks an entry at
964        // the byte cap and goes silent can't keep it alive
965        // indefinitely. Cheap: `pending` is bounded to one entry
966        // per daemon and the retain is O(n) over a small map.
967        self.sweep_stale(self.max_pending_age);
968        // Zero-byte chunks are nonsensical: every legitimate
969        // SnapshotReady carries at least one byte of state (an empty
970        // snapshot would be a 1-byte length-prefixed empty payload,
971        // not a 0-byte chunk). Pre-fix a peer could ship
972        // MAX_TOTAL_CHUNKS = 700_000 zero-byte chunks per reassembly
973        // without ever consuming the documented byte-budget guard,
974        // bookkeeping `BTreeMap` entries until `MAX_TOTAL_CHUNKS`
975        // alone bounded the abuse. Refuse them at the boundary.
976        if snapshot_bytes.is_empty() {
977            return Err(ReassemblyError::ChunkTooLarge { len: 0 });
978        }
979        if snapshot_bytes.len() > MAX_SNAPSHOT_CHUNK_SIZE {
980            return Err(ReassemblyError::ChunkTooLarge {
981                len: snapshot_bytes.len(),
982            });
983        }
984        if let Some(&latest) = self.latest_seq.get(&daemon_origin) {
985            if seq_through < latest {
986                return Err(ReassemblyError::StaleSeqThrough {
987                    got: seq_through,
988                    latest,
989                });
990            }
991        }
992
993        // A newer seq_through for the same daemon evicts older in-flight state.
994        // This is what the public docstring always claimed; without it, the
995        // `pending` map grew unbounded across seq_through values.
996        if self
997            .latest_seq
998            .get(&daemon_origin)
999            .is_none_or(|&latest| seq_through > latest)
1000        {
1001            self.pending
1002                .retain(|&(origin, seq), _| origin != daemon_origin || seq == seq_through);
1003            self.latest_seq.insert(daemon_origin, seq_through);
1004        }
1005
1006        // Single-chunk fast path: no state to keep. Honour the
1007        // total_chunks-mismatch guard before bypassing — a peer that
1008        // shipped chunk 0/3 for `(daemon_origin, seq_through)` and
1009        // followed up with chunk 0/1 for the same key would otherwise
1010        // have the second message accepted as a complete snapshot,
1011        // dodging the mismatch error the multi-chunk path below
1012        // returns. The dedup-by-seq_through eviction above only fires
1013        // when a *newer* seq_through arrives; same-key collisions
1014        // still need to be caught here.
1015        if total_chunks == 1 {
1016            if let Some(state) = self.pending.get(&(daemon_origin, seq_through)) {
1017                if state.total_chunks != 1 {
1018                    return Err(ReassemblyError::TotalChunksMismatch {
1019                        got: 1,
1020                        expected: state.total_chunks,
1021                    });
1022                }
1023            }
1024            self.pending.remove(&(daemon_origin, seq_through));
1025            return Ok(Some(snapshot_bytes));
1026        }
1027
1028        let key = (daemon_origin, seq_through);
1029        let state = self.pending.entry(key).or_insert_with(|| ReassemblyState {
1030            total_chunks,
1031            chunks: std::collections::BTreeMap::new(),
1032            bytes_buffered: 0,
1033            last_progress_at: Instant::now(),
1034        });
1035
1036        // The first chunk fixes total_chunks; later chunks must agree.
1037        if state.total_chunks != total_chunks {
1038            return Err(ReassemblyError::TotalChunksMismatch {
1039                got: total_chunks,
1040                expected: state.total_chunks,
1041            });
1042        }
1043
1044        // Per-entry bytes cap. Refuse a chunk that would push the
1045        // accumulated buffer past `MAX_PENDING_REASSEMBLY_BYTES`.
1046        // Re-sending the same chunk index doesn't double-count: we
1047        // subtract the displaced chunk's length below before
1048        // re-checking. A peer that declares an oversized snapshot
1049        // and ships only some of the chunks can no longer park
1050        // ~4 GiB indefinitely — the cap forces the entry to be
1051        // refused once buffered bytes exceed the ceiling.
1052        let displaced_len = state.chunks.get(&chunk_index).map(Vec::len).unwrap_or(0);
1053        let projected = state
1054            .bytes_buffered
1055            .saturating_sub(displaced_len)
1056            .saturating_add(snapshot_bytes.len());
1057        if projected > MAX_PENDING_REASSEMBLY_BYTES {
1058            return Err(ReassemblyError::TooManyPendingBytes {
1059                buffered: state.bytes_buffered,
1060                incoming: snapshot_bytes.len(),
1061                cap: MAX_PENDING_REASSEMBLY_BYTES,
1062            });
1063        }
1064
1065        let new_len = snapshot_bytes.len();
1066        state.chunks.insert(chunk_index, snapshot_bytes);
1067        state.bytes_buffered = state
1068            .bytes_buffered
1069            .saturating_sub(displaced_len)
1070            .saturating_add(new_len);
1071        state.last_progress_at = Instant::now();
1072
1073        // With `chunk_index < total_chunks` enforced above, the BTreeMap's
1074        // keys are all in 0..total_chunks. Reaching total_chunks entries
1075        // therefore means we have every distinct index exactly once.
1076        if state.chunks.len() == state.total_chunks as usize {
1077            #[expect(
1078                clippy::unwrap_used,
1079                reason = "pending.entry(key).or_insert_with above guarantees the key is present"
1080            )]
1081            let state = self.pending.remove(&key).unwrap();
1082            let mut full = Vec::with_capacity(state.chunks.values().map(|c| c.len()).sum());
1083            for (_idx, chunk) in state.chunks {
1084                full.extend_from_slice(&chunk);
1085            }
1086            Ok(Some(full))
1087        } else {
1088            Ok(None)
1089        }
1090    }
1091
1092    /// Cancel reassembly for a daemon (e.g., on migration abort).
1093    ///
1094    /// Removes all pending reassemblies for this daemon regardless of
1095    /// `seq_through`. Does **not** reset `latest_seq`, so a subsequent
1096    /// replay of old chunks is still rejected.
1097    pub fn cancel(&mut self, daemon_origin: u64) {
1098        self.pending
1099            .retain(|&(origin, _), _| origin != daemon_origin);
1100    }
1101
1102    /// Drop pending reassemblies whose last-progress timestamp is
1103    /// older than `max_age`. Returns the number of entries evicted.
1104    ///
1105    /// Called opportunistically at the head of every `feed`, but
1106    /// also exposed publicly so a topology-aware caller (e.g. the
1107    /// migration dispatcher's housekeeping tick) can drive it on a
1108    /// timer when no inbound traffic is arriving — the
1109    /// `seq_through == latest` path in `feed` cannot self-trigger
1110    /// the sweep without a fresh chunk to cause it.
1111    ///
1112    /// Does **not** reset `latest_seq`: a peer that comes back later
1113    /// with the same `seq_through` is still rejected via the usual
1114    /// `StaleSeqThrough` gate, so dropping the in-flight buffer
1115    /// can't be turned into a snapshot-replacement amplifier.
1116    pub fn sweep_stale(&mut self, max_age: Duration) -> usize {
1117        let before = self.pending.len();
1118        let now = Instant::now();
1119        self.pending.retain(|_, state| {
1120            now.checked_duration_since(state.last_progress_at)
1121                .is_none_or(|age| age < max_age)
1122        });
1123        before - self.pending.len()
1124    }
1125
1126    /// Number of pending reassemblies.
1127    pub fn pending_count(&self) -> usize {
1128        self.pending.len()
1129    }
1130}
1131
1132impl Default for SnapshotReassembler {
1133    fn default() -> Self {
1134        Self::new()
1135    }
1136}
1137
1138// ── Orchestrator ─────────────────────────────────────────────────────────────
1139
1140/// Tracks an in-flight migration with its superposition state.
1141struct MigrationRecord {
1142    state: MigrationState,
1143    superposition: SuperpositionState,
1144    started_at: Instant,
1145}
1146
1147/// One row of [`MigrationOrchestrator::list_migrations`]. Used
1148/// by operator-facing surfaces (Deck MIGRATIONS tab via the
1149/// `MigrationSnapshot` wire form, ICE blast-radius simulator).
1150/// Pre-`MigrationListItem` this was a `(u64, MigrationPhase, u64)`
1151/// tuple; the operator-facing columns outgrew the tuple's
1152/// readability budget, and a named struct documents which
1153/// field is which without per-caller comments.
1154#[derive(Clone, Debug)]
1155pub struct MigrationListItem {
1156    /// Origin hash of the daemon being migrated.
1157    pub daemon_origin: u64,
1158    /// Source node ID.
1159    pub source_node: u64,
1160    /// Target node ID.
1161    pub target_node: u64,
1162    /// Current phase.
1163    pub phase: MigrationPhase,
1164    /// Milliseconds since the migration started.
1165    pub elapsed_ms: u64,
1166    /// Milliseconds since the current phase was entered. Distinct
1167    /// from `elapsed_ms` — a migration ten minutes old that
1168    /// transitioned to Replay one minute ago reports `60_000` here.
1169    pub age_in_phase_ms: u64,
1170    /// Snapshot payload size in bytes; `None` while the source
1171    /// hasn't produced a snapshot yet.
1172    pub snapshot_bytes: Option<u64>,
1173    /// Retry attempts accumulated by orchestrator-driven retries.
1174    pub retries: u32,
1175}
1176
1177/// Coordinates all 6 phases of daemon migration.
1178///
1179/// The orchestrator manages the lifecycle of migrations: initiating snapshots,
1180/// forwarding snapshot data to targets, coordinating replay, executing cutover,
1181/// and cleaning up the source.
1182pub struct MigrationOrchestrator {
1183    /// In-flight migrations: daemon_origin → record.
1184    migrations: DashMap<u64, Mutex<MigrationRecord>>,
1185    /// Local daemon registry (for taking snapshots on local daemons).
1186    daemon_registry: Arc<DaemonRegistry>,
1187    /// Local node ID.
1188    local_node_id: u64,
1189    /// Source-side migration handler.
1190    ///
1191    /// Production wiring (`MigrationDispatcher::new`) sets this so
1192    /// the local-source path of `start_migration` registers the
1193    /// migration in the source-side handler — without that
1194    /// registration, `is_migrating(origin)` returns false and
1195    /// `DaemonRegistry::deliver` keeps mutating the source daemon's
1196    /// state past the snapshot's `seq_through`, so events arriving
1197    /// between snapshot capture and cutover would be silently lost.
1198    /// Tests that don't exercise the local-source path can leave
1199    /// this as `None`; the fallback is direct-snapshot behavior
1200    /// with a `tracing::warn!` flagging the gap.
1201    source_handler: Option<Arc<MigrationSourceHandler>>,
1202}
1203
1204impl MigrationOrchestrator {
1205    /// Create a new orchestrator.
1206    ///
1207    /// The source-side handler defaults to `None`. Production
1208    /// callers should chain [`Self::with_source_handler`] to wire
1209    /// the orchestrator to the dispatcher's source handler — see
1210    /// the field doc for why that matters.
1211    pub fn new(daemon_registry: Arc<DaemonRegistry>, local_node_id: u64) -> Self {
1212        Self {
1213            migrations: DashMap::new(),
1214            daemon_registry,
1215            local_node_id,
1216            source_handler: None,
1217        }
1218    }
1219
1220    /// Builder: install the source-side migration handler. Required
1221    /// for correct local-source migration behavior.
1222    /// `MigrationDispatcher::new` already calls this; tests that
1223    /// exercise the local-source code path must call it explicitly.
1224    pub fn with_source_handler(mut self, source_handler: Arc<MigrationSourceHandler>) -> Self {
1225        self.source_handler = Some(source_handler);
1226        self
1227    }
1228
1229    /// Shared handle to the daemon registry this orchestrator was
1230    /// built against. Exposed so the migration subprotocol
1231    /// dispatcher can reach the registry without an extra `Arc`
1232    /// plumbed alongside.
1233    pub fn daemon_registry(&self) -> &Arc<DaemonRegistry> {
1234        &self.daemon_registry
1235    }
1236
1237    /// Initiate a migration (phase 0: Snapshot).
1238    ///
1239    /// If the source is the local node, takes the snapshot immediately and
1240    /// returns `SnapshotReady`. Otherwise, returns `TakeSnapshot` for the
1241    /// caller to send to the source node.
1242    pub fn start_migration(
1243        &self,
1244        daemon_origin: u64,
1245        source_node: u64,
1246        target_node: u64,
1247    ) -> Result<Vec<MigrationMessage>, MigrationError> {
1248        // Atomic check-and-insert via entry() to prevent TOCTOU races
1249        let entry = match self.migrations.entry(daemon_origin) {
1250            dashmap::mapref::entry::Entry::Occupied(_) => {
1251                return Err(MigrationError::AlreadyMigrating(daemon_origin));
1252            }
1253            dashmap::mapref::entry::Entry::Vacant(entry) => entry,
1254        };
1255
1256        let mut state = MigrationState::new(daemon_origin, source_node, target_node);
1257
1258        // If we are the source, take snapshot locally.
1259        if source_node == self.local_node_id {
1260            // The snapshot MUST be routed through the source-side
1261            // handler so it gets registered for the migration;
1262            // otherwise `source_handler.is_migrating(origin)`
1263            // returns false and `DaemonRegistry::deliver` keeps
1264            // routing post-snapshot events into the live source
1265            // daemon's state, losing them at cutover. The remote-
1266            // source path goes through
1267            // `source_handler.start_snapshot` via the dispatcher
1268            // (`migration_handler.rs:310-312`); we mirror that
1269            // here.
1270            //
1271            // `orchestrator_node` is `local_node_id` here — for a
1272            // self-initiated local migration the orchestrator IS this
1273            // node, so the source handler's reply-routing field
1274            // points back to us.
1275            let snapshot = match &self.source_handler {
1276                Some(handler) => {
1277                    handler.start_snapshot(daemon_origin, target_node, self.local_node_id)?
1278                }
1279                None => {
1280                    // Surface the unwired-source-handler path
1281                    // loudly. Production wiring
1282                    // (`MigrationDispatcher::new`) always sets the
1283                    // handler, so this branch is only reached by
1284                    // tests / direct orchestrator construction.
1285                    // Gating this with `cfg(not(test))` and Err in
1286                    // production would break integration tests
1287                    // that link against the library compiled
1288                    // WITHOUT `cfg(test)`. Warn-loud is the
1289                    // actionable signal: operators running under
1290                    // tracing see the missing-handler condition
1291                    // without the cfg mismatch silently breaking
1292                    // integration tests that intentionally
1293                    // exercise the orchestrator without a
1294                    // dispatcher.
1295                    tracing::warn!(
1296                        daemon_origin = format_args!("{:#x}", daemon_origin),
1297                        "MigrationOrchestrator::start_migration on local source without \
1298                         a source_handler installed — events arriving between snapshot \
1299                         capture and cutover may be silently lost. \
1300                         Production callers wire the handler via \
1301                         `MigrationDispatcher::new`. Direct orchestrator construction \
1302                         should call `MigrationOrchestrator::with_source_handler`."
1303                    );
1304                    self.daemon_registry
1305                        .snapshot(daemon_origin)
1306                        .map_err(|e| MigrationError::StateFailed(e.to_string()))?
1307                        .ok_or_else(|| {
1308                            MigrationError::StateFailed(
1309                                "daemon is stateless or snapshot failed".into(),
1310                            )
1311                        })?
1312                }
1313            };
1314
1315            // Surface oversized-snapshot errors as a
1316            // MigrationError instead of a panic that would crash the
1317            // dispatch task without releasing locks.
1318            let snapshot_bytes = snapshot
1319                .try_to_bytes()
1320                .map_err(|e| MigrationError::StateFailed(e.to_string()))?;
1321            let seq_through = snapshot.through_seq;
1322
1323            state.set_snapshot(snapshot)?;
1324
1325            let source_head = state
1326                .snapshot()
1327                .map(|s| s.chain_link)
1328                .unwrap_or_else(|| CausalLink::genesis(daemon_origin, 0));
1329
1330            let superposition = SuperpositionState::new(daemon_origin, source_head);
1331
1332            entry.insert(Mutex::new(MigrationRecord {
1333                state,
1334                superposition,
1335                started_at: Instant::now(),
1336            }));
1337
1338            // Pre-fix this returned a single `SnapshotReady`
1339            // with `chunk_index: 0, total_chunks: 1` regardless
1340            // of `snapshot_bytes.len()`. Any snapshot larger
1341            // than `MAX_SNAPSHOT_CHUNK_SIZE` (7 KB) was
1342            // rejected at the wire encoder
1343            // (`migration_handler.rs:336`) and the receiver
1344            // dropped it as `ChunkTooLarge`. Locally-initiated
1345            // migration of any stateful daemon with a
1346            // non-trivial state vector (cached models, large
1347            // bindings, behaviour history) thus could not be
1348            // sent at all. Route through `chunk_snapshot` so a
1349            // multi-chunk snapshot returns multiple messages
1350            // for the caller to dispatch in order.
1351            chunk_snapshot(daemon_origin, snapshot_bytes, seq_through)
1352        } else {
1353            let source_head = CausalLink::genesis(daemon_origin, 0);
1354            let superposition = SuperpositionState::new(daemon_origin, source_head);
1355
1356            entry.insert(Mutex::new(MigrationRecord {
1357                state,
1358                superposition,
1359                started_at: Instant::now(),
1360            }));
1361
1362            Ok(vec![MigrationMessage::TakeSnapshot {
1363                daemon_origin,
1364                target_node,
1365            }])
1366        }
1367    }
1368
1369    /// Initiate a migration with automatic target selection.
1370    ///
1371    /// Uses the scheduler to find the best migration-capable target node
1372    /// based on the daemon's capability requirements. The scheduler queries
1373    /// the `CapabilityIndex` for nodes advertising `subprotocol:0x0500`.
1374    ///
1375    /// Returns the target node ID and the first migration message.
1376    pub fn start_migration_auto(
1377        &self,
1378        daemon_origin: u64,
1379        source_node: u64,
1380        scheduler: &super::Scheduler,
1381        daemon_filter: &crate::adapter::net::behavior::capability::CapabilityFilter,
1382    ) -> Result<(u64, Vec<MigrationMessage>), MigrationError> {
1383        // Map a scheduler "no candidate" / "index unavailable"
1384        // outcome to the typed `NoTargetAvailable` variant. Pre-
1385        // fix this used `TargetUnavailable(0)`, surfacing
1386        // "target node 0x0 unavailable" to operators — confusing
1387        // because no specific node id was ever attempted; the
1388        // auto-placement found nobody to attempt against.
1389        // Phase G slice 8 — auto-target migration runs through v2
1390        // placement by default. Ranking flows via
1391        // `select_migration_target` + LOCKED §7 tie-breaker;
1392        // observable eligibility matches v1 because v2 wraps
1393        // `LegacyPlacement::permissive`. The legacy
1394        // `Scheduler::place_migration` is still available for
1395        // callers who explicitly want the v1 contract.
1396        let placement = scheduler
1397            .place_migration_v2(daemon_filter, source_node)
1398            .map_err(|_| MigrationError::NoTargetAvailable)?;
1399
1400        let target_node = placement.node_id;
1401        let msgs = self.start_migration(daemon_origin, source_node, target_node)?;
1402        Ok((target_node, msgs))
1403    }
1404
1405    /// Handle snapshot taken on source (phase 1→2).
1406    ///
1407    /// Validates and stores the snapshot, advances to Transfer phase.
1408    /// Returns the message to forward to the target node. For chunked
1409    /// snapshots, only the first chunk (index 0) triggers validation
1410    /// and phase advancement — subsequent chunks are forwarded as-is.
1411    pub fn on_snapshot_ready(
1412        &self,
1413        daemon_origin: u64,
1414        snapshot_bytes: Vec<u8>,
1415        seq_through: u64,
1416        chunk_index: u32,
1417        total_chunks: u32,
1418    ) -> Result<MigrationMessage, MigrationError> {
1419        let entry = self
1420            .migrations
1421            .get(&daemon_origin)
1422            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1423
1424        let mut record = entry.lock();
1425
1426        // Per-chunk envelope validation. Runs on chunk_index == 0
1427        // for both single-chunk and multi-chunk paths so the
1428        // orchestrator catches header-level corruption symmetrically.
1429        // Single-chunk's later `from_bytes` is a strictly stronger
1430        // check; the helper is a fast pre-decode rejection.
1431        // Multi-chunk used to defer ALL validation to the target
1432        // after reassembly — chunk 0 of a multi-chunk send with the
1433        // wrong magic / version / origin would only be caught after
1434        // reassembly finished.
1435        validate_chunk_header(chunk_index, &snapshot_bytes, daemon_origin)?;
1436
1437        // Only validate and advance phase on the first chunk
1438        if chunk_index == 0 && total_chunks == 1 {
1439            // Single-chunk: validate immediately and set snapshot
1440            let snapshot = StateSnapshot::from_bytes(&snapshot_bytes).ok_or_else(|| {
1441                MigrationError::StateFailed("failed to parse snapshot bytes".into())
1442            })?;
1443
1444            // Cross-check the wire-supplied seq_through against the
1445            // payload's snapshot.through_seq. Pre-fix a buggy or
1446            // malicious source could ship a SnapshotReady whose wire
1447            // seq_through disagreed with the snapshot's through_seq;
1448            // the disagreement then propagated to the target where
1449            // restore_snapshot consumed snapshot.through_seq for
1450            // replayed_through but logs / retry decisions / audit
1451            // used the wire field, producing a debugging trap.
1452            if snapshot.through_seq != seq_through {
1453                return Err(MigrationError::StateFailed(format!(
1454                    "SnapshotReady: wire seq_through {} disagrees with snapshot.through_seq {}",
1455                    seq_through, snapshot.through_seq,
1456                )));
1457            }
1458
1459            if record.state.phase() == MigrationPhase::Snapshot {
1460                record.state.set_snapshot(snapshot)?;
1461            }
1462        } else if chunk_index == 0 {
1463            // Multi-chunk: can't validate until target reassembles all chunks.
1464            // Advance phase past Snapshot so buffering and subsequent phases work.
1465            // The target will validate the full snapshot after reassembly.
1466            if record.state.phase() == MigrationPhase::Snapshot {
1467                record.state.force_phase(MigrationPhase::Transfer);
1468            }
1469        }
1470
1471        // Update superposition on first chunk
1472        if chunk_index == 0 {
1473            record.superposition.advance(MigrationPhase::Transfer);
1474        }
1475
1476        // Forward to target
1477        Ok(MigrationMessage::SnapshotReady {
1478            daemon_origin,
1479            snapshot_bytes,
1480            seq_through,
1481            chunk_index,
1482            total_chunks,
1483        })
1484    }
1485
1486    /// Handle restore complete on target (phase 2→3).
1487    ///
1488    /// Advances to Replay phase. Returns buffered events message if there
1489    /// are any, or None if no events were buffered.
1490    pub fn on_restore_complete(
1491        &self,
1492        daemon_origin: u64,
1493        _restored_seq: u64,
1494    ) -> Result<Option<MigrationMessage>, MigrationError> {
1495        let entry = self
1496            .migrations
1497            .get(&daemon_origin)
1498            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1499
1500        let mut record = entry.lock();
1501
1502        // Advance: Transfer → Restore → Replay
1503        if record.state.phase() == MigrationPhase::Transfer {
1504            record.state.transfer_complete()?;
1505        }
1506        if record.state.phase() == MigrationPhase::Restore {
1507            record.state.restore_complete()?;
1508        }
1509
1510        record.superposition.advance(MigrationPhase::Replay);
1511
1512        // No buffered events ride here — buffering goes through the
1513        // source-side handler exclusively (`MigrationSourceHandler::buffer_event`
1514        // / `on_cutover`'s drain). The orchestrator never holds a
1515        // buffer of its own; the only buffered-events surface is
1516        // the source's drain at cutover time.
1517        Ok(None)
1518    }
1519
1520    /// Handle replay complete on target (phase 3→4).
1521    ///
1522    /// `target_head` is the target's chain head after replay, shipped
1523    /// over the wire by the target node. The orchestrator stamps it
1524    /// into `SuperpositionState::target_replayed` so the continuity
1525    /// proof carries the real cryptographic anchor even when the
1526    /// orchestrator lives on a third node and has no local daemon
1527    /// registry entry to read from.
1528    ///
1529    /// Returns `CutoverNotify` to send to the source node.
1530    pub fn on_replay_complete(
1531        &self,
1532        daemon_origin: u64,
1533        replayed_seq: u64,
1534        target_head: CausalLink,
1535    ) -> Result<MigrationMessage, MigrationError> {
1536        let entry = self
1537            .migrations
1538            .get(&daemon_origin)
1539            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1540
1541        let mut record = entry.lock();
1542
1543        // Sanity-check the wire-shipped head against the wire-shipped
1544        // replayed_seq. A mismatch signals either a buggy target or a
1545        // wire-level tamper; either way the link is no longer
1546        // trustworthy as the migration's authoritative anchor.
1547        if target_head.sequence != replayed_seq {
1548            tracing::warn!(
1549                daemon_origin = format_args!("{:#x}", daemon_origin),
1550                head_seq = target_head.sequence,
1551                replayed_seq,
1552                "on_replay_complete: target_head.sequence disagrees with replayed_seq; \
1553                 using target_head as-is but the continuity proof anchor may not match \
1554                 the post-replay chain head"
1555            );
1556        }
1557        record.superposition.target_replayed(target_head);
1558
1559        // Advance to Cutover
1560        if record.state.phase() == MigrationPhase::Replay {
1561            record.state.replay_complete()?;
1562        }
1563
1564        record.superposition.advance(MigrationPhase::Cutover);
1565        record.superposition.collapse();
1566
1567        let target_node = record.state.target_node();
1568
1569        Ok(MigrationMessage::CutoverNotify {
1570            daemon_origin,
1571            target_node,
1572        })
1573    }
1574
1575    /// Handle cutover acknowledged by source.
1576    ///
1577    /// Source has stopped accepting writes. Advances to Complete.
1578    pub fn on_cutover_acknowledged(&self, daemon_origin: u64) -> Result<(), MigrationError> {
1579        let entry = self
1580            .migrations
1581            .get(&daemon_origin)
1582            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1583
1584        let mut record = entry.lock();
1585
1586        if record.state.phase() == MigrationPhase::Cutover {
1587            record.state.cutover_complete()?;
1588        }
1589
1590        record.superposition.advance(MigrationPhase::Complete);
1591        record.superposition.resolve();
1592
1593        Ok(())
1594    }
1595
1596    /// Handle cleanup complete from source (phase 5→6).
1597    ///
1598    /// The source has stopped accepting writes and freed its local daemon
1599    /// state. Advances Cutover→Complete on the orchestrator — the source's
1600    /// local `on_cutover_acknowledged` call is a no-op when the orchestrator
1601    /// lives on a different node (it operates on the source's local
1602    /// orchestrator, which has no record), so `CleanupComplete` is the
1603    /// authoritative signal on the orchestrator side. The record is kept in
1604    /// place until the target acknowledges activation via `on_activate_ack`,
1605    /// so the subprotocol handler still has somewhere to look up
1606    /// `target_node` when it needs to route `ActivateTarget`.
1607    pub fn on_cleanup_complete(
1608        &self,
1609        daemon_origin: u64,
1610    ) -> Result<MigrationMessage, MigrationError> {
1611        let entry = self
1612            .migrations
1613            .get(&daemon_origin)
1614            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1615        let mut record = entry.lock();
1616        if record.state.phase() == MigrationPhase::Cutover {
1617            record.state.cutover_complete()?;
1618        }
1619        // Also resolve `SuperpositionState` here, mirroring
1620        // `on_cutover_acknowledged`. On a remote orchestrator
1621        // `on_cutover_acknowledged` is a no-op (operates on the
1622        // source's local orchestrator, which has no record for
1623        // this daemon), so this path is the ONLY authoritative
1624        // one. Without the resolve, `SuperpositionState` would be
1625        // stuck mid-collapse and operator dashboards / readiness
1626        // probes / SDK handles keyed on superposition state
1627        // wouldn't observe resolution until `on_activate_ack`
1628        // removed the record entirely. The advance/resolve is
1629        // idempotent — safe to run on the local-orchestrator path
1630        // too if both signals arrive on the same node.
1631        record.superposition.advance(MigrationPhase::Complete);
1632        record.superposition.resolve();
1633        Ok(MigrationMessage::ActivateTarget { daemon_origin })
1634    }
1635
1636    /// Handle activation acknowledgement from target (phase 6 terminus).
1637    ///
1638    /// The target has drained remaining events and is now the authoritative
1639    /// copy. This is the true end of the migration lifecycle; the record is
1640    /// removed here, not in `on_cleanup_complete`.
1641    pub fn on_activate_ack(
1642        &self,
1643        daemon_origin: u64,
1644        _replayed_seq: u64,
1645    ) -> Result<(), MigrationError> {
1646        self.migrations
1647            .remove(&daemon_origin)
1648            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1649        Ok(())
1650    }
1651
1652    /// Abort a migration at any phase.
1653    ///
1654    /// Returns the abort message to broadcast to involved nodes.
1655    /// `reason` is wrapped in [`MigrationFailureReason::StateFailed`]
1656    /// since a generic abort doesn't fit any of the more specific
1657    /// variants.
1658    pub fn abort_migration(
1659        &self,
1660        daemon_origin: u64,
1661        reason: String,
1662    ) -> Result<MigrationMessage, MigrationError> {
1663        self.abort_migration_with_reason(daemon_origin, MigrationFailureReason::StateFailed(reason))
1664    }
1665
1666    /// Abort a migration with a caller-supplied structured reason.
1667    ///
1668    /// Removes the orchestrator's record AND, if wired, also clears
1669    /// the matching entry on the local `MigrationSourceHandler`.
1670    /// Pre-fix only the orchestrator entry was removed; the source
1671    /// handler's `migrations` map retained the entry, so
1672    /// `is_migrating()` stayed true forever and `buffer_event` kept
1673    /// stuffing the now-undrained buffered_events vector. Subsequent
1674    /// retry attempts then tripped `AlreadyMigrating` against a
1675    /// migration the orchestrator believed was already aborted.
1676    pub fn abort_migration_with_reason(
1677        &self,
1678        daemon_origin: u64,
1679        reason: MigrationFailureReason,
1680    ) -> Result<MigrationMessage, MigrationError> {
1681        self.migrations
1682            .remove(&daemon_origin)
1683            .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1684
1685        // Clear the source-side mirror entry. `abort` is a no-op if
1686        // the daemon was never tracked there (e.g. remote-source
1687        // migration where this node is only the orchestrator), so
1688        // calling it unconditionally is safe.
1689        if let Some(source) = &self.source_handler {
1690            let _ = source.abort(daemon_origin);
1691        }
1692
1693        Ok(MigrationMessage::MigrationFailed {
1694            daemon_origin,
1695            reason,
1696        })
1697    }
1698
1699    /// Check if a daemon is currently being migrated.
1700    pub fn is_migrating(&self, daemon_origin: u64) -> bool {
1701        self.migrations.contains_key(&daemon_origin)
1702    }
1703
1704    /// Get migration status for a daemon.
1705    pub fn status(&self, daemon_origin: u64) -> Option<MigrationPhase> {
1706        self.migrations
1707            .get(&daemon_origin)
1708            .map(|entry| entry.lock().state.phase())
1709    }
1710
1711    /// Get the source node for an in-flight migration.
1712    pub fn source_node(&self, daemon_origin: u64) -> Option<u64> {
1713        self.migrations
1714            .get(&daemon_origin)
1715            .map(|entry| entry.lock().state.source_node())
1716    }
1717
1718    /// Get the target node for an in-flight migration.
1719    pub fn target_node(&self, daemon_origin: u64) -> Option<u64> {
1720        self.migrations
1721            .get(&daemon_origin)
1722            .map(|entry| entry.lock().state.target_node())
1723    }
1724
1725    /// Get superposition phase for a daemon.
1726    pub fn superposition_phase(
1727        &self,
1728        daemon_origin: u64,
1729    ) -> Option<crate::adapter::net::continuity::superposition::SuperpositionPhase> {
1730        self.migrations
1731            .get(&daemon_origin)
1732            .map(|entry| entry.lock().superposition.phase())
1733    }
1734
1735    /// List all in-flight migrations: (daemon_origin, phase, elapsed_ms).
1736    pub fn list_migrations(&self) -> Vec<MigrationListItem> {
1737        self.migrations
1738            .iter()
1739            .map(|entry| {
1740                let record = entry.lock();
1741                // Saturating cast through the same helper migration.rs
1742                // already publishes. Pre-fix the raw `as u64` wrapped
1743                // instead of saturating on a (theoretical) u128 over
1744                // u64::MAX milliseconds — inconsistent with the
1745                // canonical state.elapsed_ms() used in migration.rs:283.
1746                let elapsed =
1747                    u64::try_from(record.started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
1748                MigrationListItem {
1749                    daemon_origin: *entry.key(),
1750                    source_node: record.state.source_node(),
1751                    target_node: record.state.target_node(),
1752                    phase: record.state.phase(),
1753                    elapsed_ms: elapsed,
1754                    age_in_phase_ms: record.state.age_in_phase_ms(),
1755                    snapshot_bytes: record.state.snapshot_size_bytes(),
1756                    retries: record.state.retry_count(),
1757                }
1758            })
1759            .collect()
1760    }
1761
1762    /// Number of active migrations.
1763    pub fn active_count(&self) -> usize {
1764        self.migrations.len()
1765    }
1766}
1767
1768impl std::fmt::Debug for MigrationOrchestrator {
1769    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1770        f.debug_struct("MigrationOrchestrator")
1771            .field("active_migrations", &self.migrations.len())
1772            .field("local_node_id", &format!("{:#x}", self.local_node_id))
1773            .finish()
1774    }
1775}
1776
1777#[cfg(test)]
1778mod tests {
1779    use super::*;
1780    use crate::adapter::net::behavior::capability::CapabilityFilter;
1781    use crate::adapter::net::compute::{
1782        DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
1783    };
1784    use crate::adapter::net::identity::EntityKeypair;
1785    use bytes::{BufMut, Bytes};
1786
/// Assemble the smallest buffer shaped like a chunk-0 envelope:
/// `[magic(4)][version(1)][entity_id(32)]`, then zero padding out to 64
/// bytes. The validator only reads the first 37 bytes; the padding just
/// rounds the buffer to a tidy size.
fn synth_chunk_header(magic: &[u8; 4], version: u8, entity_bytes: &[u8; 32]) -> Vec<u8> {
    const PADDED_LEN: usize = 64;
    let mut out: Vec<u8> = magic
        .iter()
        .copied()
        .chain(std::iter::once(version))
        .chain(entity_bytes.iter().copied())
        .collect();
    out.resize(PADDED_LEN, 0);
    out
}
1799
#[test]
fn validate_chunk_header_accepts_well_formed_chunk_0() {
    // Magic, version and entity all line up with the expected origin, so
    // the validator must accept the chunk.
    let keypair = EntityKeypair::generate();
    let entity = keypair.entity_id();
    let chunk = synth_chunk_header(SNAPSHOT_WIRE_MAGIC, SNAPSHOT_VERSION, entity.as_bytes());
    let res = validate_chunk_header(0, &chunk, keypair.origin_hash());
    assert!(res.is_ok(), "well-formed chunk 0 must validate: {res:?}");
}
1811
#[test]
fn validate_chunk_header_rejects_wrong_magic() {
    // Correct version and entity, but garbage magic bytes.
    let keypair = EntityKeypair::generate();
    let chunk = synth_chunk_header(b"XXXX", SNAPSHOT_VERSION, keypair.entity_id().as_bytes());
    let outcome = validate_chunk_header(0, &chunk, keypair.origin_hash());
    let err = outcome.unwrap_err();
    assert!(matches!(err, MigrationError::StateFailed(_)));
}
1819
#[test]
fn validate_chunk_header_rejects_wrong_version() {
    // Correct magic and entity, but an unexpected version byte (0xFE).
    let keypair = EntityKeypair::generate();
    let bogus_version = 0xFE;
    let chunk = synth_chunk_header(
        SNAPSHOT_WIRE_MAGIC,
        bogus_version,
        keypair.entity_id().as_bytes(),
    );
    let outcome = validate_chunk_header(0, &chunk, keypair.origin_hash());
    let err = outcome.unwrap_err();
    assert!(matches!(err, MigrationError::StateFailed(_)));
}
1827
#[test]
fn validate_chunk_header_rejects_wrong_origin_claim() {
    // The chunk is stamped with one entity while the migration expects
    // another entity's origin.
    let claimed = EntityKeypair::generate();
    let expected = EntityKeypair::generate();
    let chunk = synth_chunk_header(
        SNAPSHOT_WIRE_MAGIC,
        SNAPSHOT_VERSION,
        claimed.entity_id().as_bytes(),
    );
    let outcome = validate_chunk_header(0, &chunk, expected.origin_hash());
    let err = outcome.unwrap_err();
    assert!(matches!(err, MigrationError::StateFailed(_)));
}
1841
#[test]
fn validate_chunk_header_rejects_too_short_chunk_0() {
    // 12 bytes is below the 37-byte chunk-0 envelope minimum.
    let truncated = [0u8; 12];
    let outcome = validate_chunk_header(0, &truncated, 0xDEAD_BEEF);
    let err = outcome.unwrap_err();
    assert!(matches!(err, MigrationError::StateFailed(_)));
}
1848
#[test]
fn validate_chunk_header_passes_non_chunk_0_unchecked() {
    // Only chunk 0 carries an envelope. Every other index must be let
    // through, however short or arbitrary its payload.
    let payload = [0u8; 8];
    let indices: [u32; 4] = [1, 2, 47, u32::MAX];
    for &idx in indices.iter() {
        let res = validate_chunk_header(idx, &payload, 0xDEAD_BEEF);
        assert!(res.is_ok(), "chunk_index {idx} must pass through");
    }
}
1858
1859    struct CounterDaemon {
1860        count: u64,
1861    }
1862
1863    impl CounterDaemon {
1864        fn new() -> Self {
1865            Self { count: 0 }
1866        }
1867    }
1868
1869    impl MeshDaemon for CounterDaemon {
1870        fn name(&self) -> &str {
1871            "counter"
1872        }
1873        fn requirements(&self) -> CapabilityFilter {
1874            CapabilityFilter::default()
1875        }
1876        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
1877            self.count += 1;
1878            Ok(vec![Bytes::from(self.count.to_le_bytes().to_vec())])
1879        }
1880        fn snapshot(&self) -> Option<Bytes> {
1881            Some(Bytes::from(self.count.to_le_bytes().to_vec()))
1882        }
1883        fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
1884            if state.len() != 8 {
1885                return Err(DaemonError::RestoreFailed("bad state size".into()));
1886            }
1887            self.count = u64::from_le_bytes(state[..8].try_into().unwrap());
1888            Ok(())
1889        }
1890    }
1891
1892    fn setup_registry() -> (Arc<DaemonRegistry>, u64) {
1893        let reg = Arc::new(DaemonRegistry::new());
1894        let kp = EntityKeypair::generate();
1895        let origin = kp.origin_hash();
1896        let host = DaemonHost::new(
1897            Box::new(CounterDaemon::new()),
1898            kp,
1899            DaemonHostConfig::default(),
1900        );
1901        reg.register(host).unwrap();
1902        (reg, origin)
1903    }
1904
/// CR-32 tripwire. When no `source_handler` is wired, the orchestrator
/// must log a loud `tracing::warn!` that names `source_handler`. Before the
/// fix, that path quietly fell back to `daemon_registry.snapshot`, and
/// events arriving between snapshot capture and cutover were lost.
///
/// History: a `cfg(not(test))` gate that returned `Err` in production was
/// tried and reverted. Integration tests link against the library built
/// WITHOUT `cfg(test)`, so the gate broke them. The current behavior is
/// warn loudly, then fall back.
///
/// Production callers wire the handler through `MigrationDispatcher::new`.
/// The warning therefore only fires when the orchestrator is built
/// directly, e.g. in tests or by SDK consumers that bypass the dispatcher.
///
/// The test reads this file's own text, so it fails if someone deletes the
/// warn or drops its `source_handler` mention.
#[test]
fn cr32_unwired_source_handler_must_emit_loud_warn() {
    let src = include_str!("orchestrator.rs");

    // Assemble the anchor from pieces at runtime. If it were spelled out
    // verbatim here, `find` could land on this test's own text, and the
    // tripwire would keep passing after the production marker vanished.
    let anchor = ["Surface ", "the unwired-", "source-handler"].concat();
    let anchor_idx = src.find(&anchor).expect(
        "regression: the production unwired-source-handler marker in \
         start_migration's None arm is gone — either the fix was reverted or \
         the comment was rewritten. If the fix is intentionally being changed, \
         update this test.",
    );

    // The marker must appear exactly once, i.e. only at the production
    // site. Any second hit would make the location check meaningless.
    let occurrences = src.matches(&anchor).count();
    assert_eq!(
        occurrences, 1,
        "anchor must occur exactly once in orchestrator.rs (production \
         site). Got {occurrences} occurrences — the test source likely contains \
         a verbatim copy of the anchor, defeating the tripwire."
    );

    // Inspect the 20 lines starting at the marker.
    let window: Vec<&str> = src[anchor_idx..].lines().take(20).collect();
    let block = window.join("\n");
    let mentions = |needle: &str| block.contains(needle);

    assert!(
        mentions("tracing::warn!"),
        "regression: unwired-source-handler path must emit \
         tracing::warn!. Block:\n{}",
        block
    );
    assert!(
        mentions("source_handler"),
        "regression: warn message must reference source_handler"
    );
    assert!(
        mentions("MigrationDispatcher::new") || mentions("with_source_handler"),
        "regression: warn message must point operators at how to wire \
         the handler (`MigrationDispatcher::new` or \
         `MigrationOrchestrator::with_source_handler`) so the log line is \
         actionable. Block:\n{}",
        block
    );
}
1981
#[test]
fn test_start_migration_local_source() {
    // This node (0x1111) is the source, so the snapshot is taken locally
    // and the first outgoing message is a SnapshotReady chunk.
    let (registry, origin) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);

    let msgs = orchestrator.start_migration(origin, 0x1111, 0x2222).unwrap();
    assert!(!msgs.is_empty(), "must emit at least one chunk");

    let first = &msgs[0];
    if let MigrationMessage::SnapshotReady { daemon_origin, .. } = first {
        assert_eq!(*daemon_origin, origin);
    } else {
        panic!("expected SnapshotReady, got {:?}", first);
    }

    assert!(orchestrator.is_migrating(origin));
    assert_eq!(orchestrator.status(origin), Some(MigrationPhase::Transfer));
}
1999
/// Regression test for how local-source migrations reach the source
/// handler.
///
/// Before the fix, the local-source path called
/// `daemon_registry.snapshot()` directly and never went through
/// `MigrationSourceHandler::start_snapshot`. The handler therefore had no
/// record of the migration, with three consequences:
/// - `is_migrating(origin)` was false;
/// - callers that gate buffering / write rejection on it skipped those
///   paths;
/// - at cutover, `on_cutover` returned `DaemonNotFound`, and the
///   dispatcher's tolerance fallback swallowed that error together with
///   any buffered events.
///
/// The fix wires the handler in via `with_source_handler` and routes the
/// local-source path through `start_snapshot`. This test pins that both
/// the orchestrator and the handler know about the migration afterwards.
#[test]
fn local_source_migration_registers_in_source_handler() {
    let (registry, origin) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));

    // Before: neither side has a record.
    assert!(!handler.is_migrating(origin));
    assert!(!orchestrator.is_migrating(origin));

    orchestrator.start_migration(origin, 0x1111, 0x2222).unwrap();

    // After: both sides track the migration.
    assert!(
        handler.is_migrating(origin),
        "regression: source_handler must have a record \
         of the local-source migration after `start_migration` \
         returns",
    );
    assert!(orchestrator.is_migrating(origin));
}
2039
/// Regression test: for a local-source migration, the cutover path must
/// find a real `source_handler` record and hand back the buffered events.
///
/// Before the fix, the orchestrator never called `start_snapshot`, so
/// `on_cutover` returned `DaemonNotFound` for such migrations. The
/// dispatcher's tolerance fallback then treated cutover as having nothing
/// to forward, and the buffered events were silently lost.
///
/// This test exercises the end-to-end drain. A refactor that drops the
/// `start_snapshot` wiring in the orchestrator's local branch will fail it.
#[test]
fn local_source_cutover_drains_buffered_events_through_source_handler() {
    let (registry, origin) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));

    orchestrator.start_migration(origin, 0x1111, 0x2222).unwrap();

    // Buffer two events through the source handler; cutover should return both.
    let make_event = |seq: u64| CausalEvent {
        link: CausalLink {
            origin_hash: origin,
            horizon_encoded: 0,
            sequence: seq,
            parent_hash: 0,
        },
        payload: Bytes::from_static(b"buffered"),
        received_at: 0,
    };
    (1..=2u64).for_each(|seq| {
        assert!(handler.buffer_event(origin, make_event(seq)).unwrap());
    });

    let drained = handler
        .on_cutover(origin)
        .expect("post-fix on_cutover must find the local-source migration record");
    assert_eq!(
        drained.len(),
        2,
        "cutover must drain the buffered events for forwarding to target — \
         pre-fix this returned `DaemonNotFound` for local-source migrations \
         and the buffered events were silently lost",
    );
}
2091
/// Regression test: once the source handler is registered,
/// `source_handler.buffer_event` works for a local-source migration.
///
/// Before the fix, `buffer_event` returned `Ok(false)` ("no migration
/// active"), because `start_snapshot` had never run for this migration.
/// This test checks that a post-snapshot event is accepted into the buffer
/// and can be drained back out of the handler.
#[test]
fn local_source_migration_enables_source_handler_buffering() {
    let (registry, origin) = setup_registry();
    let handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&handler));

    orchestrator.start_migration(origin, 0x1111, 0x2222).unwrap();

    // A post-snapshot event routed through the handler must be buffered.
    let link = CausalLink {
        origin_hash: origin,
        horizon_encoded: 0,
        sequence: 1,
        parent_hash: 0,
    };
    let event = CausalEvent {
        link,
        payload: Bytes::from_static(b"post-snapshot event"),
        received_at: 0,
    };
    let accepted = handler.buffer_event(origin, event).unwrap();
    assert!(
        accepted,
        "fix must enable source-handler buffering for \
         local-source migrations — pre-fix `buffer_event` returned \
         `Ok(false)` because the migration was never registered",
    );

    let drained = handler.take_buffered_events(origin).unwrap();
    assert_eq!(
        drained.len(),
        1,
        "buffered event must be drainable through the source handler",
    );
}
2140
2141    #[test]
2142    fn test_start_migration_remote_source() {
2143        let (reg, origin) = setup_registry();
2144        let orch = MigrationOrchestrator::new(reg, 0x3333);
2145
2146        let msgs = orch.start_migration(origin, 0x1111, 0x2222).unwrap();
2147        assert_eq!(
2148            msgs.len(),
2149            1,
2150            "remote-source path emits exactly one TakeSnapshot"
2151        );
2152        match &msgs[0] {
2153            MigrationMessage::TakeSnapshot {
2154                daemon_origin,
2155                target_node,
2156            } => {
2157                assert_eq!(*daemon_origin, origin);
2158                assert_eq!(*target_node, 0x2222);
2159            }
2160            other => panic!("expected TakeSnapshot, got {:?}", other),
2161        }
2162
2163        assert_eq!(orch.status(origin), Some(MigrationPhase::Snapshot));
2164    }
2165
2166    #[test]
2167    fn test_duplicate_migration_rejected() {
2168        let (reg, origin) = setup_registry();
2169        let orch = MigrationOrchestrator::new(reg, 0x1111);
2170
2171        orch.start_migration(origin, 0x1111, 0x2222).unwrap();
2172        let err = orch.start_migration(origin, 0x1111, 0x3333).unwrap_err();
2173        assert_eq!(err, MigrationError::AlreadyMigrating(origin));
2174    }
2175
2176    /// Regression: `start_migration_auto` returns
2177    /// `MigrationError::NoTargetAvailable` (not
2178    /// `TargetUnavailable(0)`) when the scheduler finds no
2179    /// candidate satisfying the daemon's capability filter.
2180    /// Pre-fix the auto path constructed
2181    /// `TargetUnavailable(0)`, surfacing "target node 0x0
2182    /// unavailable" to operators — confusing because no
2183    /// specific node id was ever attempted; the auto-placement
2184    /// found nobody to attempt against.
2185    #[test]
2186    fn start_migration_auto_returns_no_target_available_when_scheduler_finds_nothing() {
2187        use crate::adapter::net::behavior::capability::CapabilitySet;
2188        use crate::adapter::net::behavior::fold::{CapabilityFold, Fold};
2189
2190        let (reg, origin) = setup_registry();
2191        let orch = MigrationOrchestrator::new(reg, 0x1111);
2192
2193        // Empty fold — no candidate nodes anywhere.
2194        let fold: Arc<Fold<CapabilityFold>> =
2195            Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
2196        let scheduler = super::super::Scheduler::new(fold, 0x1111, CapabilitySet::default());
2197
2198        // A filter that nothing in the empty index can satisfy.
2199        let filter = CapabilityFilter::default();
2200
2201        let err = orch
2202            .start_migration_auto(origin, 0x1111, &scheduler, &filter)
2203            .unwrap_err();
2204        assert_eq!(
2205            err,
2206            MigrationError::NoTargetAvailable,
2207            "auto-placement with no candidates must surface as \
2208             NoTargetAvailable, not TargetUnavailable(0). The 0 \
2209             was a fake node id that pre-fix appeared in operator \
2210             error logs as `target node 0x0 unavailable`."
2211        );
2212    }
2213
2214    #[test]
2215    fn test_abort_migration() {
2216        let (reg, origin) = setup_registry();
2217        let orch = MigrationOrchestrator::new(reg, 0x1111);
2218
2219        orch.start_migration(origin, 0x1111, 0x2222).unwrap();
2220        assert!(orch.is_migrating(origin));
2221
2222        let msg = orch.abort_migration(origin, "test abort".into()).unwrap();
2223        match msg {
2224            MigrationMessage::MigrationFailed { reason, .. } => {
2225                // `abort_migration` wraps its string in `StateFailed`.
2226                match reason {
2227                    MigrationFailureReason::StateFailed(msg) => {
2228                        assert_eq!(msg, "test abort")
2229                    }
2230                    other => panic!("expected StateFailed, got {other:?}"),
2231                }
2232            }
2233            _ => panic!("expected MigrationFailed"),
2234        }
2235
2236        assert!(!orch.is_migrating(origin));
2237    }
2238
2239    /// Regression: pre-fix `abort_migration_with_reason` only
2240    /// removed the orchestrator's record. The matching
2241    /// `MigrationSourceHandler` entry stayed put, so
2242    /// `is_migrating()` on the source remained `true`,
2243    /// `buffer_event` kept appending to a never-drained vector,
2244    /// and a retry trip-tested `AlreadyMigrating` against a
2245    /// migration the orchestrator believed was aborted. The fix
2246    /// calls `source.abort` from the orchestrator's abort path.
2247    #[test]
2248    fn abort_migration_propagates_to_source_handler() {
2249        use crate::adapter::net::compute::migration_source::MigrationSourceHandler;
2250        let (reg, origin) = setup_registry();
2251        let source = Arc::new(MigrationSourceHandler::new(reg.clone()));
2252        let orch = MigrationOrchestrator::new(reg, 0x1111).with_source_handler(source.clone());
2253
2254        // Stand up the migration end-to-end via the orchestrator's
2255        // `start_migration`, which records on BOTH sides (the
2256        // orchestrator's record AND the source handler's mirror).
2257        // Pre-fix the orchestrator's `abort_migration_with_reason`
2258        // only cleared its own record, leaving the source mirror
2259        // intact.
2260        orch.start_migration(origin, 0x1111, 0x2222).unwrap();
2261        assert!(
2262            orch.is_migrating(origin),
2263            "orchestrator records the migration"
2264        );
2265        assert!(
2266            source.is_migrating(origin),
2267            "source handler also records the migration via the orchestrator",
2268        );
2269
2270        // Abort. Both sides must clear.
2271        orch.abort_migration(origin, "test abort".into()).unwrap();
2272        assert!(
2273            !orch.is_migrating(origin),
2274            "orchestrator must clear its record"
2275        );
2276        assert!(
2277            !source.is_migrating(origin),
2278            "source handler must also clear its mirror entry on abort",
2279        );
2280
2281        // The decisive sealed property: a fresh
2282        // `start_migration` for the same daemon now succeeds.
2283        // Pre-fix this would `AlreadyMigrating` because the source
2284        // handler still tracked the daemon.
2285        orch.start_migration(origin, 0x1111, 0x3333).unwrap();
2286    }
2287
2288    /// After deleting `MigrationOrchestrator::buffer_event` (the
2289    /// dead surface that no production caller ever used),
2290    /// `on_restore_complete` ships a `MigrationMessage` that
2291    /// always carries no buffered events. The source-side
2292    /// `MigrationSourceHandler::on_cutover` is the only path
2293    /// that drains real buffered events; the orchestrator
2294    /// surface is dead and removed.
2295    #[test]
2296    fn on_restore_complete_ships_no_buffered_events() {
2297        let (reg, origin) = setup_registry();
2298        let orch = MigrationOrchestrator::new(reg, 0x3333);
2299
2300        orch.start_migration(origin, 0x1111, 0x2222).unwrap();
2301        // Drive Snapshot → Transfer → Restore by force-phase so
2302        // we don't need a real source-side handshake.
2303        {
2304            let entry = orch.migrations.get(&origin).unwrap();
2305            let mut record = entry.lock();
2306            record.state.force_phase(MigrationPhase::Restore);
2307        }
2308
2309        let result = orch.on_restore_complete(origin, 0).unwrap();
2310        assert!(
2311            result.is_none(),
2312            "on_restore_complete must never emit BufferedEvents — \
2313             that buffer is exclusively a source-handler surface"
2314        );
2315    }
2316
2317    #[test]
2318    fn test_wire_roundtrip_take_snapshot() {
2319        let msg = MigrationMessage::TakeSnapshot {
2320            daemon_origin: 0xAAAA,
2321            target_node: 0x2222,
2322        };
2323        let encoded = wire::encode(&msg).unwrap();
2324        let decoded = wire::decode(&encoded).unwrap();
2325        match decoded {
2326            MigrationMessage::TakeSnapshot {
2327                daemon_origin,
2328                target_node,
2329            } => {
2330                assert_eq!(daemon_origin, 0xAAAA);
2331                assert_eq!(target_node, 0x2222);
2332            }
2333            _ => panic!("expected TakeSnapshot"),
2334        }
2335    }
2336
2337    #[test]
2338    fn test_wire_roundtrip_snapshot_ready() {
2339        let msg = MigrationMessage::SnapshotReady {
2340            daemon_origin: 0xBBBB,
2341            snapshot_bytes: vec![1, 2, 3, 4, 5],
2342            seq_through: 42,
2343            chunk_index: 0,
2344            total_chunks: 1,
2345        };
2346        let encoded = wire::encode(&msg).unwrap();
2347        let decoded = wire::decode(&encoded).unwrap();
2348        match decoded {
2349            MigrationMessage::SnapshotReady {
2350                daemon_origin,
2351                snapshot_bytes,
2352                seq_through,
2353                chunk_index,
2354                total_chunks,
2355            } => {
2356                assert_eq!(daemon_origin, 0xBBBB);
2357                assert_eq!(snapshot_bytes, vec![1, 2, 3, 4, 5]);
2358                assert_eq!(seq_through, 42);
2359                assert_eq!(chunk_index, 0);
2360                assert_eq!(total_chunks, 1);
2361            }
2362            _ => panic!("expected SnapshotReady"),
2363        }
2364    }
2365
2366    #[test]
2367    fn test_chunk_snapshot_small() {
2368        let chunks = chunk_snapshot(0xAAAA, vec![1, 2, 3], 10).unwrap();
2369        assert_eq!(chunks.len(), 1);
2370        match &chunks[0] {
2371            MigrationMessage::SnapshotReady {
2372                chunk_index,
2373                total_chunks,
2374                snapshot_bytes,
2375                ..
2376            } => {
2377                assert_eq!(*chunk_index, 0);
2378                assert_eq!(*total_chunks, 1);
2379                assert_eq!(snapshot_bytes, &[1, 2, 3]);
2380            }
2381            _ => panic!("expected SnapshotReady"),
2382        }
2383    }
2384
2385    #[test]
2386    fn test_chunk_snapshot_large() {
2387        // Create a snapshot larger than MAX_SNAPSHOT_CHUNK_SIZE
2388        let big = vec![0xABu8; MAX_SNAPSHOT_CHUNK_SIZE * 3 + 100];
2389        let total_len = big.len();
2390        let chunks = chunk_snapshot(0xBBBB, big, 42).unwrap();
2391
2392        assert_eq!(chunks.len(), 4); // 3 full + 1 partial
2393
2394        // Verify chunk metadata
2395        for (i, chunk) in chunks.iter().enumerate() {
2396            match chunk {
2397                MigrationMessage::SnapshotReady {
2398                    chunk_index,
2399                    total_chunks,
2400                    daemon_origin,
2401                    seq_through,
2402                    ..
2403                } => {
2404                    assert_eq!(*chunk_index, i as u32);
2405                    assert_eq!(*total_chunks, 4);
2406                    assert_eq!(*daemon_origin, 0xBBBB);
2407                    assert_eq!(*seq_through, 42);
2408                }
2409                _ => panic!("expected SnapshotReady"),
2410            }
2411        }
2412
2413        // Verify reassembly
2414        let mut reassembler = SnapshotReassembler::new();
2415        for chunk in chunks {
2416            if let MigrationMessage::SnapshotReady {
2417                daemon_origin,
2418                snapshot_bytes,
2419                seq_through,
2420                chunk_index,
2421                total_chunks,
2422            } = chunk
2423            {
2424                let result = reassembler
2425                    .feed(
2426                        daemon_origin,
2427                        snapshot_bytes,
2428                        seq_through,
2429                        chunk_index,
2430                        total_chunks,
2431                    )
2432                    .expect("legitimate chunks must not be rejected");
2433                if chunk_index < total_chunks - 1 {
2434                    assert!(result.is_none());
2435                } else {
2436                    let full = result.expect("last chunk should complete reassembly");
2437                    assert_eq!(full.len(), total_len);
2438                    assert!(full.iter().all(|&b| b == 0xAB));
2439                }
2440            }
2441        }
2442    }
2443
2444    #[test]
2445    fn test_reassembler_cancel() {
2446        let mut reassembler = SnapshotReassembler::new();
2447        reassembler.feed(0xAAAA, vec![1, 2], 10, 0, 3).unwrap();
2448        assert_eq!(reassembler.pending_count(), 1);
2449        reassembler.cancel(0xAAAA);
2450        assert_eq!(reassembler.pending_count(), 0);
2451    }
2452
2453    // ---- Regression tests: SnapshotReassembler DoS / forgery holes ----
2454
2455    #[test]
2456    fn test_regression_reassembler_rejects_chunk_index_out_of_range() {
2457        // Regression: feed() never checked that `chunk_index < total_chunks`,
2458        // so an attacker could declare total_chunks=3 and feed indices
2459        // {0, 5, 7}. The BTreeMap happily stored them, `chunks.len() == 3 ==
2460        // total_chunks` fired "complete", and the reassembler concatenated
2461        // three non-contiguous chunks as if they were chunks 0,1,2 —
2462        // silently forging a snapshot from attacker-chosen partial content.
2463        //
2464        // Fix: feed() rejects any chunk with `chunk_index >= total_chunks`
2465        // before touching state.
2466        let mut reassembler = SnapshotReassembler::new();
2467
2468        let r0 = reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3);
2469        assert!(r0.is_ok(), "in-range chunk must be accepted: {:?}", r0);
2470
2471        let forged = reassembler.feed(0xAAAA, vec![2; 10], 1, 5, 3);
2472        assert!(
2473            matches!(
2474                forged,
2475                Err(ReassemblyError::ChunkIndexOutOfRange {
2476                    chunk_index: 5,
2477                    total_chunks: 3,
2478                })
2479            ),
2480            "chunk_index=5 with total_chunks=3 must be rejected, got {:?}",
2481            forged
2482        );
2483
2484        // The reassembly must not have "completed" from the forged chunk —
2485        // still waiting for real chunks 1 and 2.
2486        assert_eq!(
2487            reassembler.pending_count(),
2488            1,
2489            "state must stay in-flight after rejected chunk"
2490        );
2491    }
2492
2493    #[test]
2494    fn test_regression_reassembler_rejects_zero_total_chunks() {
2495        // Regression: total_chunks == 0 created a ReassemblyState that
2496        // could never complete (len check 0 == 0 never true after the
2497        // first insert), leaking memory. Fix: reject at the entry point.
2498        let mut reassembler = SnapshotReassembler::new();
2499        let result = reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 0);
2500        assert!(matches!(result, Err(ReassemblyError::ZeroTotalChunks)));
2501        assert_eq!(reassembler.pending_count(), 0);
2502    }
2503
2504    #[test]
2505    fn test_regression_reassembler_caps_total_chunks() {
2506        // Regression: an attacker could declare total_chunks = u32::MAX
2507        // and flood the BTreeMap with up to ~4B insertions before any
2508        // completion check would fire. Fix: cap total_chunks at
2509        // MAX_TOTAL_CHUNKS (well above any legitimate snapshot).
2510        let mut reassembler = SnapshotReassembler::new();
2511        let result = reassembler.feed(0xAAAA, vec![1; 10], 1, 0, u32::MAX);
2512        assert!(matches!(
2513            result,
2514            Err(ReassemblyError::TotalChunksTooLarge {
2515                total_chunks: u32::MAX
2516            })
2517        ));
2518        assert_eq!(reassembler.pending_count(), 0);
2519    }
2520
2521    #[test]
2522    fn test_regression_reassembler_rejects_oversized_chunk() {
2523        // Defense in depth: even if the transport framing lets a larger
2524        // payload through, the reassembler refuses a single chunk bigger
2525        // than MAX_SNAPSHOT_CHUNK_SIZE.
2526        let mut reassembler = SnapshotReassembler::new();
2527        let oversized = vec![0u8; MAX_SNAPSHOT_CHUNK_SIZE + 1];
2528        let result = reassembler.feed(0xAAAA, oversized, 1, 0, 3);
2529        assert!(
2530            matches!(result, Err(ReassemblyError::ChunkTooLarge { .. })),
2531            "got {:?}",
2532            result
2533        );
2534    }
2535
2536    #[test]
2537    fn test_regression_reassembler_rejects_total_chunks_mismatch() {
2538        // Regression: an attacker who opened a reassembly with
2539        // total_chunks=3 could send a later chunk declaring total_chunks=100
2540        // and the code would just keep inserting. Fix: the first chunk's
2541        // total_chunks is locked in; later chunks must agree.
2542        let mut reassembler = SnapshotReassembler::new();
2543        reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
2544        let result = reassembler.feed(0xAAAA, vec![2; 10], 1, 1, 100);
2545        assert!(
2546            matches!(
2547                result,
2548                Err(ReassemblyError::TotalChunksMismatch {
2549                    got: 100,
2550                    expected: 3,
2551                })
2552            ),
2553            "got {:?}",
2554            result
2555        );
2556        assert_eq!(reassembler.pending_count(), 1);
2557    }
2558
2559    /// Zero-byte chunks are rejected at the boundary. Pre-fix
2560    /// `MAX_TOTAL_CHUNKS = 700_000` zero-byte chunks could be
2561    /// admitted per reassembly without the byte-budget cap firing —
2562    /// nonsensical for legitimate snapshots and a cheap way to
2563    /// inflate BTreeMap bookkeeping.
2564    #[test]
2565    fn reassembler_refuses_zero_byte_chunk() {
2566        let mut reassembler = SnapshotReassembler::new();
2567        let result = reassembler.feed(0xAAAA, vec![], 1, 0, 3);
2568        assert!(
2569            matches!(result, Err(ReassemblyError::ChunkTooLarge { len: 0 })),
2570            "got {:?}",
2571            result
2572        );
2573        assert_eq!(reassembler.pending_count(), 0);
2574    }
2575
2576    /// The total_chunks==1 fast path must not bypass the
2577    /// total_chunks-mismatch guard. A peer that opened reassembly
2578    /// with chunk 0/3 for `(daemon, seq)` could otherwise follow up
2579    /// with chunk 0/1 for the same key and have the second payload
2580    /// accepted as a complete snapshot — substituting its content
2581    /// for the in-flight multi-chunk one. The fast path now consults
2582    /// `pending` first and surfaces TotalChunksMismatch when the
2583    /// declared total disagrees with the in-flight state.
2584    #[test]
2585    fn fast_path_rejects_single_chunk_after_multi_chunk_state() {
2586        let mut reassembler = SnapshotReassembler::new();
2587        // Open with chunk 0/3.
2588        reassembler.feed(0xAAAA, vec![1; 10], 7, 0, 3).unwrap();
2589        // Attacker follows up declaring total_chunks=1 for same key.
2590        let result = reassembler.feed(0xAAAA, vec![2; 10], 7, 0, 1);
2591        assert!(
2592            matches!(
2593                result,
2594                Err(ReassemblyError::TotalChunksMismatch {
2595                    got: 1,
2596                    expected: 3,
2597                })
2598            ),
2599            "fast path must refuse substitution; got {:?}",
2600            result
2601        );
2602        // The original in-flight reassembly must still exist; the
2603        // attempted substitution must not have evicted it.
2604        assert_eq!(reassembler.pending_count(), 1);
2605    }
2606
2607    #[test]
2608    fn test_regression_reassembler_evicts_older_seq_per_daemon() {
2609        // Regression: `pending` was keyed by (daemon_origin, seq_through)
2610        // and a fresh seq_through did NOT evict older pending reassemblies
2611        // for the same daemon. A peer could open unbounded in-flight
2612        // entries by incrementing seq_through forever.
2613        //
2614        // Fix: at most one in-flight reassembly per daemon. A newer
2615        // seq_through evicts older ones; older seq_through values are
2616        // rejected as stale.
2617        let mut reassembler = SnapshotReassembler::new();
2618
2619        reassembler.feed(0xAAAA, vec![1; 10], 10, 0, 3).unwrap();
2620        reassembler.feed(0xAAAA, vec![1; 10], 11, 0, 3).unwrap();
2621        reassembler.feed(0xAAAA, vec![1; 10], 12, 0, 3).unwrap();
2622
2623        assert_eq!(
2624            reassembler.pending_count(),
2625            1,
2626            "only the newest seq_through for a daemon should remain in flight"
2627        );
2628
2629        // A stale seq_through is rejected — not silently dropped on the floor.
2630        let stale = reassembler.feed(0xAAAA, vec![1; 10], 5, 0, 3);
2631        assert!(
2632            matches!(
2633                stale,
2634                Err(ReassemblyError::StaleSeqThrough { got: 5, latest: 12 })
2635            ),
2636            "stale seq_through must be rejected, got {:?}",
2637            stale
2638        );
2639        assert_eq!(reassembler.pending_count(), 1);
2640    }
2641
2642    /// Regression: a peer-driven reassembly that declares a large
2643    /// `total_chunks` and ships chunks just up to the per-entry
2644    /// byte cap is refused, rather than silently parking memory
2645    /// indefinitely. Pre-fix `MAX_TOTAL_CHUNKS × MAX_SNAPSHOT_CHUNK_SIZE`
2646    /// could buffer ~4.3 GiB per `(origin, seq)` key forever
2647    /// because the eviction at `seq_through > latest` doesn't fire
2648    /// when an attacker re-uses the same `seq_through`.
2649    #[test]
2650    fn reassembler_refuses_chunk_that_overflows_pending_byte_cap() {
2651        let mut reassembler = SnapshotReassembler::new();
2652
2653        // Pre-fill to just under the cap. Each chunk is the max
2654        // legal chunk size; we send unique indices so no chunk is
2655        // displaced.
2656        let chunk_full = vec![0xCCu8; MAX_SNAPSHOT_CHUNK_SIZE];
2657        let chunks_to_fill = MAX_PENDING_REASSEMBLY_BYTES / MAX_SNAPSHOT_CHUNK_SIZE;
2658        // Choose a `total_chunks` that fits the prefill + at least
2659        // two more — so the entry is still incomplete after prefill
2660        // and the next chunk lands in the same key.
2661        let total_chunks = (chunks_to_fill as u32) + 2;
2662        for i in 0..(chunks_to_fill as u32) {
2663            reassembler
2664                .feed(0xAAAA, chunk_full.clone(), 1, i, total_chunks)
2665                .unwrap();
2666        }
2667
2668        // The next chunk would push buffered past the cap. It must
2669        // be refused with `TooManyPendingBytes`, not silently
2670        // accepted.
2671        let next_idx = chunks_to_fill as u32;
2672        let result = reassembler.feed(0xAAAA, chunk_full.clone(), 1, next_idx, total_chunks);
2673        assert!(
2674            matches!(result, Err(ReassemblyError::TooManyPendingBytes { .. })),
2675            "chunk that would overflow the per-entry cap must be refused, got {:?}",
2676            result,
2677        );
2678
2679        // Re-sending an index that is ALREADY buffered must succeed
2680        // (the displaced chunk's bytes are subtracted before the cap
2681        // re-check). Pin this so the cap doesn't break legitimate
2682        // duplicate-chunk delivery.
2683        let resend = reassembler.feed(0xAAAA, chunk_full.clone(), 1, 0, total_chunks);
2684        assert!(
2685            resend.is_ok(),
2686            "re-sending an already-buffered chunk index must succeed, got {:?}",
2687            resend
2688        );
2689    }
2690
2691    /// Regression: an entry parked at the per-entry byte cap could
2692    /// stay in `pending` forever because the `seq_through > latest`
2693    /// eviction never fires while a hostile peer re-uses the same
2694    /// `seq_through`. The age sweep is the second line of defense:
2695    /// any entry whose last-progress is older than `max_age` is
2696    /// dropped on the next `sweep_stale` call.
2697    #[test]
2698    fn reassembler_sweep_stale_drops_quiet_entries() {
2699        let mut reassembler = SnapshotReassembler::new();
2700        reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
2701        assert_eq!(reassembler.pending_count(), 1);
2702
2703        // Wait until the entry's last_progress_at is older than the
2704        // sweep age, then sweep.
2705        std::thread::sleep(Duration::from_millis(20));
2706        let evicted = reassembler.sweep_stale(Duration::from_millis(10));
2707        assert_eq!(evicted, 1, "stale entry must be evicted");
2708        assert_eq!(reassembler.pending_count(), 0);
2709    }
2710
2711    /// Pin the slow-but-progressing legitimate peer case: every
2712    /// chunk that lands resets `last_progress_at`, so the sweep
2713    /// only kills entries that have actually gone quiet — not ones
2714    /// that are simply slow to receive every chunk.
2715    #[test]
2716    fn reassembler_sweep_keeps_progressing_entries() {
2717        let mut reassembler = SnapshotReassembler::new();
2718        reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
2719        std::thread::sleep(Duration::from_millis(20));
2720
2721        // A second chunk lands — last_progress_at refreshes.
2722        reassembler.feed(0xAAAA, vec![1; 10], 1, 1, 3).unwrap();
2723
2724        // Sweep with an age that would have killed the entry from
2725        // its original creation, but is generous relative to the
2726        // fresh chunk's timestamp.
2727        let evicted = reassembler.sweep_stale(Duration::from_millis(15));
2728        assert_eq!(
2729            evicted, 0,
2730            "entry that received a chunk within max_age must survive"
2731        );
2732        assert_eq!(reassembler.pending_count(), 1);
2733    }
2734
2735    /// Pin the cross-daemon healing path: even if a particular
2736    /// daemon's hostile entry never sees another chunk, the
2737    /// opportunistic sweep at the head of every `feed` call drops
2738    /// it on the next traffic from ANY daemon.
2739    #[test]
2740    fn reassembler_opportunistic_sweep_in_feed_drops_quiet_entries() {
2741        // Use a tiny opportunistic-sweep age so the in-`feed`
2742        // sweep fires within test timescales.
2743        let mut reassembler = SnapshotReassembler::with_max_pending_age(Duration::from_millis(10));
2744
2745        // Hostile daemon parks an entry under (origin=0xBAD, seq=1).
2746        reassembler.feed(0xBAD, vec![0xFF; 10], 1, 0, 3).unwrap();
2747        assert_eq!(reassembler.pending_count(), 1);
2748
2749        // Time passes; hostile peer never sends anything else.
2750        std::thread::sleep(Duration::from_millis(25));
2751
2752        // A completely unrelated daemon's chunk arrives. The
2753        // opportunistic sweep at the head of `feed` must drop the
2754        // hostile entry as a side effect — not just the
2755        // explicit-`sweep_stale` driver.
2756        reassembler.feed(0xC0DE, vec![1; 10], 5, 0, 3).unwrap();
2757
2758        // Only the new daemon's entry remains; the hostile
2759        // 0xBAD entry was swept.
2760        assert_eq!(reassembler.pending_count(), 1);
2761    }
2762
2763    /// Sweeping a stale buffer must not amnesia the daemon's
2764    /// `latest_seq`: a peer that comes back later trying to
2765    /// re-open the same `seq_through` is still rejected as stale,
2766    /// so the sweep can't be turned into a snapshot-replacement
2767    /// amplifier.
2768    #[test]
2769    fn reassembler_sweep_stale_preserves_latest_seq() {
2770        let mut reassembler = SnapshotReassembler::new();
2771        reassembler.feed(0xAAAA, vec![1; 10], 100, 0, 3).unwrap();
2772
2773        std::thread::sleep(Duration::from_millis(20));
2774        let evicted = reassembler.sweep_stale(Duration::from_millis(10));
2775        assert_eq!(evicted, 1);
2776
2777        // Old seq_through must still be rejected as stale even
2778        // though the in-flight buffer was dropped.
2779        let stale = reassembler.feed(0xAAAA, vec![1; 10], 50, 0, 3);
2780        assert!(
2781            matches!(
2782                stale,
2783                Err(ReassemblyError::StaleSeqThrough {
2784                    got: 50,
2785                    latest: 100,
2786                })
2787            ),
2788            "post-sweep replay of an older seq_through must still be rejected, got {:?}",
2789            stale,
2790        );
2791    }
2792
2793    /// Pin the at-cap + quiet attack: a peer fills an entry to
2794    /// just under `MAX_PENDING_REASSEMBLY_BYTES` and goes silent.
2795    /// Pre-fix the per-entry byte cap blocked further amplification
2796    /// but the parked bytes stayed forever; the sweep closes that
2797    /// hole.
2798    #[test]
2799    fn reassembler_sweep_releases_buffer_parked_at_byte_cap() {
2800        let mut reassembler = SnapshotReassembler::new();
2801        let chunk_full = vec![0xCCu8; MAX_SNAPSHOT_CHUNK_SIZE];
2802        let chunks_to_fill = MAX_PENDING_REASSEMBLY_BYTES / MAX_SNAPSHOT_CHUNK_SIZE;
2803        let total_chunks = (chunks_to_fill as u32) + 2;
2804        for i in 0..(chunks_to_fill as u32) {
2805            reassembler
2806                .feed(0xAAAA, chunk_full.clone(), 1, i, total_chunks)
2807                .unwrap();
2808        }
2809        assert_eq!(reassembler.pending_count(), 1);
2810
2811        // Peer goes silent. Pre-fix the entry would stay parked
2812        // at ~MAX_PENDING_REASSEMBLY_BYTES indefinitely.
2813        std::thread::sleep(Duration::from_millis(20));
2814        let evicted = reassembler.sweep_stale(Duration::from_millis(10));
2815        assert_eq!(evicted, 1, "parked-at-cap entry must be released by sweep");
2816        assert_eq!(reassembler.pending_count(), 0);
2817    }
2818
2819    #[test]
2820    fn test_regression_reassembler_distinct_daemons_coexist() {
2821        // Eviction is per-daemon, not global — parallel migrations of
2822        // different daemons must be able to share the reassembler.
2823        let mut reassembler = SnapshotReassembler::new();
2824        reassembler.feed(0x1111, vec![1; 10], 1, 0, 3).unwrap();
2825        reassembler.feed(0x2222, vec![2; 10], 7, 0, 3).unwrap();
2826        reassembler.feed(0x3333, vec![3; 10], 9, 0, 3).unwrap();
2827        assert_eq!(reassembler.pending_count(), 3);
2828    }
2829
2830    #[test]
2831    fn test_regression_wire_decode_rejects_zero_total_chunks() {
2832        // Regression: the wire decoder accepted any u32 for total_chunks
2833        // and chunk_index, including nonsense like total_chunks=0. A
2834        // defensive validation at the wire boundary stops malformed
2835        // messages from ever reaching the reassembler.
2836        use bytes::BufMut;
2837        let mut buf = Vec::new();
2838        buf.put_u8(wire::MSG_SNAPSHOT_READY);
2839        buf.put_u64_le(0xAAAA); // daemon_origin
2840        buf.put_u64_le(1); // seq_through
2841        buf.put_u32_le(0); // chunk_index
2842        buf.put_u32_le(0); // total_chunks — invalid
2843        buf.put_u32_le(0); // len
2844        let err = wire::decode(&buf).expect_err("total_chunks=0 must be rejected");
2845        let err_msg = format!("{}", err);
2846        assert!(
2847            err_msg.contains("total_chunks"),
2848            "error must mention total_chunks, got {:?}",
2849            err_msg
2850        );
2851    }
2852
2853    #[test]
2854    fn test_regression_wire_decode_rejects_chunk_index_out_of_range() {
2855        use bytes::BufMut;
2856        let mut buf = Vec::new();
2857        buf.put_u8(wire::MSG_SNAPSHOT_READY);
2858        buf.put_u64_le(0xAAAA);
2859        buf.put_u64_le(1);
2860        buf.put_u32_le(5); // chunk_index
2861        buf.put_u32_le(3); // total_chunks — index out of range
2862        buf.put_u32_le(0);
2863        let err = wire::decode(&buf).expect_err("chunk_index >= total_chunks must be rejected");
2864        let err_msg = format!("{}", err);
2865        assert!(
2866            err_msg.contains("chunk_index"),
2867            "error must mention chunk_index, got {:?}",
2868            err_msg
2869        );
2870    }
2871
2872    #[test]
2873    fn test_regression_wire_decode_rejects_total_chunks_overflow() {
2874        use bytes::BufMut;
2875        let mut buf = Vec::new();
2876        buf.put_u8(wire::MSG_SNAPSHOT_READY);
2877        buf.put_u64_le(0xAAAA);
2878        buf.put_u64_le(1);
2879        buf.put_u32_le(0);
2880        buf.put_u32_le(u32::MAX); // total_chunks — exceeds MAX_TOTAL_CHUNKS
2881        buf.put_u32_le(0);
2882        let err = wire::decode(&buf).expect_err("total_chunks > MAX_TOTAL_CHUNKS must be rejected");
2883        let err_msg = format!("{}", err);
2884        assert!(
2885            err_msg.contains("MAX_TOTAL_CHUNKS"),
2886            "error must mention MAX_TOTAL_CHUNKS, got {:?}",
2887            err_msg
2888        );
2889    }
2890
2891    #[test]
2892    fn test_regression_reassembler_end_to_end_forged_chunk_cannot_complete() {
2893        // Integration: simulate an attacker who learns total_chunks=4 from a
2894        // legitimate first chunk and then tries to race ahead with forged
2895        // content at indices beyond the range. Even if indices {0,5,7} each
2896        // carry attacker-chosen bytes, the reassembler must never "complete"
2897        // a snapshot without receiving every real index in 0..total_chunks.
2898        let mut reassembler = SnapshotReassembler::new();
2899
2900        // Real chunk 0 — opens the reassembly at total_chunks=4.
2901        let r0 = reassembler.feed(0xDEAD, vec![0xA0; 10], 1, 0, 4).unwrap();
2902        assert!(r0.is_none());
2903
2904        // Forged out-of-range chunks — all rejected, none completes.
2905        for bad_idx in [4, 5, 7, 999] {
2906            let r = reassembler.feed(0xDEAD, vec![0xFF; 10], 1, bad_idx, 4);
2907            assert!(
2908                matches!(r, Err(ReassemblyError::ChunkIndexOutOfRange { .. })),
2909                "index {} must be rejected, got {:?}",
2910                bad_idx,
2911                r
2912            );
2913        }
2914
2915        // A snapshot-like "complete" signal can only come from filling
2916        // real indices 1, 2, 3.
2917        assert!(reassembler
2918            .feed(0xDEAD, vec![0xA1; 10], 1, 1, 4)
2919            .unwrap()
2920            .is_none());
2921        assert!(reassembler
2922            .feed(0xDEAD, vec![0xA2; 10], 1, 2, 4)
2923            .unwrap()
2924            .is_none());
2925        let full = reassembler
2926            .feed(0xDEAD, vec![0xA3; 10], 1, 3, 4)
2927            .unwrap()
2928            .expect("all four real chunks received — reassembly must complete");
2929        // Concatenation order is by chunk_index ascending, so the payload
2930        // is exactly the legitimate chunks 0,1,2,3 — not a forgery.
2931        assert_eq!(full.len(), 40);
2932        assert!(full[..10].iter().all(|&b| b == 0xA0));
2933        assert!(full[10..20].iter().all(|&b| b == 0xA1));
2934        assert!(full[20..30].iter().all(|&b| b == 0xA2));
2935        assert!(full[30..].iter().all(|&b| b == 0xA3));
2936    }
2937
2938    #[test]
2939    fn test_wire_roundtrip_chunked_snapshot() {
2940        let msg = MigrationMessage::SnapshotReady {
2941            daemon_origin: 0xCCCC,
2942            snapshot_bytes: vec![42; 100],
2943            seq_through: 99,
2944            chunk_index: 2,
2945            total_chunks: 5,
2946        };
2947        let encoded = wire::encode(&msg).unwrap();
2948        let decoded = wire::decode(&encoded).unwrap();
2949        match decoded {
2950            MigrationMessage::SnapshotReady {
2951                chunk_index,
2952                total_chunks,
2953                ..
2954            } => {
2955                assert_eq!(chunk_index, 2);
2956                assert_eq!(total_chunks, 5);
2957            }
2958            _ => panic!("expected SnapshotReady"),
2959        }
2960    }
2961
2962    #[test]
2963    fn test_wire_roundtrip_failed() {
2964        // Round-trip every variant of MigrationFailureReason to
2965        // pin the wire-layout contract. A future bump that drops
2966        // or adds a variant without updating the match will trip
2967        // the exhaustive match below.
2968        for reason in [
2969            MigrationFailureReason::NotReady,
2970            MigrationFailureReason::FactoryNotFound,
2971            MigrationFailureReason::ComputeNotSupported,
2972            MigrationFailureReason::StateFailed("something broke".into()),
2973            MigrationFailureReason::AlreadyMigrating,
2974            MigrationFailureReason::IdentityTransportFailed("seal failed".into()),
2975            MigrationFailureReason::NotReadyTimeout { attempts: 5 },
2976        ] {
2977            let msg = MigrationMessage::MigrationFailed {
2978                daemon_origin: 0xCCCC,
2979                reason: reason.clone(),
2980            };
2981            let encoded = wire::encode(&msg).unwrap();
2982            let decoded = wire::decode(&encoded).unwrap();
2983            match decoded {
2984                MigrationMessage::MigrationFailed {
2985                    daemon_origin,
2986                    reason: r,
2987                } => {
2988                    assert_eq!(daemon_origin, 0xCCCC);
2989                    assert_eq!(r, reason);
2990                }
2991                _ => panic!("expected MigrationFailed"),
2992            }
2993        }
2994    }
2995
2996    #[test]
2997    fn test_wire_roundtrip_buffered_events() {
2998        let events = vec![
2999            CausalEvent {
3000                link: CausalLink::genesis(0xAAAA, 0),
3001                payload: Bytes::from_static(b"event1"),
3002                received_at: 100,
3003            },
3004            CausalEvent {
3005                link: CausalLink {
3006                    origin_hash: 0xAAAA,
3007                    horizon_encoded: 1,
3008                    sequence: 1,
3009                    parent_hash: 12345,
3010                },
3011                payload: Bytes::from_static(b"event2"),
3012                received_at: 200,
3013            },
3014        ];
3015        let msg = MigrationMessage::BufferedEvents {
3016            daemon_origin: 0xAAAA,
3017            events,
3018        };
3019        let encoded = wire::encode(&msg).unwrap();
3020        let decoded = wire::decode(&encoded).unwrap();
3021        match decoded {
3022            MigrationMessage::BufferedEvents {
3023                daemon_origin,
3024                events,
3025            } => {
3026                assert_eq!(daemon_origin, 0xAAAA);
3027                assert_eq!(events.len(), 2);
3028                assert_eq!(events[0].payload, Bytes::from_static(b"event1"));
3029                assert_eq!(events[0].received_at, 100);
3030                assert_eq!(events[1].link.sequence, 1);
3031                assert_eq!(events[1].link.parent_hash, 12345);
3032                assert_eq!(events[1].payload, Bytes::from_static(b"event2"));
3033                assert_eq!(events[1].received_at, 200);
3034            }
3035            _ => panic!("expected BufferedEvents"),
3036        }
3037    }
3038
3039    #[test]
3040    fn test_wire_encode_rejects_oversized_failure_reason() {
3041        // Regression: `reason.len() as u16` previously truncated silently when
3042        // the reason exceeded u16::MAX, producing a stream the decoder
3043        // misparses. Encoding must now return an error.
3044        let oversized = "x".repeat(u16::MAX as usize + 1);
3045        let msg = MigrationMessage::MigrationFailed {
3046            daemon_origin: 0xDEAD,
3047            reason: MigrationFailureReason::StateFailed(oversized),
3048        };
3049        let result = wire::encode(&msg);
3050        assert!(
3051            matches!(result, Err(MigrationError::StateFailed(_))),
3052            "encode of oversized reason must error, got {:?}",
3053            result
3054        );
3055    }
3056
3057    #[test]
3058    fn test_wire_rejects_unknown_failure_code() {
3059        // Manually-crafted `MSG_FAILED` with code 0xFFFF (unknown
3060        // variant). Decoder must refuse rather than mis-parse.
3061        let mut buf = Vec::new();
3062        buf.put_u8(wire::MSG_FAILED);
3063        buf.put_u64_le(0xBEEF);
3064        buf.put_u16_le(0xFFFF); // unknown code
3065        let err = wire::decode(&buf).expect_err("unknown code must reject");
3066        match err {
3067            MigrationError::StateFailed(msg) => {
3068                assert!(msg.contains("unknown MigrationFailureReason code"));
3069            }
3070            other => panic!("expected StateFailed, got {other:?}"),
3071        }
3072    }
3073
3074    #[test]
3075    fn test_list_migrations() {
3076        let (reg, origin) = setup_registry();
3077        let orch = MigrationOrchestrator::new(reg, 0x1111);
3078
3079        assert!(orch.list_migrations().is_empty());
3080
3081        orch.start_migration(origin, 0x1111, 0x2222).unwrap();
3082
3083        let list = orch.list_migrations();
3084        assert_eq!(list.len(), 1);
3085        assert_eq!(list[0].daemon_origin, origin);
3086        assert_eq!(list[0].source_node, 0x1111);
3087        assert_eq!(list[0].target_node, 0x2222);
3088        assert_eq!(list[0].retries, 0);
3089    }
3090}