net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
//! Source-side migration handler.
//!
//! Manages the source node's role in migration: taking a snapshot, buffering
//! events during transfer/replay, executing cutover (stop writes), and
//! cleaning up the daemon after the target is live.

use std::sync::Arc;
use std::time::Instant;

use dashmap::DashMap;
use parking_lot::Mutex;

use super::migration::{MigrationError, MigrationPhase};
use super::registry::DaemonRegistry;
use crate::adapter::net::state::causal::CausalEvent;
use crate::adapter::net::state::snapshot::StateSnapshot;

/// Maximum bytes of buffered events the source will hold per
/// daemon before refusing further inserts with
/// `MigrationError::BufferFull`. Mirrors the target's
/// `MAX_PENDING_BUFFER_BYTES` (64 MiB).
///
/// Only event payload bytes are counted (`event.payload.len()`). An
/// insert is refused when it would push the running total *above* this
/// value, so a buffer holding exactly 64 MiB of payload is still legal.
const MAX_SOURCE_BUFFERED_BYTES: usize = 64 * 1024 * 1024;
/// Companion event-count cap. Mirrors `MAX_BUFFERED_EVENTS` in
/// the orchestrator's wire-decode path (1_000_000).
///
/// An insert is refused once the buffer already holds this many events,
/// so the per-daemon buffer never grows past 1_000_000 entries.
const MAX_SOURCE_BUFFERED_EVENTS: usize = 1_000_000;

/// Per-daemon source-side migration state.
///
/// One record exists per daemon this node is currently migrating away.
/// It is created by `start_snapshot` and stored in
/// `MigrationSourceHandler::migrations` behind a `Mutex`.
// Several fields (`daemon_origin`, `target_node`, `snapshot`,
// `last_buffered_seq`, `started_at`) are written but never read within
// this file, hence the lint allowance. Presumably they are retained for
// diagnostics — confirm before removing any of them.
#[allow(dead_code)]
struct SourceMigrationState {
    /// Identifier of the daemon being migrated; also the key this record
    /// is stored under in `migrations`.
    daemon_origin: u64,
    /// Node the daemon is being migrated to, as passed to `start_snapshot`.
    target_node: u64,
    /// Node that initiated this migration. Replies (SnapshotReady,
    /// CleanupComplete) are routed here, not to the immediate wire hop
    /// — which, under future subprotocol relaying, may not be the
    /// orchestrator.
    orchestrator_node: u64,
    /// Current phase as tracked by the source. Starts at `Snapshot`; the
    /// only transition performed in this file is to `Cutover`
    /// (in `on_cutover`).
    phase: MigrationPhase,
    /// Copy of the snapshot that `start_snapshot` returned to its caller.
    snapshot: Option<StateSnapshot>,
    /// Events buffered between snapshot and cutover, in sequence order.
    buffered_events: Vec<CausalEvent>,
    /// Running total of the payload sizes held in `buffered_events`.
    /// Incremented on push and reset to zero on drain so that
    /// `buffer_event`'s byte-cap check is O(1) instead of re-summing the
    /// whole buffer on every insert (which would be O(N²) over the life
    /// of a migration — exactly the wedged-target / high-volume-daemon
    /// shape the cap exists to bound).
    buffered_bytes: usize,
    /// Sequence number of the most recently buffered event. Initialised
    /// to the snapshot's `through_seq`.
    last_buffered_seq: u64,
    /// Instant at which this record was created.
    started_at: Instant,
}

