net/adapter/net/compute/orchestrator.rs
1//! Migration orchestrator — coordinates all 6 phases of daemon migration.
2//!
3//! The orchestrator runs on the controller node (which may be the source,
4//! target, or a third-party coordinator). It sequences phase transitions,
5//! routes migration messages, and handles failures/timeouts.
6
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use bytes::Buf;
11use dashmap::DashMap;
12use parking_lot::Mutex;
13
14use super::migration::{MigrationError, MigrationFailureReason, MigrationPhase, MigrationState};
15use super::migration_source::MigrationSourceHandler;
16use super::registry::DaemonRegistry;
17use crate::adapter::net::continuity::superposition::SuperpositionState;
18use crate::adapter::net::identity::EntityId;
19use crate::adapter::net::state::causal::{CausalEvent, CausalLink};
20use crate::adapter::net::state::snapshot::{StateSnapshot, SNAPSHOT_VERSION};
21
22/// Snapshot wire-format magic bytes — duplicated locally from
23/// `crate::adapter::net::state::snapshot` (the constant is
24/// module-private there). Mirror of `V1_MAGIC = *b"CDS1"`.
25const SNAPSHOT_WIRE_MAGIC: &[u8; 4] = b"CDS1";
26
27/// Validate a chunk's wire envelope without forcing the
28/// orchestrator to reassemble the full snapshot.
29///
30/// On `chunk_index == 0` (the chunk that carries the snapshot
31/// header — single-chunk OR first piece of a multi-chunk send),
32/// confirm:
33/// - The first 4 bytes match the snapshot wire magic.
34/// - Byte 4 is the expected SNAPSHOT_VERSION.
35/// - Bytes 5..37 carry an `entity_id` whose `origin_hash()` matches
36/// the migration's recorded `daemon_origin`.
37///
38/// On `chunk_index > 0`, the chunk is a raw payload fragment with
39/// no envelope — return Ok and let the target's reassembler verify
40/// the assembled snapshot.
41///
42/// Strictly weaker than `StateSnapshot::from_bytes` for the
43/// single-chunk path (from_bytes parses everything past byte 37
44/// too), but matters for multi-chunk where from_bytes can't run
45/// at the orchestrator. Closes the asymmetry where single-chunk
46/// corruption caught at the orchestrator while multi-chunk
47/// corruption deferred entirely to the target.
48#[expect(
49 clippy::expect_used,
50 reason = "snapshot_bytes.len() >= 37 checked above; slicing [5..37] yields exactly 32 bytes"
51)]
52fn validate_chunk_header(
53 chunk_index: u32,
54 snapshot_bytes: &[u8],
55 expected_daemon_origin: u64,
56) -> Result<(), MigrationError> {
57 if chunk_index != 0 {
58 return Ok(());
59 }
60 // Magic (4) + version (1) + entity_id (32) = 37 bytes minimum
61 // for the validator to do anything useful.
62 if snapshot_bytes.len() < 37 {
63 return Err(MigrationError::StateFailed(format!(
64 "SnapshotReady chunk 0 is {} bytes — too short for snapshot envelope (need >= 37)",
65 snapshot_bytes.len()
66 )));
67 }
68 if &snapshot_bytes[..4] != SNAPSHOT_WIRE_MAGIC {
69 return Err(MigrationError::StateFailed(format!(
70 "SnapshotReady chunk 0 has wrong magic bytes: {:?}",
71 &snapshot_bytes[..4]
72 )));
73 }
74 let version = snapshot_bytes[4];
75 if version != SNAPSHOT_VERSION {
76 return Err(MigrationError::StateFailed(format!(
77 "SnapshotReady chunk 0 carries snapshot version {} (expected {})",
78 version, SNAPSHOT_VERSION
79 )));
80 }
81 let entity_bytes: [u8; 32] = snapshot_bytes[5..37].try_into().expect("range is 32 bytes");
82 let claimed_origin = EntityId::from_bytes(entity_bytes).origin_hash();
83 if claimed_origin != expected_daemon_origin {
84 return Err(MigrationError::StateFailed(format!(
85 "SnapshotReady chunk 0 entity_id origin {:#x} does not match daemon_origin {:#x}",
86 claimed_origin, expected_daemon_origin
87 )));
88 }
89 Ok(())
90}
91
92// ── Migration message protocol ──────────────────────────────────────────────
93
94/// Wire message types for the migration subprotocol (0x0500).
95#[derive(Debug, Clone)]
96pub enum MigrationMessage {
97 /// Phase 0→1: Request snapshot on source.
98 TakeSnapshot {
99 /// Origin hash of daemon to migrate.
100 daemon_origin: u64,
101 /// Destination node ID.
102 target_node: u64,
103 },
104
105 /// Phase 1→2: Snapshot taken, payload included.
106 ///
107 /// Large snapshots are chunked across multiple `SnapshotReady` messages.
108 /// The receiver must reassemble all chunks (0..total_chunks) before
109 /// deserializing the snapshot. Single-chunk snapshots have
110 /// `chunk_index = 0, total_chunks = 1`.
111 SnapshotReady {
112 /// Origin hash of daemon being migrated.
113 daemon_origin: u64,
114 /// Serialized `StateSnapshot` bytes (or chunk thereof).
115 snapshot_bytes: Vec<u8>,
116 /// Sequence number the snapshot covers through.
117 seq_through: u64,
118 /// Zero-based index of this chunk.
119 chunk_index: u32,
120 /// Total number of chunks for this snapshot.
121 total_chunks: u32,
122 },
123
124 /// Phase 2→3: Target restored daemon from snapshot.
125 RestoreComplete {
126 /// Origin hash of daemon being migrated.
127 daemon_origin: u64,
128 /// Sequence number restored through.
129 restored_seq: u64,
130 },
131
132 /// Phase 3→4: Target finished replaying buffered events.
133 ReplayComplete {
134 /// Origin hash of daemon being migrated.
135 daemon_origin: u64,
136 /// Sequence number replayed through.
137 replayed_seq: u64,
138 /// Target's chain head after replay. The orchestrator stamps
139 /// this into its `SuperpositionState` so continuity proofs
140 /// carry the real cryptographic anchor even when the
141 /// orchestrator lives on a third node and has no local
142 /// daemon registry entry to read from.
143 target_head: CausalLink,
144 },
145
146 /// Phase 4: Source stops accepting writes, routing switches.
147 CutoverNotify {
148 /// Origin hash of daemon being migrated.
149 daemon_origin: u64,
150 /// Target node that is now authoritative.
151 target_node: u64,
152 },
153
154 /// Phase 5: Source cleaned up.
155 CleanupComplete {
156 /// Origin hash of daemon whose migration is complete.
157 daemon_origin: u64,
158 },
159
160 /// Any phase: Abort migration.
161 MigrationFailed {
162 /// Origin hash of daemon whose migration failed.
163 daemon_origin: u64,
164 /// Structured reason code — source dispatches on this to
165 /// decide whether the migration is retriable. See
166 /// [`MigrationFailureReason`].
167 reason: MigrationFailureReason,
168 },
169
170 /// Buffered events from source for replay on target.
171 BufferedEvents {
172 /// Origin hash of daemon being migrated.
173 daemon_origin: u64,
174 /// Events to replay, in causal order.
175 events: Vec<CausalEvent>,
176 },
177
178 /// Phase 5→6: Source has cleaned up; target should now go live.
179 ///
180 /// Emitted by the orchestrator once it observes `CleanupComplete`. The
181 /// target calls `MigrationTargetHandler::activate` in response and
182 /// replies with `ActivateAck`.
183 ActivateTarget {
184 /// Origin hash of daemon whose target should activate.
185 daemon_origin: u64,
186 },
187
188 /// Phase 6: Target has activated and is now authoritative.
189 ActivateAck {
190 /// Origin hash of daemon whose migration is complete.
191 daemon_origin: u64,
192 /// Sequence number the target is authoritative through.
193 replayed_seq: u64,
194 },
195}
196
197// ── Wire format ─────────────────────────────────────────────────────────────
198
199/// Wire format encode/decode for migration messages.
200pub mod wire {
201 use super::*;
202 use bytes::{Buf, BufMut};
203
204 /// Wire type: request snapshot on source.
205 pub const MSG_TAKE_SNAPSHOT: u8 = 0;
206 /// Wire type: snapshot taken, payload included.
207 pub const MSG_SNAPSHOT_READY: u8 = 1;
208 /// Wire type: target restored daemon from snapshot.
209 pub const MSG_RESTORE_COMPLETE: u8 = 2;
210 /// Wire type: target finished replaying buffered events.
211 pub const MSG_REPLAY_COMPLETE: u8 = 3;
212 /// Wire type: source stops writes, routing switches.
213 pub const MSG_CUTOVER_NOTIFY: u8 = 4;
214 /// Wire type: source cleaned up.
215 pub const MSG_CLEANUP_COMPLETE: u8 = 5;
216 /// Wire type: migration failed/aborted.
217 pub const MSG_FAILED: u8 = 6;
218 /// Wire type: buffered events for replay.
219 pub const MSG_BUFFERED_EVENTS: u8 = 7;
220 /// Wire type: orchestrator tells target to activate.
221 pub const MSG_ACTIVATE_TARGET: u8 = 8;
222 /// Wire type: target acknowledges activation.
223 pub const MSG_ACTIVATE_ACK: u8 = 9;
224
225 /// Encode a migration message to bytes.
226 ///
227 /// Returns `MigrationError::StateFailed` when a length-prefixed field
228 /// would not fit in its on-wire width. Length prefixes are `u32` for
229 /// payloads and counts and `u16` for the failure reason string; silently
230 /// truncating to fit would corrupt the stream and confuse the decoder.
231 pub fn encode(msg: &MigrationMessage) -> Result<Vec<u8>, MigrationError> {
232 // Helper: convert a usize length to u32 with an error on overflow.
233 fn len_u32(field: &str, n: usize) -> Result<u32, MigrationError> {
234 u32::try_from(n).map_err(|_| {
235 MigrationError::StateFailed(format!("{} length {} exceeds u32::MAX", field, n))
236 })
237 }
238
239 let mut buf = Vec::with_capacity(128);
240
241 match msg {
242 MigrationMessage::TakeSnapshot {
243 daemon_origin,
244 target_node,
245 } => {
246 buf.put_u8(MSG_TAKE_SNAPSHOT);
247 buf.put_u64_le(*daemon_origin);
248 buf.put_u64_le(*target_node);
249 }
250 MigrationMessage::SnapshotReady {
251 daemon_origin,
252 snapshot_bytes,
253 seq_through,
254 chunk_index,
255 total_chunks,
256 } => {
257 let payload_len = len_u32("snapshot_bytes", snapshot_bytes.len())?;
258 buf.put_u8(MSG_SNAPSHOT_READY);
259 buf.put_u64_le(*daemon_origin);
260 buf.put_u64_le(*seq_through);
261 buf.put_u32_le(*chunk_index);
262 buf.put_u32_le(*total_chunks);
263 buf.put_u32_le(payload_len);
264 buf.extend_from_slice(snapshot_bytes);
265 }
266 MigrationMessage::RestoreComplete {
267 daemon_origin,
268 restored_seq,
269 } => {
270 buf.put_u8(MSG_RESTORE_COMPLETE);
271 buf.put_u64_le(*daemon_origin);
272 buf.put_u64_le(*restored_seq);
273 }
274 MigrationMessage::ReplayComplete {
275 daemon_origin,
276 replayed_seq,
277 target_head,
278 } => {
279 buf.put_u8(MSG_REPLAY_COMPLETE);
280 buf.put_u64_le(*daemon_origin);
281 buf.put_u64_le(*replayed_seq);
282 buf.extend_from_slice(&target_head.to_bytes());
283 }
284 MigrationMessage::CutoverNotify {
285 daemon_origin,
286 target_node,
287 } => {
288 buf.put_u8(MSG_CUTOVER_NOTIFY);
289 buf.put_u64_le(*daemon_origin);
290 buf.put_u64_le(*target_node);
291 }
292 MigrationMessage::CleanupComplete { daemon_origin } => {
293 buf.put_u8(MSG_CLEANUP_COMPLETE);
294 buf.put_u64_le(*daemon_origin);
295 }
296 MigrationMessage::MigrationFailed {
297 daemon_origin,
298 reason,
299 } => {
300 buf.put_u8(MSG_FAILED);
301 buf.put_u64_le(*daemon_origin);
302 // Wire layout:
303 // code: u16 le
304 // then variant-specific payload (0 bytes for
305 // zero-payload variants; `u16 le + bytes` for
306 // string-bearing variants; `u8` for NotReadyTimeout).
307 buf.put_u16_le(reason.code());
308 match reason {
309 MigrationFailureReason::NotReady
310 | MigrationFailureReason::FactoryNotFound
311 | MigrationFailureReason::ComputeNotSupported
312 | MigrationFailureReason::AlreadyMigrating => {}
313 MigrationFailureReason::StateFailed(msg)
314 | MigrationFailureReason::IdentityTransportFailed(msg) => {
315 let len = u16::try_from(msg.len()).map_err(|_| {
316 MigrationError::StateFailed(format!(
317 "failure reason message length {} exceeds u16::MAX",
318 msg.len()
319 ))
320 })?;
321 buf.put_u16_le(len);
322 buf.extend_from_slice(msg.as_bytes());
323 }
324 MigrationFailureReason::NotReadyTimeout { attempts } => {
325 buf.put_u8(*attempts);
326 }
327 }
328 }
329 MigrationMessage::BufferedEvents {
330 daemon_origin,
331 events,
332 } => {
333 let event_count = len_u32("events", events.len())?;
334 buf.put_u8(MSG_BUFFERED_EVENTS);
335 buf.put_u64_le(*daemon_origin);
336 buf.put_u32_le(event_count);
337 for event in events {
338 let payload_len = len_u32("event payload", event.payload.len())?;
339 let link_bytes = event.link.to_bytes();
340 buf.extend_from_slice(&link_bytes);
341 buf.put_u32_le(payload_len);
342 buf.extend_from_slice(&event.payload);
343 buf.put_u64_le(event.received_at);
344 }
345 }
346 MigrationMessage::ActivateTarget { daemon_origin } => {
347 buf.put_u8(MSG_ACTIVATE_TARGET);
348 buf.put_u64_le(*daemon_origin);
349 }
350 MigrationMessage::ActivateAck {
351 daemon_origin,
352 replayed_seq,
353 } => {
354 buf.put_u8(MSG_ACTIVATE_ACK);
355 buf.put_u64_le(*daemon_origin);
356 buf.put_u64_le(*replayed_seq);
357 }
358 }
359
360 Ok(buf)
361 }
362
363 /// Decode a migration message from bytes.
364 pub fn decode(data: &[u8]) -> Result<MigrationMessage, MigrationError> {
365 if data.is_empty() {
366 return Err(MigrationError::StateFailed("empty message".into()));
367 }
368
369 let mut cur = std::io::Cursor::new(data);
370
371 let msg_type = cur.get_u8();
372
373 match msg_type {
374 MSG_TAKE_SNAPSHOT => {
375 if cur.remaining() < 16 {
376 return Err(MigrationError::StateFailed("truncated TakeSnapshot".into()));
377 }
378 Ok(MigrationMessage::TakeSnapshot {
379 daemon_origin: cur.get_u64_le(),
380 target_node: cur.get_u64_le(),
381 })
382 }
383 MSG_SNAPSHOT_READY => {
384 // daemon_origin(8) + seq_through(8) + chunk_index(4) + total_chunks(4) + len(4) = 28
385 if cur.remaining() < 28 {
386 return Err(MigrationError::StateFailed(
387 "truncated SnapshotReady".into(),
388 ));
389 }
390 let daemon_origin = cur.get_u64_le();
391 let seq_through = cur.get_u64_le();
392 let chunk_index = cur.get_u32_le();
393 let total_chunks = cur.get_u32_le();
394 let len = cur.get_u32_le() as usize;
395 // Reject structurally invalid chunks at the wire boundary so
396 // malformed messages never even reach the reassembler. The
397 // reassembler enforces the same invariants defensively.
398 if total_chunks == 0 {
399 return Err(MigrationError::StateFailed(
400 "SnapshotReady: total_chunks must be >= 1".into(),
401 ));
402 }
403 if total_chunks > MAX_TOTAL_CHUNKS {
404 return Err(MigrationError::StateFailed(format!(
405 "SnapshotReady: total_chunks {} exceeds MAX_TOTAL_CHUNKS ({})",
406 total_chunks, MAX_TOTAL_CHUNKS
407 )));
408 }
409 if chunk_index >= total_chunks {
410 return Err(MigrationError::StateFailed(format!(
411 "SnapshotReady: chunk_index {} out of range for total_chunks {}",
412 chunk_index, total_chunks
413 )));
414 }
415 if len > MAX_SNAPSHOT_CHUNK_SIZE {
416 return Err(MigrationError::StateFailed(format!(
417 "SnapshotReady: chunk len {} exceeds MAX_SNAPSHOT_CHUNK_SIZE ({})",
418 len, MAX_SNAPSHOT_CHUNK_SIZE
419 )));
420 }
421 if cur.remaining() < len {
422 return Err(MigrationError::StateFailed(
423 "truncated snapshot payload".into(),
424 ));
425 }
426 let mut snapshot_bytes = vec![0u8; len];
427 cur.copy_to_slice(&mut snapshot_bytes);
428 Ok(MigrationMessage::SnapshotReady {
429 daemon_origin,
430 snapshot_bytes,
431 seq_through,
432 chunk_index,
433 total_chunks,
434 })
435 }
436 MSG_RESTORE_COMPLETE => {
437 if cur.remaining() < 16 {
438 return Err(MigrationError::StateFailed(
439 "truncated RestoreComplete".into(),
440 ));
441 }
442 Ok(MigrationMessage::RestoreComplete {
443 daemon_origin: cur.get_u64_le(),
444 restored_seq: cur.get_u64_le(),
445 })
446 }
447 MSG_REPLAY_COMPLETE => {
448 // 8 (origin) + 8 (seq) + 32 (target_head) = 48
449 if cur.remaining() < 48 {
450 return Err(MigrationError::StateFailed(
451 "truncated ReplayComplete".into(),
452 ));
453 }
454 let daemon_origin = cur.get_u64_le();
455 let replayed_seq = cur.get_u64_le();
456 let mut head_bytes = [0u8; 32];
457 cur.copy_to_slice(&mut head_bytes);
458 let target_head = CausalLink::from_bytes(&head_bytes).ok_or_else(|| {
459 MigrationError::StateFailed(
460 "ReplayComplete: malformed target_head bytes".into(),
461 )
462 })?;
463 Ok(MigrationMessage::ReplayComplete {
464 daemon_origin,
465 replayed_seq,
466 target_head,
467 })
468 }
469 MSG_CUTOVER_NOTIFY => {
470 if cur.remaining() < 16 {
471 return Err(MigrationError::StateFailed(
472 "truncated CutoverNotify".into(),
473 ));
474 }
475 Ok(MigrationMessage::CutoverNotify {
476 daemon_origin: cur.get_u64_le(),
477 target_node: cur.get_u64_le(),
478 })
479 }
480 MSG_CLEANUP_COMPLETE => {
481 if cur.remaining() < 8 {
482 return Err(MigrationError::StateFailed(
483 "truncated CleanupComplete".into(),
484 ));
485 }
486 Ok(MigrationMessage::CleanupComplete {
487 daemon_origin: cur.get_u64_le(),
488 })
489 }
490 MSG_FAILED => {
491 if cur.remaining() < 8 + 2 {
492 return Err(MigrationError::StateFailed(
493 "truncated MigrationFailed header".into(),
494 ));
495 }
496 let daemon_origin = cur.get_u64_le();
497 let code = cur.get_u16_le();
498 let reason = decode_failure_reason(&mut cur, code)?;
499 Ok(MigrationMessage::MigrationFailed {
500 daemon_origin,
501 reason,
502 })
503 }
504 MSG_BUFFERED_EVENTS => {
505 if cur.remaining() < 12 {
506 return Err(MigrationError::StateFailed(
507 "truncated BufferedEvents".into(),
508 ));
509 }
510 let daemon_origin = cur.get_u64_le();
511 let count = cur.get_u32_le() as usize;
512 // Bound `count` against the remaining wire bytes before
513 // allocating. Each event on the wire is at least
514 // CAUSAL_LINK_SIZE(24) + u32 payload_len(4) + u64
515 // received_at(8) = 36 bytes (empty payload). Without this
516 // check, a malformed packet could claim `count = u32::MAX`
517 // and force the decoder to allocate ~4G Vec slots before
518 // the per-event bound checks fire — a cheap DoS against
519 // the migration subprotocol.
520 use crate::adapter::net::state::causal::CAUSAL_LINK_SIZE;
521 const MIN_EVENT_WIRE_SIZE: usize = CAUSAL_LINK_SIZE + 4 + 8;
522 // Hard cap as defense-in-depth. Well above any realistic
523 // buffered-event batch (the orchestrator sends one
524 // per-daemon batch at restore-complete; millions of
525 // events per daemon per migration is already pathological).
526 const MAX_BUFFERED_EVENTS: usize = 1_000_000;
527 let max_possible = cur.remaining() / MIN_EVENT_WIRE_SIZE;
528 if count > max_possible || count > MAX_BUFFERED_EVENTS {
529 return Err(MigrationError::StateFailed(format!(
530 "BufferedEvents: count {} exceeds bound (remaining={}, \
531 min_event_size={}, max_possible={}, hard_cap={})",
532 count,
533 cur.remaining(),
534 MIN_EVENT_WIRE_SIZE,
535 max_possible,
536 MAX_BUFFERED_EVENTS,
537 )));
538 }
539 let mut events = Vec::with_capacity(count);
540 for _ in 0..count {
541 if cur.remaining() < CAUSAL_LINK_SIZE + 4 {
542 return Err(MigrationError::StateFailed(
543 "truncated buffered event".into(),
544 ));
545 }
546 let mut link_bytes = [0u8; CAUSAL_LINK_SIZE];
547 cur.copy_to_slice(&mut link_bytes);
548 let link = CausalLink::from_bytes(&link_bytes)
549 .ok_or_else(|| MigrationError::StateFailed("invalid causal link".into()))?;
550 let payload_len = cur.get_u32_le() as usize;
551 // Per-event payload cap. Defence-in-depth against
552 // a peer that ships a buffered-events message
553 // declaring a max-u32 (4 GiB) payload — without
554 // the cap, `Vec::with_capacity(payload_len)` 30
555 // lines below would attempt the allocation. Cap
556 // at MAX_SNAPSHOT_CHUNK_SIZE, the same byte limit
557 // every other per-event wire surface uses; a real
558 // BufferedEvents stream never carries payloads
559 // larger than the snapshot chunk size.
560 if payload_len > MAX_SNAPSHOT_CHUNK_SIZE {
561 return Err(MigrationError::StateFailed(format!(
562 "buffered event payload {} exceeds per-event cap {}",
563 payload_len, MAX_SNAPSHOT_CHUNK_SIZE
564 )));
565 }
566 // Saturate-add so `payload_len + 8` can't wrap on
567 // 32-bit targets and cause the `<` check below to
568 // pass against an attacker-shaped length. The
569 // crate's primary deployment is 64-bit, but the
570 // type is `usize` and a 32-bit cdylib build would
571 // expose the wrap.
572 let need = payload_len.saturating_add(8);
573 if cur.remaining() < need {
574 return Err(MigrationError::StateFailed(
575 "truncated event payload".into(),
576 ));
577 }
578 let mut payload = vec![0u8; payload_len];
579 cur.copy_to_slice(&mut payload);
580 let received_at = cur.get_u64_le();
581 events.push(CausalEvent {
582 link,
583 payload: bytes::Bytes::from(payload),
584 received_at,
585 });
586 }
587 Ok(MigrationMessage::BufferedEvents {
588 daemon_origin,
589 events,
590 })
591 }
592 MSG_ACTIVATE_TARGET => {
593 if cur.remaining() < 8 {
594 return Err(MigrationError::StateFailed(
595 "truncated ActivateTarget".into(),
596 ));
597 }
598 Ok(MigrationMessage::ActivateTarget {
599 daemon_origin: cur.get_u64_le(),
600 })
601 }
602 MSG_ACTIVATE_ACK => {
603 if cur.remaining() < 16 {
604 return Err(MigrationError::StateFailed("truncated ActivateAck".into()));
605 }
606 Ok(MigrationMessage::ActivateAck {
607 daemon_origin: cur.get_u64_le(),
608 replayed_seq: cur.get_u64_le(),
609 })
610 }
611 _ => Err(MigrationError::StateFailed(format!(
612 "unknown message type: {}",
613 msg_type
614 ))),
615 }
616 }
617}
618
619/// Decode a `MigrationFailureReason` from the `MSG_FAILED` variant
620/// payload. The 16-bit tag already consumed by the caller selects
621/// the variant; unknown tags are rejected so forward-compat is
622/// explicit rather than silent-ignore.
623fn decode_failure_reason(
624 cur: &mut std::io::Cursor<&[u8]>,
625 code: u16,
626) -> Result<MigrationFailureReason, MigrationError> {
627 match code {
628 0 => Ok(MigrationFailureReason::NotReady),
629 1 => Ok(MigrationFailureReason::FactoryNotFound),
630 2 => Ok(MigrationFailureReason::ComputeNotSupported),
631 3 => {
632 let msg = read_u16_string(cur, "StateFailed message")?;
633 Ok(MigrationFailureReason::StateFailed(msg))
634 }
635 4 => Ok(MigrationFailureReason::AlreadyMigrating),
636 5 => {
637 let msg = read_u16_string(cur, "IdentityTransportFailed message")?;
638 Ok(MigrationFailureReason::IdentityTransportFailed(msg))
639 }
640 6 => {
641 if cur.remaining() < 1 {
642 return Err(MigrationError::StateFailed(
643 "truncated NotReadyTimeout attempts field".into(),
644 ));
645 }
646 Ok(MigrationFailureReason::NotReadyTimeout {
647 attempts: cur.get_u8(),
648 })
649 }
650 other => Err(MigrationError::StateFailed(format!(
651 "unknown MigrationFailureReason code {other}",
652 ))),
653 }
654}
655
656fn read_u16_string(cur: &mut std::io::Cursor<&[u8]>, ctx: &str) -> Result<String, MigrationError> {
657 if cur.remaining() < 2 {
658 return Err(MigrationError::StateFailed(format!(
659 "truncated {ctx} length prefix",
660 )));
661 }
662 let len = cur.get_u16_le() as usize;
663 if cur.remaining() < len {
664 return Err(MigrationError::StateFailed(format!("truncated {ctx} body")));
665 }
666 let mut bytes = vec![0u8; len];
667 cur.copy_to_slice(&mut bytes);
668 String::from_utf8(bytes)
669 .map_err(|e| MigrationError::StateFailed(format!("{ctx} is not valid UTF-8: {e}")))
670}
671
672// ── Snapshot chunking ────────────────────────────────────────────────────────
673
/// Maximum snapshot chunk size. Sized to fit within `MAX_PAYLOAD_SIZE` after
/// accounting for the SnapshotReady wire header overhead
/// (msg_type + daemon_origin + seq_through + chunk_index + total_chunks + len = 29 bytes)
/// and leaving headroom for the outer transport framing.
pub const MAX_SNAPSHOT_CHUNK_SIZE: usize = 7000;

/// Maximum transferable snapshot size: `u32::MAX` chunks * 7,000 bytes per chunk.
///
/// On 64-bit targets this is roughly 30 TB — effectively unlimited for
/// daemon state. The `StateSnapshot` wire format itself caps at ~4 GB
/// (`state_len: u32`), so in practice the snapshot serialization limit is
/// reached first.
///
/// The product does not fit in a 32-bit `usize`, and an overflowing
/// multiplication in a constant is a compile error, so the value
/// saturates at `usize::MAX` there instead of breaking 32-bit builds.
pub const MAX_SNAPSHOT_SIZE: usize = (u32::MAX as usize).saturating_mul(MAX_SNAPSHOT_CHUNK_SIZE);

/// Maximum `total_chunks` the reassembler will accept per reassembly.
///
/// `StateSnapshot` wire format caps payload at ~4 GB (`state_len: u32`), so
/// `ceil(u32::MAX / MAX_SNAPSHOT_CHUNK_SIZE)` = 613,567 chunks is the largest
/// legitimate value. We cap above that with headroom; anything higher is an
/// attacker declaring a fake `total_chunks` to either flood us with
/// BTreeMap insertions or stall the reassembler forever waiting for chunks
/// that will never arrive.
pub const MAX_TOTAL_CHUNKS: u32 = 700_000;

/// Hard upper bound on bytes buffered for a SINGLE in-flight reassembly.
///
/// `MAX_TOTAL_CHUNKS × MAX_SNAPSHOT_CHUNK_SIZE` = 4.9 GB (≈ 4.6 GiB); combined
/// with the fact that `seq_through == latest` doesn't trigger eviction, an
/// attacker can park up to that much memory per `(daemon_origin, seq_through)`
/// and refresh forever without ever completing the snapshot. This cap is a
/// hard ceiling on the per-entry buffer regardless of the declared
/// `total_chunks`. Real daemon snapshots run in the megabytes, not
/// gigabytes — 64 MiB leaves plenty of headroom while bounding the
/// flood-amplification a malicious peer can produce.
pub const MAX_PENDING_REASSEMBLY_BYTES: usize = 64 * 1024 * 1024;