/// Handles the source node's role in daemon migration.
///
/// The source handler:
/// 1. Takes a snapshot of the daemon (phase 0)
/// 2. Buffers events arriving for the daemon during migration (phases 0-3)
/// 3. Stops accepting writes at cutover (phase 4)
/// 4. Unregisters the daemon and cleans up (phase 5)
///
/// All methods take `&self`; per-daemon state is kept in concurrent maps
/// with a `Mutex` around each migration record.
pub struct MigrationSourceHandler {
    /// Local daemon registry. Snapshots are taken from it and `cleanup`
    /// unregisters migrated daemons from it.
    daemon_registry: Arc<DaemonRegistry>,
    /// Active migrations on this node as source: daemon_origin → state.
    migrations: DashMap<u64, Mutex<SourceMigrationState>>,
    /// Single-flight claim set: a daemon present here has a snapshot
    /// in flight. `start_snapshot` atomically inserts the origin BEFORE
    /// running the user-supplied `MeshDaemon::snapshot()` and removes
    /// it again when the call returns, whether it succeeded (the record
    /// is then in `migrations`) or bailed out early. The purpose is to
    /// keep two concurrent callers from each running a full snapshot
    /// for the same origin, which would double-fire any non-idempotent
    /// side-effect (counter bumps, deferred I/O, etc.) inside the
    /// user's snapshot impl.
    snapshots_in_progress: DashMap<u64, ()>,
}

impl MigrationSourceHandler {
    /// Build a handler with no migrations in flight.
    ///
    /// `daemon_registry` is the node-local registry that snapshots are
    /// taken from and that `cleanup` unregisters daemons from.
    pub fn new(daemon_registry: Arc<DaemonRegistry>) -> Self {
        let migrations = DashMap::new();
        let snapshots_in_progress = DashMap::new();
        Self {
            daemon_registry,
            migrations,
            snapshots_in_progress,
        }
    }

    /// Phase 0: Take snapshot of a local daemon.
    ///
    /// Registers the migration and returns the snapshot for transfer.
    /// `orchestrator_node` is the node that initiated this migration;
    /// SnapshotReady / CleanupComplete replies are routed to it rather
    /// than to whatever hop forwarded the wire packet.
    ///
    /// The user-supplied `MeshDaemon::snapshot()` is invoked at most once
    /// per successfully registered migration: concurrent callers for the
    /// same origin are rejected before the snapshot runs.
    ///
    /// # Errors
    ///
    /// * `DaemonNotFound` — `daemon_origin` is not in the local registry.
    /// * `AlreadyMigrating` — a migration record already exists for this
    ///   origin, or another caller currently has a snapshot in flight.
    /// * `StateFailed` — the registry reported a snapshot error, or the
    ///   daemon produced no snapshot.
    pub fn start_snapshot(
        &self,
        daemon_origin: u64,
        target_node: u64,
        orchestrator_node: u64,
    ) -> Result<StateSnapshot, MigrationError> {
        // Lock discipline: the registry calls below (`contains`,
        // `snapshot`) are made while holding NO `migrations` entry
        // guard. `snapshot` runs user-supplied `MeshDaemon::snapshot()`
        // code; holding a DashMap shard guard across it would stall
        // every co-hashed origin for the duration and invites a
        // lock-order inversion against whatever locks the registry
        // takes internally. Mutual exclusion per origin is instead
        // provided by the lightweight `snapshots_in_progress` claim.

        if !self.daemon_registry.contains(daemon_origin) {
            return Err(MigrationError::DaemonNotFound(daemon_origin));
        }

        // Fast-path rejection that avoids touching the claim set when a
        // migration is already registered. This check alone is racy —
        // see the re-check under the claim below.
        if self.migrations.contains_key(&daemon_origin) {
            return Err(MigrationError::AlreadyMigrating(daemon_origin));
        }

        // Single-flight claim. Insert into `snapshots_in_progress`
        // BEFORE running the user-supplied snapshot — DashMap's
        // `Entry::Vacant`/`Occupied` is the atomic fence. If we observe
        // `Occupied`, another caller is mid-snapshot for this origin;
        // surface AlreadyMigrating without firing the user's snapshot a
        // second time.
        match self.snapshots_in_progress.entry(daemon_origin) {
            dashmap::mapref::entry::Entry::Occupied(_) => {
                return Err(MigrationError::AlreadyMigrating(daemon_origin));
            }
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                entry.insert(());
            }
        }
        // Releases the claim on every exit path (success, `?`, early
        // return, panic in the user's snapshot). On success the claim
        // is released only AFTER the record has been inserted into
        // `migrations`, which is what makes the re-check below sound.
        struct ClaimGuard<'a> {
            map: &'a DashMap<u64, ()>,
            origin: u64,
        }
        impl Drop for ClaimGuard<'_> {
            fn drop(&mut self) {
                self.map.remove(&self.origin);
            }
        }
        let _claim_guard = ClaimGuard {
            map: &self.snapshots_in_progress,
            origin: daemon_origin,
        };

        // Re-check under the claim. Without this, a caller that passed
        // the fast-path check above and was then descheduled could
        // acquire the claim right after a concurrent winner inserted
        // its record and dropped its claim — and would run the
        // user-supplied snapshot a second time before being rejected
        // at the final insert. Because a winner's record is always
        // visible in `migrations` before its claim is released, holding
        // the claim and seeing no record here means no other snapshot
        // for this origin is in flight or freshly registered.
        if self.migrations.contains_key(&daemon_origin) {
            return Err(MigrationError::AlreadyMigrating(daemon_origin));
        }

        let snapshot = self
            .daemon_registry
            .snapshot(daemon_origin)
            .map_err(|e| MigrationError::StateFailed(e.to_string()))?
            .ok_or_else(|| {
                MigrationError::StateFailed("daemon is stateless or snapshot failed".into())
            })?;

        // Atomic insert. `start_snapshot` is the only writer that adds
        // to `migrations` and we still hold the claim, so the Occupied
        // branch should not be reachable; it is kept as
        // defense-in-depth so a future second writer cannot silently
        // overwrite a live record.
        let entry = match self.migrations.entry(daemon_origin) {
            dashmap::mapref::entry::Entry::Occupied(_) => {
                return Err(MigrationError::AlreadyMigrating(daemon_origin));
            }
            dashmap::mapref::entry::Entry::Vacant(entry) => entry,
        };
        entry.insert(Mutex::new(SourceMigrationState {
            daemon_origin,
            target_node,
            orchestrator_node,
            phase: MigrationPhase::Snapshot,
            // The record keeps its own copy; the original is handed
            // back to the caller for transfer.
            snapshot: Some(snapshot.clone()),
            buffered_events: Vec::new(),
            buffered_bytes: 0,
            last_buffered_seq: snapshot.through_seq,
            started_at: Instant::now(),
        }));

        Ok(snapshot)
    }

    /// Orchestrator node recorded for the active migration of
    /// `daemon_origin`.
    ///
    /// Yields `None` when no migration record exists for that origin
    /// (never started, aborted, or already cleaned up).
    pub fn orchestrator_node(&self, daemon_origin: u64) -> Option<u64> {
        let record = self.migrations.get(&daemon_origin)?;
        let node = record.lock().orchestrator_node;
        Some(node)
    }

    /// Buffer an event that arrived for a daemon while it is migrating.
    ///
    /// Events are accepted during the Snapshot, Transfer, Restore and
    /// Replay phases.
    ///
    /// Returns `Ok(true)` when the event was buffered and `Ok(false)`
    /// when no migration is active for `daemon_origin` (the caller
    /// should handle the event normally).
    ///
    /// # Errors
    ///
    /// * `BufferFull` — accepting the event would exceed the per-daemon
    ///   event-count or payload-byte cap. The reported `events` / `bytes`
    ///   are the totals the buffer *would* have reached, so they line up
    ///   with the cap comparison.
    /// * `StateFailed` — the daemon has been cut over (or the migration
    ///   is complete); the source no longer accepts writes for it.
    pub fn buffer_event(
        &self,
        daemon_origin: u64,
        event: CausalEvent,
    ) -> Result<bool, MigrationError> {
        let Some(record) = self.migrations.get(&daemon_origin) else {
            return Ok(false);
        };
        let mut state = record.lock();

        match state.phase {
            // After cutover, source rejects writes
            MigrationPhase::Cutover | MigrationPhase::Complete => {
                Err(MigrationError::StateFailed(format!(
                    "daemon {:#x} has been cut over, writes rejected",
                    daemon_origin,
                )))
            }
            MigrationPhase::Snapshot
            | MigrationPhase::Transfer
            | MigrationPhase::Restore
            | MigrationPhase::Replay => {
                // Bound the buffer per daemon by both count and bytes.
                // A target wedged in Restore/Replay combined with a
                // high-volume daemon would otherwise grow this buffer
                // until the node runs out of memory; surfacing
                // BufferFull lets the orchestrator abort cleanly. The
                // running byte counter keeps the check O(1) per insert.
                let payload_len = event.payload.len();
                let projected_bytes = state.buffered_bytes.saturating_add(payload_len);
                let held = state.buffered_events.len();

                let count_exhausted = held >= MAX_SOURCE_BUFFERED_EVENTS;
                let bytes_exhausted = projected_bytes > MAX_SOURCE_BUFFERED_BYTES;
                if count_exhausted || bytes_exhausted {
                    // Report post-insert totals: the pre-insert byte
                    // count can sit below the cap, which would make the
                    // error look self-contradictory on a dashboard.
                    return Err(MigrationError::BufferFull {
                        events: held.saturating_add(1),
                        bytes: projected_bytes,
                    });
                }

                state.last_buffered_seq = event.link.sequence;
                state.buffered_bytes = projected_bytes;
                state.buffered_events.push(event);
                Ok(true)
            }
        }
    }

    /// Whether this node currently holds a source-side migration record
    /// for `daemon_origin`.
    pub fn is_migrating(&self, daemon_origin: u64) -> bool {
        self.migrations.get(&daemon_origin).is_some()
    }

    /// Drain the events buffered so far for transfer to the target.
    ///
    /// Permitted in the same phases in which `buffer_event` accepts
    /// writes (Snapshot, Transfer, Restore, Replay). The buffer is
    /// emptied — events are moved out, not copied — and the running
    /// byte counter is reset.
    ///
    /// # Errors
    ///
    /// * `DaemonNotFound` — no migration record exists for
    ///   `daemon_origin`.
    /// * `WrongPhase` — called at or after cutover. `on_cutover` has
    ///   already handed out the remaining events by then, so an empty
    ///   `Vec` here would be indistinguishable from "nothing was ever
    ///   buffered"; the typed error surfaces the misuse at the boundary
    ///   instead. `expected` is reported as `Replay`, the last phase in
    ///   which draining is allowed.
    pub fn take_buffered_events(
        &self,
        daemon_origin: u64,
    ) -> Result<Vec<CausalEvent>, MigrationError> {
        let Some(record) = self.migrations.get(&daemon_origin) else {
            return Err(MigrationError::DaemonNotFound(daemon_origin));
        };
        let mut state = record.lock();

        let current = state.phase;
        let draining_allowed = matches!(
            current,
            MigrationPhase::Snapshot
                | MigrationPhase::Transfer
                | MigrationPhase::Restore
                | MigrationPhase::Replay
        );
        if !draining_allowed {
            return Err(MigrationError::WrongPhase {
                expected: MigrationPhase::Replay,
                got: current,
            });
        }

        let drained = std::mem::take(&mut state.buffered_events);
        state.buffered_bytes = 0;
        Ok(drained)
    }

    /// Phase 4: Cutover — stop accepting writes for this daemon.
    ///
    /// Moves the migration into the `Cutover` phase and hands back
    /// whatever events are still buffered so the caller can ship them
    /// as the final sync. The buffer and its byte counter are left
    /// empty.
    ///
    /// # Errors
    ///
    /// `DaemonNotFound` when no migration record exists for
    /// `daemon_origin`.
    pub fn on_cutover(&self, daemon_origin: u64) -> Result<Vec<CausalEvent>, MigrationError> {
        let record = match self.migrations.get(&daemon_origin) {
            Some(record) => record,
            None => return Err(MigrationError::DaemonNotFound(daemon_origin)),
        };
        let mut state = record.lock();

        // Everything below happens under the record's lock, so writers
        // observe either "still buffering" or "cut over and drained".
        let final_batch = std::mem::take(&mut state.buffered_events);
        state.buffered_bytes = 0;
        state.phase = MigrationPhase::Cutover;
        Ok(final_batch)
    }

    /// Phase 5: Cleanup — retire the daemon from this node.
    ///
    /// Unregisters the daemon from the local registry and drops the
    /// migration record. This is only allowed once the migration has
    /// reached `Cutover`: an earlier call would tear down a live daemon
    /// while the target is still restoring, leaving new traffic with
    /// `DaemonNotFound` and discarding the source-side event buffer.
    ///
    /// If there is no migration record for `daemon_origin` the call is a
    /// successful no-op. A forged or replayed `CleanupComplete` for an
    /// origin this handler never migrated must NOT unregister an
    /// unrelated local daemon, so the unregister is gated on both
    /// `migrations` membership and the `Cutover` phase.
    ///
    /// # Errors
    ///
    /// `WrongPhase { expected: Cutover, .. }` when a record exists but
    /// has not been cut over yet.
    pub fn cleanup(&self, daemon_origin: u64) -> Result<(), MigrationError> {
        // Scope the map guard: it must be released before `remove`
        // below touches the same map.
        {
            let record = match self.migrations.get(&daemon_origin) {
                Some(record) => record,
                None => return Ok(()),
            };
            let current = record.lock().phase;
            if current != MigrationPhase::Cutover {
                return Err(MigrationError::WrongPhase {
                    expected: MigrationPhase::Cutover,
                    got: current,
                });
            }
        }

        // Reached only for a record in Cutover. The unregister result is
        // deliberately ignored: cleanup is best-effort with respect to
        // the registry and must still drop the migration record.
        let _ = self.daemon_registry.unregister(daemon_origin);

        // Drop the migration record itself.
        self.migrations.remove(&daemon_origin);

        Ok(())
    }

    /// Abort a migration — return to normal operation.
    ///
    /// Discards the migration record (including any buffered events and
    /// the stored snapshot) for `daemon_origin`. The daemon itself stays
    /// registered locally. Always succeeds, including when no migration
    /// was active for the origin.
    pub fn abort(&self, daemon_origin: u64) -> Result<(), MigrationError> {
        drop(self.migrations.remove(&daemon_origin));
        Ok(())
    }

    /// Current phase of the source-side migration for `daemon_origin`,
    /// or `None` when no migration record exists.
    pub fn phase(&self, daemon_origin: u64) -> Option<MigrationPhase> {
        let record = self.migrations.get(&daemon_origin)?;
        let current = record.lock().phase;
        Some(current)
    }

    /// How many migrations this node is currently the source of.
    pub fn active_count(&self) -> usize {
        let records = &self.migrations;
        records.len()
    }

    /// Number of events currently held in the buffer of
    /// `daemon_origin`'s active migration, or `None` when no migration
    /// record exists.
    ///
    /// Used by snapshot-source adapters to populate
    /// `MigrationSnapshot::buffered_events` truthfully instead of
    /// hardcoding `0`.
    pub fn buffered_event_count(&self, daemon_origin: u64) -> Option<usize> {
        let record = self.migrations.get(&daemon_origin)?;
        let held = record.lock().buffered_events.len();
        Some(held)
    }
}