/// Maximum age of a pending reassembly entry before it is swept.
///
/// Even with the per-entry byte cap, a peer can park up to
/// `MAX_PENDING_REASSEMBLY_BYTES` indefinitely under a single
/// `(daemon_origin, seq_through)` key: the cap refuses *additional*
/// chunks once buffered bytes reach the ceiling, but it doesn't
/// evict what's already there, and the `seq_through > latest`
/// eviction never fires while the peer re-uses the same
/// `seq_through`. Across many distinct `daemon_origin` values that
/// produces unbounded growth. The age sweep closes that hole by
/// removing entries whose last-progress timestamp is older than
/// this duration. Real snapshots complete in seconds; 5 minutes
/// leaves headroom for a slow legitimate peer while bounding the
/// persistence of attacker-shaped state.
pub const MAX_PENDING_REASSEMBLY_AGE: Duration = Duration::from_secs(300);
724
725/// Split a snapshot into chunked `SnapshotReady` messages.
726///
727/// Small snapshots (<= `MAX_SNAPSHOT_CHUNK_SIZE`) produce a single message
728/// with `chunk_index = 0, total_chunks = 1`. Larger snapshots are split
729/// into multiple messages that the receiver must reassemble.
730///
731/// Returns `MigrationError::SnapshotTooLarge` if the snapshot exceeds
732/// `MAX_SNAPSHOT_SIZE` (~28 TB).
733pub fn chunk_snapshot(
734 daemon_origin: u64,
735 snapshot_bytes: Vec<u8>,
736 seq_through: u64,
737) -> Result<Vec<MigrationMessage>, MigrationError> {
738 if snapshot_bytes.len() <= MAX_SNAPSHOT_CHUNK_SIZE {
739 return Ok(vec![MigrationMessage::SnapshotReady {
740 daemon_origin,
741 snapshot_bytes,
742 seq_through,
743 chunk_index: 0,
744 total_chunks: 1,
745 }]);
746 }
747
748 let total_chunks = snapshot_bytes.len().div_ceil(MAX_SNAPSHOT_CHUNK_SIZE);
749 let total_chunks =
750 u32::try_from(total_chunks).map_err(|_| MigrationError::SnapshotTooLarge {
751 size: snapshot_bytes.len(),
752 max: MAX_SNAPSHOT_SIZE,
753 })?;
754
755 Ok(snapshot_bytes
756 .chunks(MAX_SNAPSHOT_CHUNK_SIZE)
757 .enumerate()
758 .map(|(i, chunk)| MigrationMessage::SnapshotReady {
759 daemon_origin,
760 snapshot_bytes: chunk.to_vec(),
761 seq_through,
762 chunk_index: i as u32,
763 total_chunks,
764 })
765 .collect())
766}
767
/// Reason a chunk was refused by [`SnapshotReassembler::feed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReassemblyError {
    /// The chunk declared `total_chunks == 0`; a well-formed message
    /// always declares at least one chunk.
    ZeroTotalChunks,
    /// The chunk declared `chunk_index >= total_chunks`. Accepting it
    /// would let a peer smuggle out-of-range indices past the
    /// "all chunks received" count check.
    ChunkIndexOutOfRange {
        /// Chunk index declared by the peer.
        chunk_index: u32,
        /// `total_chunks` declared by the peer.
        total_chunks: u32,
    },
    /// The chunk declared `total_chunks > MAX_TOTAL_CHUNKS`, more than any
    /// legitimate snapshot could produce.
    TotalChunksTooLarge {
        /// `total_chunks` declared by the peer.
        total_chunks: u32,
    },
    /// The chunk itself is longer than `MAX_SNAPSHOT_CHUNK_SIZE`.
    ChunkTooLarge {
        /// Observed chunk length in bytes.
        len: usize,
    },
    /// The chunk's `total_chunks` disagrees with the value declared by
    /// the first chunk for the same `(daemon_origin, seq_through)`. The
    /// peer is either buggy or trying to resize an in-flight reassembly
    /// to force extra allocations.
    TotalChunksMismatch {
        /// Value declared by the current chunk.
        got: u32,
        /// Value locked in by the first chunk.
        expected: u32,
    },
    /// The chunk belongs to an older `seq_through` than one already
    /// accepted for the same daemon.
    StaleSeqThrough {
        /// `seq_through` on the incoming chunk.
        got: u64,
        /// Newest `seq_through` accepted so far for this daemon.
        latest: u64,
    },
    /// Accepting the chunk would push the bytes buffered for this
    /// `(daemon_origin, seq_through)` past `MAX_PENDING_REASSEMBLY_BYTES`.
    /// Refusing it bounds the memory amplification a peer can cause by
    /// sending only part of an outsized declared snapshot.
    TooManyPendingBytes {
        /// Bytes already buffered for this entry.
        buffered: usize,
        /// Length of the rejected chunk.
        incoming: usize,
        /// The per-entry cap.
        cap: usize,
    },
}
822
823impl std::fmt::Display for ReassemblyError {
824 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825 match self {
826 Self::ZeroTotalChunks => write!(f, "total_chunks == 0"),
827 Self::ChunkIndexOutOfRange {
828 chunk_index,
829 total_chunks,
830 } => write!(
831 f,
832 "chunk_index {} out of range for total_chunks {}",
833 chunk_index, total_chunks
834 ),
835 Self::TotalChunksTooLarge { total_chunks } => write!(
836 f,
837 "total_chunks {} exceeds MAX_TOTAL_CHUNKS ({})",
838 total_chunks, MAX_TOTAL_CHUNKS
839 ),
840 Self::ChunkTooLarge { len } => write!(
841 f,
842 "chunk length {} exceeds MAX_SNAPSHOT_CHUNK_SIZE ({})",
843 len, MAX_SNAPSHOT_CHUNK_SIZE
844 ),
845 Self::TotalChunksMismatch { got, expected } => write!(
846 f,
847 "total_chunks {} does not match first chunk's declared {}",
848 got, expected
849 ),
850 Self::StaleSeqThrough { got, latest } => write!(
851 f,
852 "seq_through {} is older than latest accepted {} for this daemon",
853 got, latest
854 ),
855 Self::TooManyPendingBytes {
856 buffered,
857 incoming,
858 cap,
859 } => write!(
860 f,
861 "buffered {} + incoming {} would exceed per-entry cap {}",
862 buffered, incoming, cap
863 ),
864 }
865 }
866}
867
868impl std::error::Error for ReassemblyError {}
869
870/// Reassembles chunked `SnapshotReady` messages into a complete snapshot.
871///
872/// Keyed by `(daemon_origin, seq_through)` so chunks from different snapshot
873/// generations cannot be mixed. At most one in-flight reassembly is kept
874/// per daemon — a chunk for a newer `seq_through` evicts any older pending
875/// state for that daemon, and chunks for older `seq_through` are rejected.
876pub struct SnapshotReassembler {
877 /// Pending reassemblies: (daemon_origin, seq_through) → chunks.
878 pending: std::collections::HashMap<(u64, u64), ReassemblyState>,
879 /// Latest `seq_through` accepted per daemon, for stale-chunk rejection
880 /// even after a reassembly completes and is evicted from `pending`.
881 latest_seq: std::collections::HashMap<u64, u64>,
882 /// Max age applied by the opportunistic sweep at the head of
883 /// every `feed`. Defaults to `MAX_PENDING_REASSEMBLY_AGE`. The
884 /// public `sweep_stale` accepts its own `max_age` and ignores
885 /// this; this field exists so tests can drive the implicit
886 /// sweep without waiting wall-clock minutes.
887 max_pending_age: Duration,
888}
889
/// Book-keeping for one in-flight snapshot reassembly.
struct ReassemblyState {
    /// Number of chunks expected for this reassembly.
    total_chunks: u32,
    /// Chunk payloads received so far, keyed by chunk index.
    chunks: std::collections::BTreeMap<u32, Vec<u8>>,
    /// Running total of the lengths of the values in `chunks`. Tracked
    /// explicitly, instead of being recomputed with
    /// `chunks.values().map(Vec::len).sum()` on every feed, so that the
    /// `MAX_PENDING_REASSEMBLY_BYTES` gate costs O(1) per chunk rather
    /// than O(chunks).
    bytes_buffered: usize,
    /// When the most recent chunk for this entry arrived. It is reset on
    /// every accepted chunk, so a slow peer that keeps making progress
    /// never trips the age sweep; an entry that has gone
    /// `MAX_PENDING_REASSEMBLY_AGE` without a chunk is dropped by either
    /// `sweep_stale` or the opportunistic sweep in `feed`.
    last_progress_at: Instant,
}
906
907impl SnapshotReassembler {
908 /// Create a new reassembler with the default
909 /// `MAX_PENDING_REASSEMBLY_AGE` opportunistic-sweep age.
910 pub fn new() -> Self {
911 Self::with_max_pending_age(MAX_PENDING_REASSEMBLY_AGE)
912 }
913
914 /// Create a reassembler with a custom opportunistic-sweep age.
915 /// Production callers should use `new()`; this exists primarily
916 /// for tests that need to exercise the in-`feed` sweep without
917 /// waiting wall-clock minutes.
918 pub fn with_max_pending_age(max_pending_age: Duration) -> Self {
919 Self {
920 pending: std::collections::HashMap::new(),
921 latest_seq: std::collections::HashMap::new(),
922 max_pending_age,
923 }
924 }
925
926 /// Feed a snapshot chunk.
927 ///
928 /// Returns `Ok(Some(bytes))` when all chunks for the current
929 /// `(daemon_origin, seq_through)` have been received, `Ok(None)` while
930 /// still waiting, and `Err(ReassemblyError)` if the chunk is malformed or
931 /// part of an attacker-shaped sequence. Rejected chunks never mutate
932 /// in-flight state.
933 pub fn feed(
934 &mut self,
935 daemon_origin: u64,
936 snapshot_bytes: Vec<u8>,
937 seq_through: u64,
938 chunk_index: u32,
939 total_chunks: u32,
940 ) -> Result<Option<Vec<u8>>, ReassemblyError> {
941 // ---- Per-chunk validation (no mutation until we've passed these) ----
942 // Pre-fix the opportunistic age sweep ran BEFORE these
943 // structural checks, so a peer that shipped a torrent of
944 // malformed chunks (total_chunks=0, oversize, etc.) forced
945 // an O(n) sweep on every reject — amplifying the malformed-
946 // chunk DoS. Move the sweep after validation so attacker
947 // junk is rejected for O(1) instead.
948 if total_chunks == 0 {
949 return Err(ReassemblyError::ZeroTotalChunks);
950 }
951 if total_chunks > MAX_TOTAL_CHUNKS {
952 return Err(ReassemblyError::TotalChunksTooLarge { total_chunks });
953 }
954 if chunk_index >= total_chunks {
955 return Err(ReassemblyError::ChunkIndexOutOfRange {
956 chunk_index,
957 total_chunks,
958 });
959 }
960
961 // Opportunistic age sweep. Even without an external scheduler
962 // driving `sweep_stale`, the pending map self-heals as new
963 // traffic arrives, so a hostile peer who parks an entry at
964 // the byte cap and goes silent can't keep it alive
965 // indefinitely. Cheap: `pending` is bounded to one entry
966 // per daemon and the retain is O(n) over a small map.
967 self.sweep_stale(self.max_pending_age);
968 // Zero-byte chunks are nonsensical: every legitimate
969 // SnapshotReady carries at least one byte of state (an empty
970 // snapshot would be a 1-byte length-prefixed empty payload,
971 // not a 0-byte chunk). Pre-fix a peer could ship
972 // MAX_TOTAL_CHUNKS = 700_000 zero-byte chunks per reassembly
973 // without ever consuming the documented byte-budget guard,
974 // bookkeeping `BTreeMap` entries until `MAX_TOTAL_CHUNKS`
975 // alone bounded the abuse. Refuse them at the boundary.
976 if snapshot_bytes.is_empty() {
977 return Err(ReassemblyError::ChunkTooLarge { len: 0 });
978 }
979 if snapshot_bytes.len() > MAX_SNAPSHOT_CHUNK_SIZE {
980 return Err(ReassemblyError::ChunkTooLarge {
981 len: snapshot_bytes.len(),
982 });
983 }
984 if let Some(&latest) = self.latest_seq.get(&daemon_origin) {
985 if seq_through < latest {
986 return Err(ReassemblyError::StaleSeqThrough {
987 got: seq_through,
988 latest,
989 });
990 }
991 }
992
993 // A newer seq_through for the same daemon evicts older in-flight state.
994 // This is what the public docstring always claimed; without it, the
995 // `pending` map grew unbounded across seq_through values.
996 if self
997 .latest_seq
998 .get(&daemon_origin)
999 .is_none_or(|&latest| seq_through > latest)
1000 {
1001 self.pending
1002 .retain(|&(origin, seq), _| origin != daemon_origin || seq == seq_through);
1003 self.latest_seq.insert(daemon_origin, seq_through);
1004 }
1005
1006 // Single-chunk fast path: no state to keep. Honour the
1007 // total_chunks-mismatch guard before bypassing — a peer that
1008 // shipped chunk 0/3 for `(daemon_origin, seq_through)` and
1009 // followed up with chunk 0/1 for the same key would otherwise
1010 // have the second message accepted as a complete snapshot,
1011 // dodging the mismatch error the multi-chunk path below
1012 // returns. The dedup-by-seq_through eviction above only fires
1013 // when a *newer* seq_through arrives; same-key collisions
1014 // still need to be caught here.
1015 if total_chunks == 1 {
1016 if let Some(state) = self.pending.get(&(daemon_origin, seq_through)) {
1017 if state.total_chunks != 1 {
1018 return Err(ReassemblyError::TotalChunksMismatch {
1019 got: 1,
1020 expected: state.total_chunks,
1021 });
1022 }
1023 }
1024 self.pending.remove(&(daemon_origin, seq_through));
1025 return Ok(Some(snapshot_bytes));
1026 }
1027
1028 let key = (daemon_origin, seq_through);
1029 let state = self.pending.entry(key).or_insert_with(|| ReassemblyState {
1030 total_chunks,
1031 chunks: std::collections::BTreeMap::new(),
1032 bytes_buffered: 0,
1033 last_progress_at: Instant::now(),
1034 });
1035
1036 // The first chunk fixes total_chunks; later chunks must agree.
1037 if state.total_chunks != total_chunks {
1038 return Err(ReassemblyError::TotalChunksMismatch {
1039 got: total_chunks,
1040 expected: state.total_chunks,
1041 });
1042 }
1043
1044 // Per-entry bytes cap. Refuse a chunk that would push the
1045 // accumulated buffer past `MAX_PENDING_REASSEMBLY_BYTES`.
1046 // Re-sending the same chunk index doesn't double-count: we
1047 // subtract the displaced chunk's length below before
1048 // re-checking. A peer that declares an oversized snapshot
1049 // and ships only some of the chunks can no longer park
1050 // ~4 GiB indefinitely — the cap forces the entry to be
1051 // refused once buffered bytes exceed the ceiling.
1052 let displaced_len = state.chunks.get(&chunk_index).map(Vec::len).unwrap_or(0);
1053 let projected = state
1054 .bytes_buffered
1055 .saturating_sub(displaced_len)
1056 .saturating_add(snapshot_bytes.len());
1057 if projected > MAX_PENDING_REASSEMBLY_BYTES {
1058 return Err(ReassemblyError::TooManyPendingBytes {
1059 buffered: state.bytes_buffered,
1060 incoming: snapshot_bytes.len(),
1061 cap: MAX_PENDING_REASSEMBLY_BYTES,
1062 });
1063 }
1064
1065 let new_len = snapshot_bytes.len();
1066 state.chunks.insert(chunk_index, snapshot_bytes);
1067 state.bytes_buffered = state
1068 .bytes_buffered
1069 .saturating_sub(displaced_len)
1070 .saturating_add(new_len);
1071 state.last_progress_at = Instant::now();
1072
1073 // With `chunk_index < total_chunks` enforced above, the BTreeMap's
1074 // keys are all in 0..total_chunks. Reaching total_chunks entries
1075 // therefore means we have every distinct index exactly once.
1076 if state.chunks.len() == state.total_chunks as usize {
1077 #[expect(
1078 clippy::unwrap_used,
1079 reason = "pending.entry(key).or_insert_with above guarantees the key is present"
1080 )]
1081 let state = self.pending.remove(&key).unwrap();
1082 let mut full = Vec::with_capacity(state.chunks.values().map(|c| c.len()).sum());
1083 for (_idx, chunk) in state.chunks {
1084 full.extend_from_slice(&chunk);
1085 }
1086 Ok(Some(full))
1087 } else {
1088 Ok(None)
1089 }
1090 }
1091
1092 /// Cancel reassembly for a daemon (e.g., on migration abort).
1093 ///
1094 /// Removes all pending reassemblies for this daemon regardless of
1095 /// `seq_through`. Does **not** reset `latest_seq`, so a subsequent
1096 /// replay of old chunks is still rejected.
1097 pub fn cancel(&mut self, daemon_origin: u64) {
1098 self.pending
1099 .retain(|&(origin, _), _| origin != daemon_origin);
1100 }
1101
1102 /// Drop pending reassemblies whose last-progress timestamp is
1103 /// older than `max_age`. Returns the number of entries evicted.
1104 ///
1105 /// Called opportunistically at the head of every `feed`, but
1106 /// also exposed publicly so a topology-aware caller (e.g. the
1107 /// migration dispatcher's housekeeping tick) can drive it on a
1108 /// timer when no inbound traffic is arriving — the
1109 /// `seq_through == latest` path in `feed` cannot self-trigger
1110 /// the sweep without a fresh chunk to cause it.
1111 ///
1112 /// Does **not** reset `latest_seq`: a peer that comes back later
1113 /// with the same `seq_through` is still rejected via the usual
1114 /// `StaleSeqThrough` gate, so dropping the in-flight buffer
1115 /// can't be turned into a snapshot-replacement amplifier.
1116 pub fn sweep_stale(&mut self, max_age: Duration) -> usize {
1117 let before = self.pending.len();
1118 let now = Instant::now();
1119 self.pending.retain(|_, state| {
1120 now.checked_duration_since(state.last_progress_at)
1121 .is_none_or(|age| age < max_age)
1122 });
1123 before - self.pending.len()
1124 }
1125
1126 /// Number of pending reassemblies.
1127 pub fn pending_count(&self) -> usize {
1128 self.pending.len()
1129 }
1130}
1131
1132impl Default for SnapshotReassembler {
1133 fn default() -> Self {
1134 Self::new()
1135 }
1136}
1137
1138// ── Orchestrator ─────────────────────────────────────────────────────────────
1139
/// Tracks an in-flight migration with its superposition state.
///
/// One record is stored per daemon in `MigrationOrchestrator::migrations`,
/// wrapped in a `Mutex` that the phase handlers lock before mutating it.
struct MigrationRecord {
    /// Phase state machine for this migration.
    state: MigrationState,
    /// Superposition tracking advanced alongside `state` by the handlers.
    superposition: SuperpositionState,
    /// Captured via `Instant::now()` when the record is inserted by
    /// `start_migration`; `list_migrations` reads its elapsed time.
    started_at: Instant,
}
1146
/// One row of [`MigrationOrchestrator::list_migrations`]. Used
/// by operator-facing surfaces (Deck MIGRATIONS tab via the
/// `MigrationSnapshot` wire form, ICE blast-radius simulator).
///
/// This replaced an earlier `(u64, MigrationPhase, u64)` tuple: the
/// operator-facing columns outgrew the tuple's readability budget,
/// and a named struct documents which field is which without
/// per-caller comments.
#[derive(Clone, Debug)]
pub struct MigrationListItem {
    /// Origin hash of the daemon being migrated.
    pub daemon_origin: u64,
    /// Source node ID.
    pub source_node: u64,
    /// Target node ID.
    pub target_node: u64,
    /// Current phase.
    pub phase: MigrationPhase,
    /// Milliseconds since the migration started.
    pub elapsed_ms: u64,
    /// Milliseconds since the current phase was entered. Distinct
    /// from `elapsed_ms` — a migration ten minutes old that
    /// transitioned to Replay one minute ago reports `60_000` here.
    pub age_in_phase_ms: u64,
    /// Snapshot payload size in bytes; `None` while the source
    /// hasn't produced a snapshot yet.
    pub snapshot_bytes: Option<u64>,
    /// Retry attempts accumulated by orchestrator-driven retries.
    pub retries: u32,
}
1176
/// Coordinates all 6 phases of daemon migration.
///
/// The orchestrator manages the lifecycle of migrations: initiating snapshots,
/// forwarding snapshot data to targets, coordinating replay, executing cutover,
/// and cleaning up the source.
pub struct MigrationOrchestrator {
    /// In-flight migrations: daemon_origin → record. Each record sits
    /// behind its own `Mutex`, which the phase handlers lock while they
    /// advance that migration.
    migrations: DashMap<u64, Mutex<MigrationRecord>>,
    /// Local daemon registry (for taking snapshots on local daemons).
    daemon_registry: Arc<DaemonRegistry>,
    /// Local node ID. `start_migration` compares it against the
    /// requested source node to decide whether to snapshot locally.
    local_node_id: u64,
    /// Source-side migration handler.
    ///
    /// Production wiring (`MigrationDispatcher::new`) sets this so
    /// the local-source path of `start_migration` registers the
    /// migration in the source-side handler — without that
    /// registration, `is_migrating(origin)` returns false and
    /// `DaemonRegistry::deliver` keeps mutating the source daemon's
    /// state past the snapshot's `seq_through`, so events arriving
    /// between snapshot capture and cutover would be silently lost.
    /// Tests that don't exercise the local-source path can leave
    /// this as `None`; the fallback is direct-snapshot behavior
    /// with a `tracing::warn!` flagging the gap.
    source_handler: Option<Arc<MigrationSourceHandler>>,
}
1203
1204impl MigrationOrchestrator {
1205 /// Create a new orchestrator.
1206 ///
1207 /// The source-side handler defaults to `None`. Production
1208 /// callers should chain [`Self::with_source_handler`] to wire
1209 /// the orchestrator to the dispatcher's source handler — see
1210 /// the field doc for why that matters.
1211 pub fn new(daemon_registry: Arc<DaemonRegistry>, local_node_id: u64) -> Self {
1212 Self {
1213 migrations: DashMap::new(),
1214 daemon_registry,
1215 local_node_id,
1216 source_handler: None,
1217 }
1218 }
1219
1220 /// Builder: install the source-side migration handler. Required
1221 /// for correct local-source migration behavior.
1222 /// `MigrationDispatcher::new` already calls this; tests that
1223 /// exercise the local-source code path must call it explicitly.
1224 pub fn with_source_handler(mut self, source_handler: Arc<MigrationSourceHandler>) -> Self {
1225 self.source_handler = Some(source_handler);
1226 self
1227 }
1228
    /// Shared handle to the daemon registry this orchestrator was
    /// built against. Exposed so the migration subprotocol
    /// dispatcher can reach the registry without an extra `Arc`
    /// plumbed alongside.
    ///
    /// Returns a borrow of the stored `Arc`; callers that need their own
    /// handle can clone it.
    pub fn daemon_registry(&self) -> &Arc<DaemonRegistry> {
        &self.daemon_registry
    }
1236
1237 /// Initiate a migration (phase 0: Snapshot).
1238 ///
1239 /// If the source is the local node, takes the snapshot immediately and
1240 /// returns `SnapshotReady`. Otherwise, returns `TakeSnapshot` for the
1241 /// caller to send to the source node.
1242 pub fn start_migration(
1243 &self,
1244 daemon_origin: u64,
1245 source_node: u64,
1246 target_node: u64,
1247 ) -> Result<Vec<MigrationMessage>, MigrationError> {
1248 // Atomic check-and-insert via entry() to prevent TOCTOU races
1249 let entry = match self.migrations.entry(daemon_origin) {
1250 dashmap::mapref::entry::Entry::Occupied(_) => {
1251 return Err(MigrationError::AlreadyMigrating(daemon_origin));
1252 }
1253 dashmap::mapref::entry::Entry::Vacant(entry) => entry,
1254 };
1255
1256 let mut state = MigrationState::new(daemon_origin, source_node, target_node);
1257
1258 // If we are the source, take snapshot locally.
1259 if source_node == self.local_node_id {
1260 // The snapshot MUST be routed through the source-side
1261 // handler so it gets registered for the migration;
1262 // otherwise `source_handler.is_migrating(origin)`
1263 // returns false and `DaemonRegistry::deliver` keeps
1264 // routing post-snapshot events into the live source
1265 // daemon's state, losing them at cutover. The remote-
1266 // source path goes through
1267 // `source_handler.start_snapshot` via the dispatcher
1268 // (`migration_handler.rs:310-312`); we mirror that
1269 // here.
1270 //
1271 // `orchestrator_node` is `local_node_id` here — for a
1272 // self-initiated local migration the orchestrator IS this
1273 // node, so the source handler's reply-routing field
1274 // points back to us.
1275 let snapshot = match &self.source_handler {
1276 Some(handler) => {
1277 handler.start_snapshot(daemon_origin, target_node, self.local_node_id)?
1278 }
1279 None => {
1280 // Surface the unwired-source-handler path
1281 // loudly. Production wiring
1282 // (`MigrationDispatcher::new`) always sets the
1283 // handler, so this branch is only reached by
1284 // tests / direct orchestrator construction.
1285 // Gating this with `cfg(not(test))` and Err in
1286 // production would break integration tests
1287 // that link against the library compiled
1288 // WITHOUT `cfg(test)`. Warn-loud is the
1289 // actionable signal: operators running under
1290 // tracing see the missing-handler condition
1291 // without the cfg mismatch silently breaking
1292 // integration tests that intentionally
1293 // exercise the orchestrator without a
1294 // dispatcher.
1295 tracing::warn!(
1296 daemon_origin = format_args!("{:#x}", daemon_origin),
1297 "MigrationOrchestrator::start_migration on local source without \
1298 a source_handler installed — events arriving between snapshot \
1299 capture and cutover may be silently lost. \
1300 Production callers wire the handler via \
1301 `MigrationDispatcher::new`. Direct orchestrator construction \
1302 should call `MigrationOrchestrator::with_source_handler`."
1303 );
1304 self.daemon_registry
1305 .snapshot(daemon_origin)
1306 .map_err(|e| MigrationError::StateFailed(e.to_string()))?
1307 .ok_or_else(|| {
1308 MigrationError::StateFailed(
1309 "daemon is stateless or snapshot failed".into(),
1310 )
1311 })?
1312 }
1313 };
1314
1315 // Surface oversized-snapshot errors as a
1316 // MigrationError instead of a panic that would crash the
1317 // dispatch task without releasing locks.
1318 let snapshot_bytes = snapshot
1319 .try_to_bytes()
1320 .map_err(|e| MigrationError::StateFailed(e.to_string()))?;
1321 let seq_through = snapshot.through_seq;
1322
1323 state.set_snapshot(snapshot)?;
1324
1325 let source_head = state
1326 .snapshot()
1327 .map(|s| s.chain_link)
1328 .unwrap_or_else(|| CausalLink::genesis(daemon_origin, 0));
1329
1330 let superposition = SuperpositionState::new(daemon_origin, source_head);
1331
1332 entry.insert(Mutex::new(MigrationRecord {
1333 state,
1334 superposition,
1335 started_at: Instant::now(),
1336 }));
1337
1338 // Pre-fix this returned a single `SnapshotReady`
1339 // with `chunk_index: 0, total_chunks: 1` regardless
1340 // of `snapshot_bytes.len()`. Any snapshot larger
1341 // than `MAX_SNAPSHOT_CHUNK_SIZE` (7 KB) was
1342 // rejected at the wire encoder
1343 // (`migration_handler.rs:336`) and the receiver
1344 // dropped it as `ChunkTooLarge`. Locally-initiated
1345 // migration of any stateful daemon with a
1346 // non-trivial state vector (cached models, large
1347 // bindings, behaviour history) thus could not be
1348 // sent at all. Route through `chunk_snapshot` so a
1349 // multi-chunk snapshot returns multiple messages
1350 // for the caller to dispatch in order.
1351 chunk_snapshot(daemon_origin, snapshot_bytes, seq_through)
1352 } else {
1353 let source_head = CausalLink::genesis(daemon_origin, 0);
1354 let superposition = SuperpositionState::new(daemon_origin, source_head);
1355
1356 entry.insert(Mutex::new(MigrationRecord {
1357 state,
1358 superposition,
1359 started_at: Instant::now(),
1360 }));
1361
1362 Ok(vec![MigrationMessage::TakeSnapshot {
1363 daemon_origin,
1364 target_node,
1365 }])
1366 }
1367 }
1368
1369 /// Initiate a migration with automatic target selection.
1370 ///
1371 /// Uses the scheduler to find the best migration-capable target node
1372 /// based on the daemon's capability requirements. The scheduler queries
1373 /// the `CapabilityIndex` for nodes advertising `subprotocol:0x0500`.
1374 ///
1375 /// Returns the target node ID and the first migration message.
1376 pub fn start_migration_auto(
1377 &self,
1378 daemon_origin: u64,
1379 source_node: u64,
1380 scheduler: &super::Scheduler,
1381 daemon_filter: &crate::adapter::net::behavior::capability::CapabilityFilter,
1382 ) -> Result<(u64, Vec<MigrationMessage>), MigrationError> {
1383 // Map a scheduler "no candidate" / "index unavailable"
1384 // outcome to the typed `NoTargetAvailable` variant. Pre-
1385 // fix this used `TargetUnavailable(0)`, surfacing
1386 // "target node 0x0 unavailable" to operators — confusing
1387 // because no specific node id was ever attempted; the
1388 // auto-placement found nobody to attempt against.
1389 // Phase G slice 8 — auto-target migration runs through v2
1390 // placement by default. Ranking flows via
1391 // `select_migration_target` + LOCKED §7 tie-breaker;
1392 // observable eligibility matches v1 because v2 wraps
1393 // `LegacyPlacement::permissive`. The legacy
1394 // `Scheduler::place_migration` is still available for
1395 // callers who explicitly want the v1 contract.
1396 let placement = scheduler
1397 .place_migration_v2(daemon_filter, source_node)
1398 .map_err(|_| MigrationError::NoTargetAvailable)?;
1399
1400 let target_node = placement.node_id;
1401 let msgs = self.start_migration(daemon_origin, source_node, target_node)?;
1402 Ok((target_node, msgs))
1403 }
1404
1405 /// Handle snapshot taken on source (phase 1→2).
1406 ///
1407 /// Validates and stores the snapshot, advances to Transfer phase.
1408 /// Returns the message to forward to the target node. For chunked
1409 /// snapshots, only the first chunk (index 0) triggers validation
1410 /// and phase advancement — subsequent chunks are forwarded as-is.
1411 pub fn on_snapshot_ready(
1412 &self,
1413 daemon_origin: u64,
1414 snapshot_bytes: Vec<u8>,
1415 seq_through: u64,
1416 chunk_index: u32,
1417 total_chunks: u32,
1418 ) -> Result<MigrationMessage, MigrationError> {
1419 let entry = self
1420 .migrations
1421 .get(&daemon_origin)
1422 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1423
1424 let mut record = entry.lock();
1425
1426 // Per-chunk envelope validation. Runs on chunk_index == 0
1427 // for both single-chunk and multi-chunk paths so the
1428 // orchestrator catches header-level corruption symmetrically.
1429 // Single-chunk's later `from_bytes` is a strictly stronger
1430 // check; the helper is a fast pre-decode rejection.
1431 // Multi-chunk used to defer ALL validation to the target
1432 // after reassembly — chunk 0 of a multi-chunk send with the
1433 // wrong magic / version / origin would only be caught after
1434 // reassembly finished.
1435 validate_chunk_header(chunk_index, &snapshot_bytes, daemon_origin)?;
1436
1437 // Only validate and advance phase on the first chunk
1438 if chunk_index == 0 && total_chunks == 1 {
1439 // Single-chunk: validate immediately and set snapshot
1440 let snapshot = StateSnapshot::from_bytes(&snapshot_bytes).ok_or_else(|| {
1441 MigrationError::StateFailed("failed to parse snapshot bytes".into())
1442 })?;
1443
1444 // Cross-check the wire-supplied seq_through against the
1445 // payload's snapshot.through_seq. Pre-fix a buggy or
1446 // malicious source could ship a SnapshotReady whose wire
1447 // seq_through disagreed with the snapshot's through_seq;
1448 // the disagreement then propagated to the target where
1449 // restore_snapshot consumed snapshot.through_seq for
1450 // replayed_through but logs / retry decisions / audit
1451 // used the wire field, producing a debugging trap.
1452 if snapshot.through_seq != seq_through {
1453 return Err(MigrationError::StateFailed(format!(
1454 "SnapshotReady: wire seq_through {} disagrees with snapshot.through_seq {}",
1455 seq_through, snapshot.through_seq,
1456 )));
1457 }
1458
1459 if record.state.phase() == MigrationPhase::Snapshot {
1460 record.state.set_snapshot(snapshot)?;
1461 }
1462 } else if chunk_index == 0 {
1463 // Multi-chunk: can't validate until target reassembles all chunks.
1464 // Advance phase past Snapshot so buffering and subsequent phases work.
1465 // The target will validate the full snapshot after reassembly.
1466 if record.state.phase() == MigrationPhase::Snapshot {
1467 record.state.force_phase(MigrationPhase::Transfer);
1468 }
1469 }
1470
1471 // Update superposition on first chunk
1472 if chunk_index == 0 {
1473 record.superposition.advance(MigrationPhase::Transfer);
1474 }
1475
1476 // Forward to target
1477 Ok(MigrationMessage::SnapshotReady {
1478 daemon_origin,
1479 snapshot_bytes,
1480 seq_through,
1481 chunk_index,
1482 total_chunks,
1483 })
1484 }
1485
1486 /// Handle restore complete on target (phase 2→3).
1487 ///
1488 /// Advances to Replay phase. Returns buffered events message if there
1489 /// are any, or None if no events were buffered.
1490 pub fn on_restore_complete(
1491 &self,
1492 daemon_origin: u64,
1493 _restored_seq: u64,
1494 ) -> Result<Option<MigrationMessage>, MigrationError> {
1495 let entry = self
1496 .migrations
1497 .get(&daemon_origin)
1498 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1499
1500 let mut record = entry.lock();
1501
1502 // Advance: Transfer → Restore → Replay
1503 if record.state.phase() == MigrationPhase::Transfer {
1504 record.state.transfer_complete()?;
1505 }
1506 if record.state.phase() == MigrationPhase::Restore {
1507 record.state.restore_complete()?;
1508 }
1509
1510 record.superposition.advance(MigrationPhase::Replay);
1511
1512 // No buffered events ride here — buffering goes through the
1513 // source-side handler exclusively (`MigrationSourceHandler::buffer_event`
1514 // / `on_cutover`'s drain). The orchestrator never holds a
1515 // buffer of its own; the only buffered-events surface is
1516 // the source's drain at cutover time.
1517 Ok(None)
1518 }
1519
1520 /// Handle replay complete on target (phase 3→4).
1521 ///
1522 /// `target_head` is the target's chain head after replay, shipped
1523 /// over the wire by the target node. The orchestrator stamps it
1524 /// into `SuperpositionState::target_replayed` so the continuity
1525 /// proof carries the real cryptographic anchor even when the
1526 /// orchestrator lives on a third node and has no local daemon
1527 /// registry entry to read from.
1528 ///
1529 /// Returns `CutoverNotify` to send to the source node.
1530 pub fn on_replay_complete(
1531 &self,
1532 daemon_origin: u64,
1533 replayed_seq: u64,
1534 target_head: CausalLink,
1535 ) -> Result<MigrationMessage, MigrationError> {
1536 let entry = self
1537 .migrations
1538 .get(&daemon_origin)
1539 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1540
1541 let mut record = entry.lock();
1542
1543 // Sanity-check the wire-shipped head against the wire-shipped
1544 // replayed_seq. A mismatch signals either a buggy target or a
1545 // wire-level tamper; either way the link is no longer
1546 // trustworthy as the migration's authoritative anchor.
1547 if target_head.sequence != replayed_seq {
1548 tracing::warn!(
1549 daemon_origin = format_args!("{:#x}", daemon_origin),
1550 head_seq = target_head.sequence,
1551 replayed_seq,
1552 "on_replay_complete: target_head.sequence disagrees with replayed_seq; \
1553 using target_head as-is but the continuity proof anchor may not match \
1554 the post-replay chain head"
1555 );
1556 }
1557 record.superposition.target_replayed(target_head);
1558
1559 // Advance to Cutover
1560 if record.state.phase() == MigrationPhase::Replay {
1561 record.state.replay_complete()?;
1562 }
1563
1564 record.superposition.advance(MigrationPhase::Cutover);
1565 record.superposition.collapse();
1566
1567 let target_node = record.state.target_node();
1568
1569 Ok(MigrationMessage::CutoverNotify {
1570 daemon_origin,
1571 target_node,
1572 })
1573 }
1574
1575 /// Handle cutover acknowledged by source.
1576 ///
1577 /// Source has stopped accepting writes. Advances to Complete.
1578 pub fn on_cutover_acknowledged(&self, daemon_origin: u64) -> Result<(), MigrationError> {
1579 let entry = self
1580 .migrations
1581 .get(&daemon_origin)
1582 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1583
1584 let mut record = entry.lock();
1585
1586 if record.state.phase() == MigrationPhase::Cutover {
1587 record.state.cutover_complete()?;
1588 }
1589
1590 record.superposition.advance(MigrationPhase::Complete);
1591 record.superposition.resolve();
1592
1593 Ok(())
1594 }
1595
1596 /// Handle cleanup complete from source (phase 5→6).
1597 ///
1598 /// The source has stopped accepting writes and freed its local daemon
1599 /// state. Advances Cutover→Complete on the orchestrator — the source's
1600 /// local `on_cutover_acknowledged` call is a no-op when the orchestrator
1601 /// lives on a different node (it operates on the source's local
1602 /// orchestrator, which has no record), so `CleanupComplete` is the
1603 /// authoritative signal on the orchestrator side. The record is kept in
1604 /// place until the target acknowledges activation via `on_activate_ack`,
1605 /// so the subprotocol handler still has somewhere to look up
1606 /// `target_node` when it needs to route `ActivateTarget`.
1607 pub fn on_cleanup_complete(
1608 &self,
1609 daemon_origin: u64,
1610 ) -> Result<MigrationMessage, MigrationError> {
1611 let entry = self
1612 .migrations
1613 .get(&daemon_origin)
1614 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1615 let mut record = entry.lock();
1616 if record.state.phase() == MigrationPhase::Cutover {
1617 record.state.cutover_complete()?;
1618 }
1619 // Also resolve `SuperpositionState` here, mirroring
1620 // `on_cutover_acknowledged`. On a remote orchestrator
1621 // `on_cutover_acknowledged` is a no-op (operates on the
1622 // source's local orchestrator, which has no record for
1623 // this daemon), so this path is the ONLY authoritative
1624 // one. Without the resolve, `SuperpositionState` would be
1625 // stuck mid-collapse and operator dashboards / readiness
1626 // probes / SDK handles keyed on superposition state
1627 // wouldn't observe resolution until `on_activate_ack`
1628 // removed the record entirely. The advance/resolve is
1629 // idempotent — safe to run on the local-orchestrator path
1630 // too if both signals arrive on the same node.
1631 record.superposition.advance(MigrationPhase::Complete);
1632 record.superposition.resolve();
1633 Ok(MigrationMessage::ActivateTarget { daemon_origin })
1634 }
1635
1636 /// Handle activation acknowledgement from target (phase 6 terminus).
1637 ///
1638 /// The target has drained remaining events and is now the authoritative
1639 /// copy. This is the true end of the migration lifecycle; the record is
1640 /// removed here, not in `on_cleanup_complete`.
1641 pub fn on_activate_ack(
1642 &self,
1643 daemon_origin: u64,
1644 _replayed_seq: u64,
1645 ) -> Result<(), MigrationError> {
1646 self.migrations
1647 .remove(&daemon_origin)
1648 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1649 Ok(())
1650 }
1651
1652 /// Abort a migration at any phase.
1653 ///
1654 /// Returns the abort message to broadcast to involved nodes.
1655 /// `reason` is wrapped in [`MigrationFailureReason::StateFailed`]
1656 /// since a generic abort doesn't fit any of the more specific
1657 /// variants.
1658 pub fn abort_migration(
1659 &self,
1660 daemon_origin: u64,
1661 reason: String,
1662 ) -> Result<MigrationMessage, MigrationError> {
1663 self.abort_migration_with_reason(daemon_origin, MigrationFailureReason::StateFailed(reason))
1664 }
1665
1666 /// Abort a migration with a caller-supplied structured reason.
1667 ///
1668 /// Removes the orchestrator's record AND, if wired, also clears
1669 /// the matching entry on the local `MigrationSourceHandler`.
1670 /// Pre-fix only the orchestrator entry was removed; the source
1671 /// handler's `migrations` map retained the entry, so
1672 /// `is_migrating()` stayed true forever and `buffer_event` kept
1673 /// stuffing the now-undrained buffered_events vector. Subsequent
1674 /// retry attempts then tripped `AlreadyMigrating` against a
1675 /// migration the orchestrator believed was already aborted.
1676 pub fn abort_migration_with_reason(
1677 &self,
1678 daemon_origin: u64,
1679 reason: MigrationFailureReason,
1680 ) -> Result<MigrationMessage, MigrationError> {
1681 self.migrations
1682 .remove(&daemon_origin)
1683 .ok_or(MigrationError::DaemonNotFound(daemon_origin))?;
1684
1685 // Clear the source-side mirror entry. `abort` is a no-op if
1686 // the daemon was never tracked there (e.g. remote-source
1687 // migration where this node is only the orchestrator), so
1688 // calling it unconditionally is safe.
1689 if let Some(source) = &self.source_handler {
1690 let _ = source.abort(daemon_origin);
1691 }
1692
1693 Ok(MigrationMessage::MigrationFailed {
1694 daemon_origin,
1695 reason,
1696 })
1697 }
1698
1699 /// Check if a daemon is currently being migrated.
1700 pub fn is_migrating(&self, daemon_origin: u64) -> bool {
1701 self.migrations.contains_key(&daemon_origin)
1702 }
1703
1704 /// Get migration status for a daemon.
1705 pub fn status(&self, daemon_origin: u64) -> Option<MigrationPhase> {
1706 self.migrations
1707 .get(&daemon_origin)
1708 .map(|entry| entry.lock().state.phase())
1709 }
1710
1711 /// Get the source node for an in-flight migration.
1712 pub fn source_node(&self, daemon_origin: u64) -> Option<u64> {
1713 self.migrations
1714 .get(&daemon_origin)
1715 .map(|entry| entry.lock().state.source_node())
1716 }
1717
1718 /// Get the target node for an in-flight migration.
1719 pub fn target_node(&self, daemon_origin: u64) -> Option<u64> {
1720 self.migrations
1721 .get(&daemon_origin)
1722 .map(|entry| entry.lock().state.target_node())
1723 }
1724
1725 /// Get superposition phase for a daemon.
1726 pub fn superposition_phase(
1727 &self,
1728 daemon_origin: u64,
1729 ) -> Option<crate::adapter::net::continuity::superposition::SuperpositionPhase> {
1730 self.migrations
1731 .get(&daemon_origin)
1732 .map(|entry| entry.lock().superposition.phase())
1733 }
1734
1735 /// List all in-flight migrations: (daemon_origin, phase, elapsed_ms).
1736 pub fn list_migrations(&self) -> Vec<MigrationListItem> {
1737 self.migrations
1738 .iter()
1739 .map(|entry| {
1740 let record = entry.lock();
1741 // Saturating cast through the same helper migration.rs
1742 // already publishes. Pre-fix the raw `as u64` wrapped
1743 // instead of saturating on a (theoretical) u128 over
1744 // u64::MAX milliseconds — inconsistent with the
1745 // canonical state.elapsed_ms() used in migration.rs:283.
1746 let elapsed =
1747 u64::try_from(record.started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
1748 MigrationListItem {
1749 daemon_origin: *entry.key(),
1750 source_node: record.state.source_node(),
1751 target_node: record.state.target_node(),
1752 phase: record.state.phase(),
1753 elapsed_ms: elapsed,
1754 age_in_phase_ms: record.state.age_in_phase_ms(),
1755 snapshot_bytes: record.state.snapshot_size_bytes(),
1756 retries: record.state.retry_count(),
1757 }
1758 })
1759 .collect()
1760 }
1761
1762 /// Number of active migrations.
1763 pub fn active_count(&self) -> usize {
1764 self.migrations.len()
1765 }
1766}
1767
1768impl std::fmt::Debug for MigrationOrchestrator {
1769 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1770 f.debug_struct("MigrationOrchestrator")
1771 .field("active_migrations", &self.migrations.len())
1772 .field("local_node_id", &format!("{:#x}", self.local_node_id))
1773 .finish()
1774 }
1775}
1776
1777#[cfg(test)]
1778mod tests {
1779 use super::*;
1780 use crate::adapter::net::behavior::capability::CapabilityFilter;
1781 use crate::adapter::net::compute::{
1782 DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
1783 };
1784 use crate::adapter::net::identity::EntityKeypair;
1785 use bytes::{BufMut, Bytes};
1786
/// Assemble a 64-byte buffer shaped like the start of snapshot chunk 0:
/// `[magic(4)][version(1)][entity_id(32)][zero padding(27)]`.
///
/// Only the first 37 bytes form the envelope; the trailing zeros just round
/// the buffer up to 64 bytes for readability.
fn synth_chunk_header(magic: &[u8; 4], version: u8, entity_bytes: &[u8; 32]) -> Vec<u8> {
    const PADDING: [u8; 27] = [0u8; 27];
    let version_byte = [version];
    let sections: [&[u8]; 4] = [&magic[..], &version_byte[..], &entity_bytes[..], &PADDING[..]];
    sections.concat()
}
1799
/// A chunk 0 carrying the real magic, the current version and the entity
/// whose origin hash is expected must pass validation.
#[test]
fn validate_chunk_header_accepts_well_formed_chunk_0() {
    let keypair = EntityKeypair::generate();
    let expected_origin = keypair.origin_hash();
    let header = synth_chunk_header(
        SNAPSHOT_WIRE_MAGIC,
        SNAPSHOT_VERSION,
        keypair.entity_id().as_bytes(),
    );

    let outcome = validate_chunk_header(0, &header, expected_origin);
    assert!(
        outcome.is_ok(),
        "well-formed chunk 0 must validate: {outcome:?}"
    );
}
1811
/// Chunk 0 whose first four bytes are not the snapshot wire magic must be
/// refused with `MigrationError::StateFailed`.
#[test]
fn validate_chunk_header_rejects_wrong_magic() {
    let keypair = EntityKeypair::generate();
    let header = synth_chunk_header(
        b"XXXX",
        SNAPSHOT_VERSION,
        keypair.entity_id().as_bytes(),
    );

    match validate_chunk_header(0, &header, keypair.origin_hash()) {
        Err(MigrationError::StateFailed(_)) => {}
        other => panic!("expected StateFailed, got {other:?}"),
    }
}
1819
/// Chunk 0 with a valid magic but the version byte set to `0xFE` must be
/// refused with `MigrationError::StateFailed`.
#[test]
fn validate_chunk_header_rejects_wrong_version() {
    let keypair = EntityKeypair::generate();
    let bad_version: u8 = 0xFE;
    let header = synth_chunk_header(
        SNAPSHOT_WIRE_MAGIC,
        bad_version,
        keypair.entity_id().as_bytes(),
    );

    let failure = validate_chunk_header(0, &header, keypair.origin_hash())
        .expect_err("an unexpected version byte must be rejected");
    assert!(matches!(failure, MigrationError::StateFailed(_)));
}
1827
/// Chunk 0 that embeds one entity while the migration expects another
/// entity's origin hash must be refused with `MigrationError::StateFailed`.
#[test]
fn validate_chunk_header_rejects_wrong_origin_claim() {
    let embedded = EntityKeypair::generate();
    let expected = EntityKeypair::generate();

    // The header carries `embedded`'s entity id...
    let header = synth_chunk_header(
        SNAPSHOT_WIRE_MAGIC,
        SNAPSHOT_VERSION,
        embedded.entity_id().as_bytes(),
    );

    // ...but validation is asked to match `expected`'s origin hash.
    match validate_chunk_header(0, &header, expected.origin_hash()) {
        Err(MigrationError::StateFailed(_)) => {}
        other => panic!("expected StateFailed, got {other:?}"),
    }
}
1841
/// A chunk 0 shorter than the 37-byte envelope cannot carry a header and
/// must be refused with `MigrationError::StateFailed`.
#[test]
fn validate_chunk_header_rejects_too_short_chunk_0() {
    // 12 bytes: well below the 37-byte envelope minimum.
    let truncated = [0u8; 12];
    let failure = validate_chunk_header(0, &truncated, 0xDEAD_BEEF)
        .expect_err("a chunk 0 shorter than the envelope must be rejected");
    assert!(matches!(failure, MigrationError::StateFailed(_)));
}
1848
/// Chunks after the first carry no envelope, so the validator must let
/// them through regardless of their content.
#[test]
fn validate_chunk_header_passes_non_chunk_0_unchecked() {
    // Arbitrary short payload that would never pass as a chunk-0 header.
    let fragment = [0u8; 8];
    let later_indices = [1u32, 2, 47, u32::MAX];
    for &idx in later_indices.iter() {
        let outcome = validate_chunk_header(idx, &fragment, 0xDEAD_BEEF);
        assert!(outcome.is_ok(), "chunk_index {idx} must pass through");
    }
}
1858
/// Minimal test daemon whose entire state is a single counter.
struct CounterDaemon {
    /// Counter value; `new` starts it at zero.
    count: u64,
}

impl CounterDaemon {
    /// Build a daemon with its counter at zero.
    fn new() -> Self {
        CounterDaemon { count: 0 }
    }
}
1868
1869 impl MeshDaemon for CounterDaemon {
1870 fn name(&self) -> &str {
1871 "counter"
1872 }
1873 fn requirements(&self) -> CapabilityFilter {
1874 CapabilityFilter::default()
1875 }
1876 fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
1877 self.count += 1;
1878 Ok(vec![Bytes::from(self.count.to_le_bytes().to_vec())])
1879 }
1880 fn snapshot(&self) -> Option<Bytes> {
1881 Some(Bytes::from(self.count.to_le_bytes().to_vec()))
1882 }
1883 fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
1884 if state.len() != 8 {
1885 return Err(DaemonError::RestoreFailed("bad state size".into()));
1886 }
1887 self.count = u64::from_le_bytes(state[..8].try_into().unwrap());
1888 Ok(())
1889 }
1890 }
1891
1892 fn setup_registry() -> (Arc<DaemonRegistry>, u64) {
1893 let reg = Arc::new(DaemonRegistry::new());
1894 let kp = EntityKeypair::generate();
1895 let origin = kp.origin_hash();
1896 let host = DaemonHost::new(
1897 Box::new(CounterDaemon::new()),
1898 kp,
1899 DaemonHostConfig::default(),
1900 );
1901 reg.register(host).unwrap();
1902 (reg, origin)
1903 }
1904
/// CR-32 tripwire: when no source handler is wired, `start_migration` must
/// log a loud `tracing::warn!` that mentions `source_handler` and tells
/// operators how to wire one.
///
/// Background: that path used to fall back silently to
/// `daemon_registry.snapshot`, losing events that arrived between snapshot
/// capture and cutover. A later attempt returned `Err` behind
/// `cfg(not(test))`, which broke integration tests (they link the library
/// built without `cfg(test)`), so the behaviour is now "warn loudly, still
/// fall back". Production callers wire the handler through
/// `MigrationDispatcher::new`; the warning only fires for direct
/// orchestrator construction (tests / SDK consumers that skip the
/// dispatcher).
///
/// The test inspects this file's own source text, so removing the warn (or
/// its `source_handler` reference) fails here.
#[test]
fn cr32_unwired_source_handler_must_emit_loud_warn() {
    let src = include_str!("orchestrator.rs");

    // The marker is stitched together at runtime on purpose: if this test
    // spelled it out verbatim, searching the file would hit the test's own
    // text and keep passing after the production block was deleted. The
    // production comment writes the marker as one plain phrase, so it is
    // the only place the joined string can match.
    let anchor = ["Surface ", "the unwired-", "source-handler"].concat();
    let anchor_idx = src.find(anchor.as_str()).expect(
        "regression: the production unwired-source-handler marker in \
         start_migration's None arm is gone — either the fix was reverted or \
         the comment was rewritten. If the fix is intentionally being changed, \
         update this test.",
    );

    // Exactly one hit guarantees the match is the production site and not
    // a stray copy elsewhere in the file (this test included).
    let occurrences = src.matches(anchor.as_str()).count();
    assert_eq!(
        occurrences, 1,
        "anchor must occur exactly once in orchestrator.rs (production \
         site). Got {occurrences} occurrences — the test source likely contains \
         a verbatim copy of the anchor, defeating the tripwire."
    );

    // Examine the 20 lines starting at the marker.
    let mut block = String::new();
    for (n, line) in src[anchor_idx..].lines().take(20).enumerate() {
        if n > 0 {
            block.push('\n');
        }
        block.push_str(line);
    }

    assert!(
        block.contains("tracing::warn!"),
        "regression: unwired-source-handler path must emit \
         tracing::warn!. Block:\n{}",
        block
    );
    assert!(
        block.contains("source_handler"),
        "regression: warn message must reference source_handler"
    );

    let wiring_hints = ["MigrationDispatcher::new", "with_source_handler"];
    let names_a_fix = wiring_hints.iter().any(|hint| block.contains(*hint));
    assert!(
        names_a_fix,
        "regression: warn message must point operators at how to wire \
         the handler (`MigrationDispatcher::new` or \
         `MigrationOrchestrator::with_source_handler`) so the log line is \
         actionable. Block:\n{}",
        block
    );
}
1981
/// When the orchestrator's node is also the source, `start_migration`
/// emits snapshot chunks immediately and the migration is in `Transfer`.
#[test]
fn test_start_migration_local_source() {
    let (registry, origin) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);

    let messages = orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();
    assert!(!messages.is_empty(), "must emit at least one chunk");

    if let MigrationMessage::SnapshotReady { daemon_origin, .. } = &messages[0] {
        assert_eq!(*daemon_origin, origin);
    } else {
        panic!("expected SnapshotReady, got {:?}", &messages[0]);
    }

    assert!(orchestrator.is_migrating(origin));
    let phase = orchestrator.status(origin);
    assert_eq!(phase, Some(MigrationPhase::Transfer));
}
1999
/// Regression: a local-source migration must be recorded by the source
/// handler as well as by the orchestrator.
///
/// Previously the local branch called `daemon_registry.snapshot()` directly
/// and never went through `MigrationSourceHandler::start_snapshot`, so the
/// source handler's `is_migrating(origin)` stayed false, callers gating
/// buffering / write rejection on it skipped those paths, and at cutover
/// `on_cutover` returned `DaemonNotFound`, which the dispatcher's tolerance
/// fallback swallowed. The orchestrator is now wired via
/// `with_source_handler` and routes the local branch through
/// `source_handler.start_snapshot`; both sides of that invariant are
/// checked here.
#[test]
fn local_source_migration_registers_in_source_handler() {
    let (registry, origin) = setup_registry();
    let source_handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111)
        .with_source_handler(Arc::clone(&source_handler));

    // Before: neither component knows about a migration.
    assert!(!source_handler.is_migrating(origin));
    assert!(!orchestrator.is_migrating(origin));

    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();

    // After: both components track it. The failure mode being guarded
    // against is the source handler still answering `false` here.
    let source_tracks_it = source_handler.is_migrating(origin);
    assert!(
        source_tracks_it,
        "regression: source_handler must have a record \
         of the local-source migration after `start_migration` \
         returns",
    );
    assert!(orchestrator.is_migrating(origin));
}
2039
/// Regression: at cutover the source handler must hand back the events
/// buffered during a local-source migration.
///
/// Before the orchestrator called `start_snapshot` on the local branch,
/// `on_cutover` answered `DaemonNotFound` for every local-source migration;
/// the dispatcher's tolerance fallback (`migration_handler.rs:537`)
/// swallowed that and behaved as if nothing had been buffered. Dropping the
/// `start_snapshot` wire-up again would silently bring that loss back, so
/// the end-to-end drain is checked directly.
#[test]
fn local_source_cutover_drains_buffered_events_through_source_handler() {
    use crate::adapter::net::state::causal::CausalEvent;
    use bytes::Bytes;

    let (registry, origin) = setup_registry();
    let source_handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111)
        .with_source_handler(Arc::clone(&source_handler));

    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();

    // Two events go into the source handler's buffer; the dispatcher would
    // drain exactly these at cutover.
    let make_event = |sequence: u64| CausalEvent {
        link: CausalLink {
            origin_hash: origin,
            horizon_encoded: 0,
            sequence,
            parent_hash: 0,
        },
        payload: Bytes::from_static(b"buffered"),
        received_at: 0,
    };
    for seq in 1..=2u64 {
        let accepted = source_handler
            .buffer_event(origin, make_event(seq))
            .unwrap();
        assert!(accepted);
    }

    let drained = source_handler
        .on_cutover(origin)
        .expect("post-fix on_cutover must find the local-source migration record");
    assert_eq!(
        drained.len(),
        2,
        "cutover must drain the buffered events for forwarding to target — \
         pre-fix this returned `DaemonNotFound` for local-source migrations \
         and the buffered events were silently lost",
    );
}
2091
/// Regression: once a local-source migration has started, events can be
/// buffered through the source handler and drained from it again.
///
/// Before the source handler was registered for local-source migrations,
/// `buffer_event` answered `Ok(false)` ("no migration active") because
/// `start_snapshot` had never run.
#[test]
fn local_source_migration_enables_source_handler_buffering() {
    use crate::adapter::net::state::causal::CausalEvent;
    use bytes::Bytes;

    let (registry, origin) = setup_registry();
    let source_handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111)
        .with_source_handler(Arc::clone(&source_handler));

    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();

    // An event arriving after the snapshot was taken.
    let link = CausalLink {
        origin_hash: origin,
        horizon_encoded: 0,
        sequence: 1,
        parent_hash: 0,
    };
    let late_event = CausalEvent {
        link,
        payload: Bytes::from_static(b"post-snapshot event"),
        received_at: 0,
    };

    // Previously `Ok(false)`, because the handler's `is_migrating(origin)`
    // was false.
    let buffered = source_handler.buffer_event(origin, late_event).unwrap();
    assert!(
        buffered,
        "fix must enable source-handler buffering for \
         local-source migrations — pre-fix `buffer_event` returned \
         `Ok(false)` because the migration was never registered",
    );

    let drained = source_handler.take_buffered_events(origin).unwrap();
    assert_eq!(
        drained.len(),
        1,
        "buffered event must be drainable through the source handler",
    );
}
2140
/// When the source is a different node, `start_migration` emits a single
/// `TakeSnapshot` request and the migration waits in `Snapshot`.
#[test]
fn test_start_migration_remote_source() {
    let (registry, origin) = setup_registry();
    // Local node 0x3333 is neither the source (0x1111) nor the target (0x2222).
    let orchestrator = MigrationOrchestrator::new(registry, 0x3333);

    let messages = orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();
    assert_eq!(
        messages.len(),
        1,
        "remote-source path emits exactly one TakeSnapshot"
    );

    if let MigrationMessage::TakeSnapshot {
        daemon_origin,
        target_node,
    } = &messages[0]
    {
        assert_eq!(*daemon_origin, origin);
        assert_eq!(*target_node, 0x2222);
    } else {
        panic!("expected TakeSnapshot, got {:?}", &messages[0]);
    }

    let phase = orchestrator.status(origin);
    assert_eq!(phase, Some(MigrationPhase::Snapshot));
}
2165
/// Starting a second migration for a daemon that is already migrating
/// must fail with `AlreadyMigrating`.
#[test]
fn test_duplicate_migration_rejected() {
    let (registry, origin) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);

    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();

    let second_attempt = orchestrator.start_migration(origin, 0x1111, 0x3333);
    assert_eq!(
        second_attempt.unwrap_err(),
        MigrationError::AlreadyMigrating(origin)
    );
}
2175
/// Regression: auto-placement that finds no candidate must report
/// `MigrationError::NoTargetAvailable`.
///
/// The auto path used to build `TargetUnavailable(0)`, which operators saw
/// as "target node 0x0 unavailable" even though no node had been tried.
#[test]
fn start_migration_auto_returns_no_target_available_when_scheduler_finds_nothing() {
    use crate::adapter::net::behavior::capability::CapabilitySet;
    use crate::adapter::net::behavior::fold::{CapabilityFold, Fold};

    let (registry, origin) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);

    // A fold with nothing in it: the scheduler has no nodes to offer.
    let empty_fold: Arc<Fold<CapabilityFold>> =
        Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
    let scheduler = super::super::Scheduler::new(empty_fold, 0x1111, CapabilitySet::default());

    // Default filter; with an empty index there is nothing to match it against.
    let filter = CapabilityFilter::default();

    let outcome = orchestrator.start_migration_auto(origin, 0x1111, &scheduler, &filter);
    let err = outcome.unwrap_err();
    assert_eq!(
        err,
        MigrationError::NoTargetAvailable,
        "auto-placement with no candidates must surface as \
         NoTargetAvailable, not TargetUnavailable(0). The 0 \
         was a fake node id that pre-fix appeared in operator \
         error logs as `target node 0x0 unavailable`."
    );
}
2213
/// `abort_migration` returns a `MigrationFailed` message whose reason is
/// the caller's text wrapped in `StateFailed`, and drops the record.
#[test]
fn test_abort_migration() {
    let (registry, origin) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);

    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();
    assert!(orchestrator.is_migrating(origin));

    let message = orchestrator
        .abort_migration(origin, "test abort".into())
        .unwrap();
    match message {
        // `abort_migration` wraps its string in `StateFailed`.
        MigrationMessage::MigrationFailed {
            reason: MigrationFailureReason::StateFailed(text),
            ..
        } => assert_eq!(text, "test abort"),
        MigrationMessage::MigrationFailed { reason, .. } => {
            panic!("expected StateFailed, got {reason:?}")
        }
        _ => panic!("expected MigrationFailed"),
    }

    assert!(!orchestrator.is_migrating(origin));
}
2238
/// Regression: aborting through the orchestrator must also clear the
/// source handler's mirror entry.
///
/// When only the orchestrator's record was removed, the source handler
/// kept reporting `is_migrating() == true`, `buffer_event` kept appending
/// to a vector nobody drained, and a retry failed with `AlreadyMigrating`
/// for a migration the orchestrator considered aborted. The abort path now
/// calls `source.abort`.
#[test]
fn abort_migration_propagates_to_source_handler() {
    use crate::adapter::net::compute::migration_source::MigrationSourceHandler;

    let (registry, origin) = setup_registry();
    let source = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator =
        MigrationOrchestrator::new(registry, 0x1111).with_source_handler(Arc::clone(&source));

    // `start_migration` records the migration on both sides: the
    // orchestrator's own record and the source handler's mirror.
    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();
    assert!(
        orchestrator.is_migrating(origin),
        "orchestrator records the migration"
    );
    assert!(
        source.is_migrating(origin),
        "source handler also records the migration via the orchestrator",
    );

    // After the abort neither side may still track it.
    orchestrator
        .abort_migration(origin, "test abort".into())
        .unwrap();
    assert!(
        !orchestrator.is_migrating(origin),
        "orchestrator must clear its record"
    );
    assert!(
        !source.is_migrating(origin),
        "source handler must also clear its mirror entry on abort",
    );

    // The property that matters in practice: the same daemon can be
    // migrated again. With a stale source-side entry this would fail
    // with `AlreadyMigrating`.
    orchestrator
        .start_migration(origin, 0x1111, 0x3333)
        .unwrap();
}
2287
/// `on_restore_complete` must never hand back buffered events.
///
/// `MigrationOrchestrator::buffer_event` was a dead surface with no
/// production caller and has been removed; real buffered events are drained
/// only by `MigrationSourceHandler::on_cutover`.
#[test]
fn on_restore_complete_ships_no_buffered_events() {
    let (registry, origin) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x3333);

    orchestrator
        .start_migration(origin, 0x1111, 0x2222)
        .unwrap();

    // Jump straight to `Restore` instead of walking Snapshot → Transfer →
    // Restore, which would need a real source-side handshake. The scope
    // releases the map entry before the orchestrator is called again.
    {
        let entry = orchestrator.migrations.get(&origin).unwrap();
        entry.lock().state.force_phase(MigrationPhase::Restore);
    }

    let outcome = orchestrator.on_restore_complete(origin, 0).unwrap();
    assert!(
        outcome.is_none(),
        "on_restore_complete must never emit BufferedEvents — \
         that buffer is exclusively a source-handler surface"
    );
}
2316
/// A `TakeSnapshot` message survives `wire::encode` followed by
/// `wire::decode` with both fields intact.
#[test]
fn test_wire_roundtrip_take_snapshot() {
    let original = MigrationMessage::TakeSnapshot {
        daemon_origin: 0xAAAA,
        target_node: 0x2222,
    };

    let bytes_on_wire = wire::encode(&original).unwrap();
    let roundtripped = wire::decode(&bytes_on_wire).unwrap();

    if let MigrationMessage::TakeSnapshot {
        daemon_origin,
        target_node,
    } = roundtripped
    {
        assert_eq!(daemon_origin, 0xAAAA);
        assert_eq!(target_node, 0x2222);
    } else {
        panic!("expected TakeSnapshot");
    }
}
2336
/// A `SnapshotReady` message survives `wire::encode` followed by
/// `wire::decode` with every field intact.
#[test]
fn test_wire_roundtrip_snapshot_ready() {
    let original = MigrationMessage::SnapshotReady {
        daemon_origin: 0xBBBB,
        snapshot_bytes: vec![1, 2, 3, 4, 5],
        seq_through: 42,
        chunk_index: 0,
        total_chunks: 1,
    };

    let bytes_on_wire = wire::encode(&original).unwrap();
    let roundtripped = wire::decode(&bytes_on_wire).unwrap();

    if let MigrationMessage::SnapshotReady {
        daemon_origin,
        snapshot_bytes,
        seq_through,
        chunk_index,
        total_chunks,
    } = roundtripped
    {
        assert_eq!(daemon_origin, 0xBBBB);
        assert_eq!(snapshot_bytes, vec![1, 2, 3, 4, 5]);
        assert_eq!(seq_through, 42);
        assert_eq!(chunk_index, 0);
        assert_eq!(total_chunks, 1);
    } else {
        panic!("expected SnapshotReady");
    }
}
2365
/// A three-byte snapshot is emitted as a single chunk (index 0 of 1)
/// carrying the payload unchanged.
#[test]
fn test_chunk_snapshot_small() {
    let messages = chunk_snapshot(0xAAAA, vec![1, 2, 3], 10).unwrap();
    assert_eq!(messages.len(), 1);

    if let MigrationMessage::SnapshotReady {
        chunk_index,
        total_chunks,
        snapshot_bytes,
        ..
    } = &messages[0]
    {
        assert_eq!(*chunk_index, 0);
        assert_eq!(*total_chunks, 1);
        assert_eq!(snapshot_bytes, &[1, 2, 3]);
    } else {
        panic!("expected SnapshotReady");
    }
}
2384
/// A snapshot larger than one chunk is split with consistent metadata and
/// reassembles to the original bytes.
#[test]
fn test_chunk_snapshot_large() {
    // Three full chunks' worth of data plus 100 bytes, so four chunks.
    let payload = vec![0xABu8; MAX_SNAPSHOT_CHUNK_SIZE * 3 + 100];
    let expected_len = payload.len();
    let chunks = chunk_snapshot(0xBBBB, payload, 42).unwrap();

    assert_eq!(chunks.len(), 4); // 3 full + 1 partial

    // Each chunk carries its own index plus the shared totals and ids.
    for (position, message) in chunks.iter().enumerate() {
        if let MigrationMessage::SnapshotReady {
            chunk_index,
            total_chunks,
            daemon_origin,
            seq_through,
            ..
        } = message
        {
            assert_eq!(*chunk_index, position as u32);
            assert_eq!(*total_chunks, 4);
            assert_eq!(*daemon_origin, 0xBBBB);
            assert_eq!(*seq_through, 42);
        } else {
            panic!("expected SnapshotReady");
        }
    }

    // Feed the chunks back in order: only the last one completes.
    let mut reassembler = SnapshotReassembler::new();
    for message in chunks {
        if let MigrationMessage::SnapshotReady {
            daemon_origin,
            snapshot_bytes,
            seq_through,
            chunk_index,
            total_chunks,
        } = message
        {
            let outcome = reassembler
                .feed(
                    daemon_origin,
                    snapshot_bytes,
                    seq_through,
                    chunk_index,
                    total_chunks,
                )
                .expect("legitimate chunks must not be rejected");

            let more_expected = chunk_index < total_chunks - 1;
            if more_expected {
                assert!(outcome.is_none());
            } else {
                let full = outcome.expect("last chunk should complete reassembly");
                assert_eq!(full.len(), expected_len);
                assert!(full.iter().all(|&b| b == 0xAB));
            }
        }
    }
}
2443
/// `cancel` discards the in-flight reassembly for a daemon.
#[test]
fn test_reassembler_cancel() {
    let mut reassembler = SnapshotReassembler::new();

    // Open a three-chunk reassembly with only its first chunk.
    reassembler.feed(0xAAAA, vec![1, 2], 10, 0, 3).unwrap();
    let before = reassembler.pending_count();
    assert_eq!(before, 1);

    reassembler.cancel(0xAAAA);
    let after = reassembler.pending_count();
    assert_eq!(after, 0);
}
2452
2453 // ---- Regression tests: SnapshotReassembler DoS / forgery holes ----
2454
/// Regression: a chunk whose index is not below the declared total must be
/// rejected before any state changes.
///
/// `feed()` used to skip the `chunk_index < total_chunks` check, so a peer
/// could declare `total_chunks = 3`, send indices {0, 5, 7}, hit the
/// "three chunks stored" completion test and have three non-contiguous
/// chunks concatenated as if they were 0, 1, 2 — a forged snapshot built
/// from attacker-chosen fragments.
#[test]
fn test_regression_reassembler_rejects_chunk_index_out_of_range() {
    let mut reassembler = SnapshotReassembler::new();

    let first = reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3);
    assert!(first.is_ok(), "in-range chunk must be accepted: {:?}", first);

    let forged = reassembler.feed(0xAAAA, vec![2; 10], 1, 5, 3);
    let rejected_as_out_of_range = matches!(
        forged,
        Err(ReassemblyError::ChunkIndexOutOfRange {
            chunk_index: 5,
            total_chunks: 3,
        })
    );
    assert!(
        rejected_as_out_of_range,
        "chunk_index=5 with total_chunks=3 must be rejected, got {:?}",
        forged
    );

    // The forged chunk must not have completed anything: the reassembly
    // is still waiting for the genuine chunks 1 and 2.
    let still_pending = reassembler.pending_count();
    assert_eq!(
        still_pending, 1,
        "state must stay in-flight after rejected chunk"
    );
}
2492
/// Regression: `total_chunks == 0` must be refused up front.
///
/// It used to open a reassembly that could never complete, leaking memory.
#[test]
fn test_regression_reassembler_rejects_zero_total_chunks() {
    let mut reassembler = SnapshotReassembler::new();

    let outcome = reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 0);
    let refused = matches!(outcome, Err(ReassemblyError::ZeroTotalChunks));
    assert!(refused);

    // Nothing may have been stored for the rejected chunk.
    assert_eq!(reassembler.pending_count(), 0);
}
2503
/// Regression: an absurd `total_chunks` must be refused.
///
/// A peer could declare `total_chunks = u32::MAX` and flood the map with
/// insertions long before any completion check fired; the declared total
/// is now capped at `MAX_TOTAL_CHUNKS`.
#[test]
fn test_regression_reassembler_caps_total_chunks() {
    let mut reassembler = SnapshotReassembler::new();

    let outcome = reassembler.feed(0xAAAA, vec![1; 10], 1, 0, u32::MAX);
    let refused = matches!(
        outcome,
        Err(ReassemblyError::TotalChunksTooLarge {
            total_chunks: u32::MAX
        })
    );
    assert!(refused);

    // Nothing may have been stored for the rejected chunk.
    assert_eq!(reassembler.pending_count(), 0);
}
2520
/// Defense in depth: a single chunk larger than `MAX_SNAPSHOT_CHUNK_SIZE`
/// is refused by the reassembler even if transport framing let it through.
#[test]
fn test_regression_reassembler_rejects_oversized_chunk() {
    let mut reassembler = SnapshotReassembler::new();

    // One byte over the per-chunk limit.
    let too_big = vec![0u8; MAX_SNAPSHOT_CHUNK_SIZE + 1];
    let outcome = reassembler.feed(0xAAAA, too_big, 1, 0, 3);

    let refused = matches!(outcome, Err(ReassemblyError::ChunkTooLarge { .. }));
    assert!(refused, "got {:?}", outcome);
}
2535
/// Regression: the total declared by the first chunk is binding.
///
/// A peer that opened a reassembly with `total_chunks = 3` could later
/// claim `total_chunks = 100` and keep inserting; later chunks must now
/// agree with the total that opened the reassembly.
#[test]
fn test_regression_reassembler_rejects_total_chunks_mismatch() {
    let mut reassembler = SnapshotReassembler::new();

    // Opens the reassembly and locks the total at 3.
    reassembler.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();

    let outcome = reassembler.feed(0xAAAA, vec![2; 10], 1, 1, 100);
    let refused = matches!(
        outcome,
        Err(ReassemblyError::TotalChunksMismatch {
            got: 100,
            expected: 3,
        })
    );
    assert!(refused, "got {:?}", outcome);

    // The original reassembly is still in flight.
    assert_eq!(reassembler.pending_count(), 1);
}
2558
/// Empty chunks are refused at the boundary, reported as
/// `ChunkTooLarge { len: 0 }`.
///
/// Previously up to `MAX_TOTAL_CHUNKS = 700_000` zero-byte chunks could be
/// admitted per reassembly without tripping the byte-budget cap — useless
/// for a real snapshot and a cheap way to bloat the map's bookkeeping.
#[test]
fn reassembler_refuses_zero_byte_chunk() {
    let mut reassembler = SnapshotReassembler::new();

    let empty_chunk: Vec<u8> = vec![];
    let outcome = reassembler.feed(0xAAAA, empty_chunk, 1, 0, 3);

    let refused = matches!(outcome, Err(ReassemblyError::ChunkTooLarge { len: 0 }));
    assert!(refused, "got {:?}", outcome);

    // Nothing may have been stored for the rejected chunk.
    assert_eq!(reassembler.pending_count(), 0);
}
2575
/// The single-chunk fast path must still honour the total-chunks guard.
///
/// Otherwise a peer that opened a reassembly with chunk 0/3 for a
/// `(daemon, seq)` key could follow up with chunk 0/1 for the same key and
/// have that payload accepted as a complete snapshot, replacing the
/// in-flight multi-chunk one. The fast path therefore checks `pending`
/// first and reports `TotalChunksMismatch` on disagreement.
#[test]
fn fast_path_rejects_single_chunk_after_multi_chunk_state() {
    let mut reassembler = SnapshotReassembler::new();

    // Legitimate opening: chunk 0 of 3.
    reassembler.feed(0xAAAA, vec![1; 10], 7, 0, 3).unwrap();

    // Substitution attempt: same key, but claiming a one-chunk snapshot.
    let outcome = reassembler.feed(0xAAAA, vec![2; 10], 7, 0, 1);
    let refused = matches!(
        outcome,
        Err(ReassemblyError::TotalChunksMismatch {
            got: 1,
            expected: 3,
        })
    );
    assert!(
        refused,
        "fast path must refuse substitution; got {:?}",
        outcome
    );

    // The attempt must not have evicted the original reassembly.
    assert_eq!(reassembler.pending_count(), 1);
}
2606
/// Regression: `pending` was keyed by `(daemon_origin, seq_through)` and a
/// fresh `seq_through` did NOT evict older pending reassemblies for the same
/// daemon, so a peer could open unbounded in-flight entries by incrementing
/// `seq_through` forever.
///
/// Fix under test: at most one in-flight reassembly per daemon. A newer
/// `seq_through` evicts the older ones, and an older `seq_through` is
/// rejected as stale.
#[test]
fn test_regression_reassembler_evicts_older_seq_per_daemon() {
    let mut asm = SnapshotReassembler::new();

    // Three openers for the same daemon with strictly rising seq_through.
    for seq in [10, 11, 12] {
        asm.feed(0xAAAA, vec![1; 10], seq, 0, 3).unwrap();
    }
    assert_eq!(
        asm.pending_count(),
        1,
        "only the newest seq_through for a daemon should remain in flight"
    );

    // Going backwards must surface an explicit error rather than being
    // dropped silently.
    let stale_attempt = asm.feed(0xAAAA, vec![1; 10], 5, 0, 3);
    let rejected_as_stale = matches!(
        stale_attempt,
        Err(ReassemblyError::StaleSeqThrough { got: 5, latest: 12 })
    );
    assert!(
        rejected_as_stale,
        "stale seq_through must be rejected, got {:?}",
        stale_attempt
    );
    assert_eq!(asm.pending_count(), 1);
}
2641
/// Regression: a peer that declares a large `total_chunks` and ships chunks
/// right up to the per-entry byte cap must be refused on the chunk that
/// would cross the cap, instead of parking memory indefinitely. Before the
/// fix, `MAX_TOTAL_CHUNKS × MAX_SNAPSHOT_CHUNK_SIZE` allowed roughly 4.3 GiB
/// to be buffered per `(origin, seq)` key forever, because the
/// `seq_through > latest` eviction never fires while the attacker keeps
/// re-using the same `seq_through`.
#[test]
fn reassembler_refuses_chunk_that_overflows_pending_byte_cap() {
    let mut asm = SnapshotReassembler::new();

    // Every chunk is the largest legal size and uses its own index, so no
    // chunk displaces another during the prefill.
    let max_chunk = vec![0xCCu8; MAX_SNAPSHOT_CHUNK_SIZE];
    let prefill_count = (MAX_PENDING_REASSEMBLY_BYTES / MAX_SNAPSHOT_CHUNK_SIZE) as u32;

    // Declare two more chunks than the prefill sends: the entry stays
    // incomplete and the follow-up chunk targets the same key.
    let declared_total = prefill_count + 2;

    for idx in 0..prefill_count {
        asm.feed(0xAAAA, max_chunk.clone(), 1, idx, declared_total)
            .unwrap();
    }

    // One more full-size chunk would exceed the cap; it has to come back as
    // `TooManyPendingBytes` rather than being buffered.
    let overflow = asm.feed(0xAAAA, max_chunk.clone(), 1, prefill_count, declared_total);
    assert!(
        matches!(overflow, Err(ReassemblyError::TooManyPendingBytes { .. })),
        "chunk that would overflow the per-entry cap must be refused, got {:?}",
        overflow,
    );

    // A duplicate of an index that is already buffered must still be
    // accepted (the displaced chunk's bytes are subtracted before the cap is
    // re-checked), so the cap does not break legitimate re-delivery.
    let duplicate = asm.feed(0xAAAA, max_chunk, 1, 0, declared_total);
    assert!(
        duplicate.is_ok(),
        "re-sending an already-buffered chunk index must succeed, got {:?}",
        duplicate
    );
}
2690
/// Regression: an entry parked at the per-entry byte cap could sit in
/// `pending` forever, because the `seq_through > latest` eviction never
/// fires while a hostile peer keeps re-using the same `seq_through`. The age
/// sweep is the second line of defense: `sweep_stale(max_age)` drops every
/// entry whose last progress is older than `max_age`.
#[test]
fn reassembler_sweep_stale_drops_quiet_entries() {
    let quiet_period = Duration::from_millis(20);
    let max_age = Duration::from_millis(10);

    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
    assert_eq!(asm.pending_count(), 1);

    // Let the entry go quiet for longer than the sweep age.
    std::thread::sleep(quiet_period);

    let dropped = asm.sweep_stale(max_age);
    assert_eq!(dropped, 1, "stale entry must be evicted");
    assert_eq!(asm.pending_count(), 0);
}
2710
/// Pins the slow-but-progressing legitimate peer: each chunk that lands
/// refreshes `last_progress_at`, so the sweep only removes entries that have
/// really gone quiet, not ones that are merely slow to receive every chunk.
///
/// NOTE(review): the assertion relies on the second `feed` and the sweep
/// running less than 15 ms apart; confirm that margin is acceptable on
/// heavily loaded CI hosts.
#[test]
fn reassembler_sweep_keeps_progressing_entries() {
    let gap_between_chunks = Duration::from_millis(20);
    // Shorter than the gap above (so the entry would be stale if measured
    // from its creation) yet generous relative to the second chunk's arrival.
    let max_age = Duration::from_millis(15);

    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 1, 0, 3).unwrap();
    std::thread::sleep(gap_between_chunks);

    // Progress: the second chunk arrives and refreshes the entry.
    asm.feed(0xAAAA, vec![1; 10], 1, 1, 3).unwrap();

    let dropped = asm.sweep_stale(max_age);
    assert_eq!(
        dropped, 0,
        "entry that received a chunk within max_age must survive"
    );
    assert_eq!(asm.pending_count(), 1);
}
2734
/// Pins the cross-daemon healing path: a hostile entry that never receives
/// another chunk is still dropped, because the opportunistic sweep at the
/// head of every `feed` call runs on traffic from ANY daemon — not only when
/// `sweep_stale` is driven explicitly.
#[test]
fn reassembler_opportunistic_sweep_in_feed_drops_quiet_entries() {
    // A tiny pending age lets the in-`feed` sweep trigger on test timescales.
    let pending_age = Duration::from_millis(10);
    let mut asm = SnapshotReassembler::with_max_pending_age(pending_age);

    // The hostile daemon parks an entry under (origin=0xBAD, seq=1) ...
    asm.feed(0xBAD, vec![0xFF; 10], 1, 0, 3).unwrap();
    assert_eq!(asm.pending_count(), 1);

    // ... and then stays silent for longer than the pending age.
    std::thread::sleep(Duration::from_millis(25));

    // Traffic from an unrelated daemon arrives.
    asm.feed(0xC0DE, vec![1; 10], 5, 0, 3).unwrap();

    // A count of one means the 0xBAD entry was swept and only the entry
    // opened by 0xC0DE is left.
    assert_eq!(asm.pending_count(), 1);
}
2762
/// Sweeping a stale buffer must not erase the daemon's `latest_seq`. A peer
/// that returns after the sweep with an older `seq_through` is still
/// rejected as stale, so the sweep cannot be abused as a
/// snapshot-replacement amplifier.
#[test]
fn reassembler_sweep_stale_preserves_latest_seq() {
    let mut asm = SnapshotReassembler::new();
    asm.feed(0xAAAA, vec![1; 10], 100, 0, 3).unwrap();

    // Age the entry past the sweep threshold and drop it.
    std::thread::sleep(Duration::from_millis(20));
    let dropped = asm.sweep_stale(Duration::from_millis(10));
    assert_eq!(dropped, 1);

    // The in-flight buffer is gone, yet seq 50 must still lose against the
    // remembered seq 100.
    let replay = asm.feed(0xAAAA, vec![1; 10], 50, 0, 3);
    let rejected_as_stale = matches!(
        replay,
        Err(ReassemblyError::StaleSeqThrough {
            got: 50,
            latest: 100,
        })
    );
    assert!(
        rejected_as_stale,
        "post-sweep replay of an older seq_through must still be rejected, got {:?}",
        replay,
    );
}
2792
/// Pins the at-cap + quiet attack: a peer fills an entry to just under
/// `MAX_PENDING_REASSEMBLY_BYTES` and then goes silent. Before the fix, the
/// per-entry byte cap stopped further amplification but the parked bytes
/// stayed forever; the age sweep closes that hole.
#[test]
fn reassembler_sweep_releases_buffer_parked_at_byte_cap() {
    let mut asm = SnapshotReassembler::new();

    // Fill one entry with max-size chunks, leaving it incomplete by
    // declaring two more chunks than are sent.
    let max_chunk = vec![0xCCu8; MAX_SNAPSHOT_CHUNK_SIZE];
    let prefill_count = (MAX_PENDING_REASSEMBLY_BYTES / MAX_SNAPSHOT_CHUNK_SIZE) as u32;
    let declared_total = prefill_count + 2;
    for idx in 0..prefill_count {
        asm.feed(0xAAAA, max_chunk.clone(), 1, idx, declared_total)
            .unwrap();
    }
    assert_eq!(asm.pending_count(), 1);

    // The peer goes silent; without the sweep the entry would stay parked at
    // ~MAX_PENDING_REASSEMBLY_BYTES indefinitely.
    std::thread::sleep(Duration::from_millis(20));
    let dropped = asm.sweep_stale(Duration::from_millis(10));
    assert_eq!(dropped, 1, "parked-at-cap entry must be released by sweep");
    assert_eq!(asm.pending_count(), 0);
}
2818
/// Eviction is scoped per daemon, not global: parallel migrations of
/// different daemons must be able to share one reassembler, each keeping
/// its own in-flight entry.
#[test]
fn test_regression_reassembler_distinct_daemons_coexist() {
    let mut asm = SnapshotReassembler::new();

    // (daemon origin, payload fill byte, seq_through) — one opener per daemon.
    for (origin, fill, seq) in [(0x1111, 1, 1), (0x2222, 2, 7), (0x3333, 3, 9)] {
        asm.feed(origin, vec![fill; 10], seq, 0, 3).unwrap();
    }

    assert_eq!(asm.pending_count(), 3);
}
2829
/// Regression: the wire decoder accepted any u32 for `total_chunks` and
/// `chunk_index`, including nonsense such as `total_chunks = 0`. Validation
/// at the wire boundary keeps such malformed messages away from the
/// reassembler; the error text is expected to name the offending field.
#[test]
fn test_regression_wire_decode_rejects_zero_total_chunks() {
    use bytes::BufMut;

    // Hand-assemble a SnapshotReady frame field by field.
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_SNAPSHOT_READY);
    frame.put_u64_le(0xAAAA); // daemon_origin
    frame.put_u64_le(1); // seq_through
    frame.put_u32_le(0); // chunk_index
    frame.put_u32_le(0); // total_chunks — the invalid value under test
    frame.put_u32_le(0); // len

    let rejection = wire::decode(&frame).expect_err("total_chunks=0 must be rejected");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("total_chunks"),
        "error must mention total_chunks, got {:?}",
        rendered
    );
}
2852
/// The wire decoder must refuse a SnapshotReady frame whose `chunk_index`
/// (5) is not below its `total_chunks` (3), and the error text must name
/// `chunk_index`.
#[test]
fn test_regression_wire_decode_rejects_chunk_index_out_of_range() {
    use bytes::BufMut;

    // Same field order as the other hand-built SnapshotReady frames.
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_SNAPSHOT_READY);
    frame.put_u64_le(0xAAAA); // daemon_origin
    frame.put_u64_le(1); // seq_through
    frame.put_u32_le(5); // chunk_index — beyond the declared total
    frame.put_u32_le(3); // total_chunks
    frame.put_u32_le(0); // len

    let rejection =
        wire::decode(&frame).expect_err("chunk_index >= total_chunks must be rejected");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("chunk_index"),
        "error must mention chunk_index, got {:?}",
        rendered
    );
}
2871
/// The wire decoder must refuse a SnapshotReady frame whose `total_chunks`
/// is `u32::MAX`, i.e. above `MAX_TOTAL_CHUNKS`, and the error text must
/// mention `MAX_TOTAL_CHUNKS`.
#[test]
fn test_regression_wire_decode_rejects_total_chunks_overflow() {
    use bytes::BufMut;

    // Same field order as the other hand-built SnapshotReady frames.
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_SNAPSHOT_READY);
    frame.put_u64_le(0xAAAA); // daemon_origin
    frame.put_u64_le(1); // seq_through
    frame.put_u32_le(0); // chunk_index
    frame.put_u32_le(u32::MAX); // total_chunks — exceeds MAX_TOTAL_CHUNKS
    frame.put_u32_le(0); // len

    let rejection =
        wire::decode(&frame).expect_err("total_chunks > MAX_TOTAL_CHUNKS must be rejected");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("MAX_TOTAL_CHUNKS"),
        "error must mention MAX_TOTAL_CHUNKS, got {:?}",
        rendered
    );
}
2890
/// Integration scenario: an attacker learns `total_chunks = 4` from a
/// legitimate first chunk and tries to race ahead with forged content at
/// indices outside the declared range (4, 5, 7 and 999 here). Each forged
/// chunk must be rejected, and the reassembly may only complete once every
/// real index in `0..total_chunks` has been received.
#[test]
fn test_regression_reassembler_end_to_end_forged_chunk_cannot_complete() {
    let mut asm = SnapshotReassembler::new();

    // Genuine chunk 0 opens the reassembly at total_chunks=4 and, on its
    // own, completes nothing.
    let opener = asm.feed(0xDEAD, vec![0xA0; 10], 1, 0, 4).unwrap();
    assert!(opener.is_none());

    // Forged chunks with out-of-range indices: all refused.
    for bad_idx in [4, 5, 7, 999] {
        let forged = asm.feed(0xDEAD, vec![0xFF; 10], 1, bad_idx, 4);
        assert!(
            matches!(forged, Err(ReassemblyError::ChunkIndexOutOfRange { .. })),
            "index {} must be rejected, got {:?}",
            bad_idx,
            forged
        );
    }

    // Genuine chunks 1 and 2 still leave the snapshot incomplete.
    for (idx, fill) in [(1, 0xA1), (2, 0xA2)] {
        let partial = asm.feed(0xDEAD, vec![fill; 10], 1, idx, 4).unwrap();
        assert!(partial.is_none());
    }

    // Genuine chunk 3 is the last missing index and completes the snapshot.
    let full = asm
        .feed(0xDEAD, vec![0xA3; 10], 1, 3, 4)
        .unwrap()
        .expect("all four real chunks received — reassembly must complete");

    // The payload is the genuine chunks concatenated by ascending
    // chunk_index: four 10-byte runs, none carrying the forged 0xFF fill.
    assert_eq!(full.len(), 40);
    for (slot, expected) in [0xA0u8, 0xA1, 0xA2, 0xA3].iter().enumerate() {
        let segment = &full[slot * 10..(slot + 1) * 10];
        assert!(segment.iter().all(|b| b == expected));
    }
}
2937
/// Round-trips a mid-stream `SnapshotReady` chunk (index 2 of 5) through
/// `wire::encode` / `wire::decode` and checks that every field survives:
/// the daemon origin, the sequence watermark, the chunk coordinates and the
/// payload bytes themselves. Checking only the chunk coordinates would let a
/// codec bug that corrupts the payload or the other header fields slip by.
#[test]
fn test_wire_roundtrip_chunked_snapshot() {
    let msg = MigrationMessage::SnapshotReady {
        daemon_origin: 0xCCCC,
        snapshot_bytes: vec![42; 100],
        seq_through: 99,
        chunk_index: 2,
        total_chunks: 5,
    };
    let encoded = wire::encode(&msg).unwrap();
    let decoded = wire::decode(&encoded).unwrap();
    match decoded {
        MigrationMessage::SnapshotReady {
            daemon_origin,
            snapshot_bytes,
            seq_through,
            chunk_index,
            total_chunks,
        } => {
            assert_eq!(daemon_origin, 0xCCCC);
            assert_eq!(seq_through, 99);
            assert_eq!(chunk_index, 2);
            assert_eq!(total_chunks, 5);
            // The payload must come back with the same length and content.
            assert_eq!(snapshot_bytes.len(), 100);
            assert!(snapshot_bytes.iter().all(|&b| b == 42));
        }
        _ => panic!("expected SnapshotReady"),
    }
}
2961
/// Round-trips each `MigrationFailureReason` listed below through the wire
/// codec to pin the wire-layout contract: the decoded message must carry the
/// same daemon origin and an equal reason, including any embedded string or
/// attempt count.
///
/// NOTE(review): the list of reasons is maintained by hand. A variant added
/// to the enum is not covered until it is added here too.
#[test]
fn test_wire_roundtrip_failed() {
    let reasons = [
        MigrationFailureReason::NotReady,
        MigrationFailureReason::FactoryNotFound,
        MigrationFailureReason::ComputeNotSupported,
        MigrationFailureReason::StateFailed("something broke".into()),
        MigrationFailureReason::AlreadyMigrating,
        MigrationFailureReason::IdentityTransportFailed("seal failed".into()),
        MigrationFailureReason::NotReadyTimeout { attempts: 5 },
    ];

    for reason in reasons {
        let msg = MigrationMessage::MigrationFailed {
            daemon_origin: 0xCCCC,
            reason: reason.clone(),
        };
        let wire_bytes = wire::encode(&msg).unwrap();

        match wire::decode(&wire_bytes).unwrap() {
            MigrationMessage::MigrationFailed {
                daemon_origin,
                reason: decoded_reason,
            } => {
                assert_eq!(daemon_origin, 0xCCCC);
                assert_eq!(decoded_reason, reason);
            }
            _ => panic!("expected MigrationFailed"),
        }
    }
}
2995
/// Round-trips a `BufferedEvents` message holding two causal events — a
/// genesis link and an explicitly chained link — and checks that the event
/// count, payloads, receive timestamps and the second link's
/// sequence/parent hash survive encode + decode.
#[test]
fn test_wire_roundtrip_buffered_events() {
    let genesis_event = CausalEvent {
        link: CausalLink::genesis(0xAAAA, 0),
        payload: Bytes::from_static(b"event1"),
        received_at: 100,
    };
    let chained_event = CausalEvent {
        link: CausalLink {
            origin_hash: 0xAAAA,
            horizon_encoded: 1,
            sequence: 1,
            parent_hash: 12345,
        },
        payload: Bytes::from_static(b"event2"),
        received_at: 200,
    };
    let msg = MigrationMessage::BufferedEvents {
        daemon_origin: 0xAAAA,
        events: vec![genesis_event, chained_event],
    };

    let wire_bytes = wire::encode(&msg).unwrap();
    let (origin, replayed) = match wire::decode(&wire_bytes).unwrap() {
        MigrationMessage::BufferedEvents {
            daemon_origin,
            events,
        } => (daemon_origin, events),
        _ => panic!("expected BufferedEvents"),
    };

    assert_eq!(origin, 0xAAAA);
    assert_eq!(replayed.len(), 2);

    let (first, second) = (&replayed[0], &replayed[1]);
    assert_eq!(first.payload, Bytes::from_static(b"event1"));
    assert_eq!(first.received_at, 100);
    assert_eq!(second.link.sequence, 1);
    assert_eq!(second.link.parent_hash, 12345);
    assert_eq!(second.payload, Bytes::from_static(b"event2"));
    assert_eq!(second.received_at, 200);
}
3038
/// Regression: `reason.len() as u16` used to truncate silently when the
/// reason string was longer than `u16::MAX` bytes, producing a stream the
/// decoder misparses. Encoding such a message must now fail with
/// `MigrationError::StateFailed`.
#[test]
fn test_wire_encode_rejects_oversized_failure_reason() {
    // One byte more than a u16 length prefix can describe.
    let too_long = "x".repeat(usize::from(u16::MAX) + 1);
    let msg = MigrationMessage::MigrationFailed {
        daemon_origin: 0xDEAD,
        reason: MigrationFailureReason::StateFailed(too_long),
    };

    let outcome = wire::encode(&msg);
    let refused = matches!(outcome, Err(MigrationError::StateFailed(_)));
    assert!(
        refused,
        "encode of oversized reason must error, got {:?}",
        outcome
    );
}
3056
/// A hand-crafted `MSG_FAILED` frame whose failure code is 0xFFFF (no such
/// variant) must be refused by the decoder rather than mis-parsed. The
/// rejection is expected as `MigrationError::StateFailed` with a message
/// naming the unknown code.
#[test]
fn test_wire_rejects_unknown_failure_code() {
    let mut frame = Vec::new();
    frame.put_u8(wire::MSG_FAILED);
    frame.put_u64_le(0xBEEF); // u64 preceding the code — presumably daemon_origin
    frame.put_u16_le(0xFFFF); // failure code that maps to no variant

    let rejection = wire::decode(&frame).expect_err("unknown code must reject");
    match rejection {
        MigrationError::StateFailed(detail) => {
            let names_the_problem = detail.contains("unknown MigrationFailureReason code");
            assert!(names_the_problem);
        }
        other => panic!("expected StateFailed, got {other:?}"),
    }
}
3073
/// `list_migrations` is empty on a fresh orchestrator and, after one
/// `start_migration`, reports exactly that migration with the daemon origin,
/// source node and target node it was started with, and zero retries.
#[test]
fn test_list_migrations() {
    let (registry, daemon) = setup_registry();
    let orchestrator = MigrationOrchestrator::new(registry, 0x1111);

    // Nothing has been started yet.
    assert!(orchestrator.list_migrations().is_empty());

    // Start a single migration from node 0x1111 to node 0x2222.
    orchestrator
        .start_migration(daemon, 0x1111, 0x2222)
        .unwrap();

    let migrations = orchestrator.list_migrations();
    assert_eq!(migrations.len(), 1);

    let only = &migrations[0];
    assert_eq!(only.daemon_origin, daemon);
    assert_eq!(only.source_node, 0x1111);
    assert_eq!(only.target_node, 0x2222);
    assert_eq!(only.retries, 0);
}
3090}