/// Compact debug view: prints only the number of active migrations
/// rather than the contents of the maps.
impl std::fmt::Debug for MigrationSourceHandler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let active_migrations = self.migrations.len();
        let mut out = f.debug_struct("MigrationSourceHandler");
        out.field("active_migrations", &active_migrations);
        out.finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::capability::CapabilityFilter;
    use crate::adapter::net::compute::{DaemonError, DaemonHost, DaemonHostConfig, MeshDaemon};
    use crate::adapter::net::identity::EntityKeypair;
    use crate::adapter::net::state::causal::CausalLink;
    use bytes::Bytes;

    /// Minimal daemon with snapshot-able state, used as the migration
    /// subject in these tests.
    struct StatefulDaemon {
        /// The daemon's entire state: bumped once per processed event
        /// and serialised as 8 little-endian bytes by `snapshot`.
        value: u64,
    }

    impl MeshDaemon for StatefulDaemon {
        fn name(&self) -> &str {
            "stateful"
        }

        /// No capability requirements.
        fn requirements(&self) -> CapabilityFilter {
            CapabilityFilter::default()
        }

        /// Counts the event and emits no output.
        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
            self.value += 1;
            Ok(Vec::new())
        }

        /// Serialises the counter as 8 little-endian bytes.
        fn snapshot(&self) -> Option<Bytes> {
            let encoded = self.value.to_le_bytes().to_vec();
            Some(Bytes::from(encoded))
        }

        /// Inverse of `snapshot`. Test-only: panics if `state` holds
        /// fewer than 8 bytes.
        fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
            let raw: [u8; 8] = state[..8].try_into().unwrap();
            self.value = u64::from_le_bytes(raw);
            Ok(())
        }
    }

    /// Builds a registry containing one `StatefulDaemon` (value 42) and
    /// returns it together with that daemon's origin.
    fn setup() -> (Arc<DaemonRegistry>, u64) {
        let keypair = EntityKeypair::generate();
        let origin = keypair.origin_hash();
        let host = DaemonHost::new(
            Box::new(StatefulDaemon { value: 42 }),
            keypair,
            DaemonHostConfig::default(),
        );

        let registry = Arc::new(DaemonRegistry::new());
        registry.register(host).unwrap();
        (registry, origin)
    }

    /// Builds an event for `origin` with sequence `seq` and a fixed
    /// 4-byte payload (`b"data"`); all other fields are zero.
    fn make_event(origin: u64, seq: u64) -> CausalEvent {
        let link = CausalLink {
            origin_hash: origin,
            horizon_encoded: 0,
            sequence: seq,
            parent_hash: 0,
        };
        let payload = Bytes::from_static(b"data");
        CausalEvent {
            link,
            payload,
            received_at: 0,
        }
    }

    /// A snapshot of a registered daemon succeeds, carries the daemon's
    /// origin, and registers the migration.
    #[test]
    fn test_start_snapshot() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(registry);

        let snap = source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();

        assert!(source.is_migrating(daemon));
        assert_eq!(snap.entity_id.origin_hash(), daemon);
    }

    /// Snapshotting an origin that is not in the registry fails with
    /// `DaemonNotFound` for that origin and leaves no migration record
    /// behind.
    #[test]
    fn test_start_snapshot_not_found() {
        let reg = Arc::new(DaemonRegistry::new());
        let handler = MigrationSourceHandler::new(reg);

        // Pin the exact variant: a bare `is_err()` would also pass if
        // the call failed for an unrelated reason.
        let err = handler.start_snapshot(0xDEAD, 0x2222, 0x1111).unwrap_err();
        assert_eq!(err, MigrationError::DaemonNotFound(0xDEAD));
        assert!(!handler.is_migrating(0xDEAD));
    }

    /// A second `start_snapshot` for an origin that is already migrating
    /// is rejected with `AlreadyMigrating`.
    #[test]
    fn test_duplicate_snapshot_rejected() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(registry);

        let _first = source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();

        let second = source.start_snapshot(daemon, 0x3333, 0x1111);
        assert_eq!(
            second.unwrap_err(),
            MigrationError::AlreadyMigrating(daemon)
        );
    }

    /// Events buffered during the Snapshot phase are all returned by
    /// `take_buffered_events`.
    #[test]
    fn test_buffer_events() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(registry);

        source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();

        for seq in 1..=3 {
            let accepted = source.buffer_event(daemon, make_event(daemon, seq)).unwrap();
            assert!(accepted);
        }

        let drained = source.take_buffered_events(daemon).unwrap();
        assert_eq!(drained.len(), 3);
    }

    /// Without an active migration for the origin, `buffer_event`
    /// reports `Ok(false)` (nothing buffered).
    #[test]
    fn test_buffer_event_no_migration() {
        let (registry, _) = setup();
        let source = MigrationSourceHandler::new(registry);

        let stray = make_event(0xDEAD, 1);
        assert!(!source.buffer_event(0xDEAD, stray).unwrap());
    }

    /// `on_cutover` returns the outstanding buffer, after which further
    /// writes for the daemon are rejected.
    #[test]
    fn test_cutover_rejects_writes() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(registry);

        source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();
        source.buffer_event(daemon, make_event(daemon, 1)).unwrap();

        // Cutover hands back the single buffered event.
        let final_batch = source.on_cutover(daemon).unwrap();
        assert_eq!(final_batch.len(), 1);

        // Any write after cutover must fail.
        let late_write = source.buffer_event(daemon, make_event(daemon, 2));
        let rejection = late_write.unwrap_err();
        assert!(rejection.to_string().contains("cut over"));
    }

    /// After cutover, `cleanup` drops the migration record and
    /// unregisters the daemon from the local registry.
    #[test]
    fn test_cleanup() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(Arc::clone(&registry));

        source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();
        source.on_cutover(daemon).unwrap();
        source.cleanup(daemon).unwrap();

        // Daemon is gone from the registry and no record remains.
        assert!(!registry.contains(daemon));
        assert!(!source.is_migrating(daemon));
    }

    /// `abort` drops the migration record but leaves the daemon
    /// registered locally.
    #[test]
    fn test_abort() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(Arc::clone(&registry));

        source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();
        source.abort(daemon).unwrap();

        // Daemon survives the abort; only the record is removed.
        assert!(registry.contains(daemon));
        assert!(!source.is_migrating(daemon));
    }

    /// Calling `cleanup` before `on_cutover` must NOT unregister the
    /// source's live daemon: the target is still restoring, so inbound
    /// traffic would route to a missing daemon and the source-side
    /// buffer would be dropped silently. The guard answers with
    /// `WrongPhase { expected: Cutover }` so the misuse is visible at
    /// the boundary.
    #[test]
    fn cleanup_before_cutover_rejects_with_wrong_phase() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(Arc::clone(&registry));

        source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();

        // Still in the Snapshot phase — cleanup has to refuse.
        let rejection = source.cleanup(daemon).unwrap_err();
        let MigrationError::WrongPhase { expected, got } = rejection else {
            panic!("expected WrongPhase, got {:?}", rejection);
        };
        assert_eq!(expected, MigrationPhase::Cutover);
        assert_eq!(got, MigrationPhase::Snapshot);

        // Nothing was torn down: daemon and record are both intact.
        assert!(registry.contains(daemon));
        assert!(source.is_migrating(daemon));
    }

    /// Regression: `cleanup` for an origin with no migration record is
    /// a successful no-op and leaves the local daemon registry alone.
    /// Previously the unregister ran unconditionally, so a forged or
    /// replayed `CleanupComplete` for an origin that was never migrated
    /// tore down an unrelated live local daemon.
    #[test]
    fn cleanup_for_unknown_origin_does_not_unregister_live_daemon() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(Arc::clone(&registry));

        // No `start_snapshot` was issued, so the handler has no record
        // for `daemon`; a spurious cleanup must be harmless.
        let outcome = source.cleanup(daemon);
        outcome.expect("cleanup is idempotent on miss");

        assert!(
            registry.contains(daemon),
            "cleanup for an origin with no migration record must not unregister the local daemon",
        );
        assert!(!source.is_migrating(daemon));
    }

    /// Regression: once `on_cutover` has moved the daemon into the
    /// `Cutover` phase, `take_buffered_events` must refuse to drain.
    /// Without a phase guard a post-cutover call silently returned an
    /// empty `Vec` (the buffer had already been handed out by
    /// `on_cutover`), which looked exactly like "no events were ever
    /// buffered" and pushed diagnosis of the misuse downstream. The
    /// call now fails with `WrongPhase { expected: Replay, got: Cutover }`.
    #[test]
    fn take_buffered_events_after_cutover_returns_wrong_phase() {
        let (registry, daemon) = setup();
        let source = MigrationSourceHandler::new(registry);

        source.start_snapshot(daemon, 0x2222, 0x1111).unwrap();
        source.buffer_event(daemon, make_event(daemon, 1)).unwrap();
        // Drains the buffer and switches the record to Cutover.
        let _ = source.on_cutover(daemon).unwrap();

        let rejection = source.take_buffered_events(daemon).unwrap_err();
        let MigrationError::WrongPhase { expected, got } = rejection else {
            panic!(
                "expected WrongPhase {{ expected: Replay, got: Cutover }}, got {:?}",
                rejection
            );
        };
        assert_eq!(expected, MigrationPhase::Replay);
        assert_eq!(got, MigrationPhase::Cutover);
    }
}