Skip to main content

net_sdk/
compute.rs

1//! Compute surface — `MeshDaemon` + `DaemonRuntime`.
2//!
3//! Users implement [`MeshDaemon`] and hand it to a [`DaemonRuntime`]
4//! tied to a [`Mesh`] node. The runtime holds the
5//! kind-keyed factory table, the per-daemon host registry, and the
6//! lifecycle gate that decides when inbound migrations may land.
7//!
8//! This file is Stage 1 of
9//! [`SDK_COMPUTE_SURFACE_PLAN.md`](../../../docs/SDK_COMPUTE_SURFACE_PLAN.md)
10//! plus the lifecycle half of
11//! [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md):
12//! local spawn / snapshot / stop, with an explicit
13//! `Registering → Ready → ShuttingDown` fence. Migration is Stage 2;
14//! the wire-level half of the readiness plan
15//! (`MigrationFailureReason`) ships alongside it.
16//!
17//! # Example
18//!
19//! ```no_run
20//! use std::sync::Arc;
21//! use bytes::Bytes;
22//! use net_sdk::{Identity, Mesh};
23//! use net_sdk::compute::{
24//!     CausalEvent, DaemonHostConfig, DaemonRuntime, MeshDaemon,
25//! };
26//! use net_sdk::capabilities::CapabilityFilter;
27//! use net::adapter::net::compute::DaemonError as CoreDaemonError;
28//!
29//! struct EchoDaemon;
30//! impl MeshDaemon for EchoDaemon {
31//!     fn name(&self) -> &str { "echo" }
32//!     fn requirements(&self) -> CapabilityFilter { CapabilityFilter::default() }
33//!     fn process(&mut self, event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
34//!         Ok(vec![event.payload.clone()])
35//!     }
36//! }
37//!
38//! # async fn example(mesh: Arc<Mesh>) -> Result<(), Box<dyn std::error::Error>> {
39//! let rt = DaemonRuntime::new(mesh);
40//! rt.register_factory("echo", || Box::new(EchoDaemon))?;
41//! rt.start().await?;
42//! let handle = rt.spawn("echo", Identity::generate(), DaemonHostConfig::default()).await?;
43//! # Ok(())
44//! # }
45//! ```
46
47use std::collections::HashMap;
48use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
49use std::sync::{Arc, Mutex, RwLock};
50
51use thiserror::Error;
52
53pub use ::net::adapter::net::compute::{
54    DaemonBindings, DaemonError as CoreDaemonError, DaemonHostConfig, DaemonStats, MeshDaemon,
55    MigrationError, MigrationFailureReason, MigrationPhase, PlacementDecision, SchedulerError,
56    SubscriptionBinding, SUBPROTOCOL_MIGRATION,
57};
58pub use ::net::adapter::net::state::causal::{CausalEvent, CausalLink};
59pub use ::net::adapter::net::state::snapshot::StateSnapshot;
60
61use ::net::adapter::net::channel::ChannelName;
62use ::net::adapter::net::identity::PermissionToken;
63
64use ::net::adapter::net::behavior::capability::CapabilitySet;
65use ::net::adapter::net::compute::{
66    chunk_snapshot, orchestrator::wire as migration_wire, DaemonFactoryRegistry, DaemonHost,
67    DaemonRegistry, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
68    MigrationTargetHandler, Scheduler,
69};
70use ::net::adapter::net::identity::EntityId;
71use ::net::adapter::net::subprotocol::{
72    FailureCallback, MigrationHandlerHooks, MigrationSubprotocolHandler, PostRestoreCallback,
73    PreCleanupCallback, ReadinessCallback,
74};
75
76use crate::identity::Identity;
77use crate::mesh::Mesh;
78
79/// Arc-wrapped factory closure. Kind-keyed at the SDK layer; cloned
80/// into the core `DaemonFactoryRegistry` at `spawn` time so a future
81/// migration target can reconstruct the daemon by `origin_hash`.
82type FactoryFn = Arc<dyn Fn() -> Box<dyn MeshDaemon> + Send + Sync>;
83
84/// Errors from the SDK daemon runtime.
85#[derive(Debug, Error)]
86pub enum DaemonError {
87    /// `start()` has not been called yet; the runtime is still in
88    /// `Registering` and will not accept spawns or migrations.
89    #[error("daemon runtime is not ready — call DaemonRuntime::start() first")]
90    NotReady,
91    /// `shutdown()` has been called; the runtime is permanently
92    /// non-functional.
93    #[error("daemon runtime has been shut down")]
94    ShuttingDown,
95    /// Two `register_factory` calls used the same `kind` string.
96    #[error("factory for kind '{0}' is already registered")]
97    FactoryAlreadyRegistered(String),
98    /// `spawn` / `spawn_from_snapshot` referenced an unregistered kind.
99    #[error("no factory registered for kind '{0}'")]
100    FactoryNotFound(String),
101    /// The snapshot's `entity_id.origin_hash` does not match the
102    /// identity handed to `spawn_from_snapshot`.
103    #[error(
104        "snapshot/identity mismatch: snapshot origin {snapshot:#x} != identity origin {identity:#x}"
105    )]
106    SnapshotIdentityMismatch { snapshot: u64, identity: u64 },
107    /// Pass-through for errors surfaced by the core compute layer.
108    #[error(transparent)]
109    Core(#[from] CoreDaemonError),
110    /// Pass-through for migration-layer errors.
111    #[error("migration failed: {0}")]
112    Migration(#[from] MigrationError),
113    /// Structured failure reason surfaced by the migration
114    /// dispatcher on the source side. Use
115    /// [`MigrationFailureReason::is_retriable`] to decide whether
116    /// the caller should back off and retry rather than propagating.
117    #[error("migration failed: {0}")]
118    MigrationFailed(MigrationFailureReason),
119}
120
/// Lifecycle of a `DaemonRuntime`, kept as a raw byte inside an
/// `AtomicU8` so transitions are plain atomic loads, stores and
/// compare-exchanges. The explicit discriminants ARE that encoding:
/// the compare-exchange in `start` compares raw byte values, so never
/// renumber or reorder the variants.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
enum State {
    /// `start()` has not completed; spawns and migrations are refused.
    Registering = 0,
    /// `start()` completed and `shutdown()` has not been called.
    Ready = 1,
    /// `shutdown()` was called; the runtime is permanently unusable.
    ShuttingDown = 2,
}

impl State {
    /// Every variant, indexed by its `u8` discriminant.
    const BY_DISCRIMINANT: [State; 3] = [State::Registering, State::Ready, State::ShuttingDown];

    /// Decode a byte previously produced by `State::X as u8`.
    ///
    /// # Panics
    ///
    /// Panics on any byte that is not a valid discriminant. The atomic
    /// is only ever written through `State` variants, so such a value
    /// means memory corruption; failing loudly beats misinterpreting it.
    fn from_u8(v: u8) -> Self {
        match Self::BY_DISCRIMINANT.get(usize::from(v)) {
            Some(&state) => state,
            None => panic!("daemon runtime: corrupt state byte {v}"),
        }
    }
}
146
147/// Per-mesh compute runtime.
148///
149/// Holds the kind-keyed factory table, the per-daemon host registry,
150/// and the `Registering → Ready → ShuttingDown` lifecycle gate. One
151/// `DaemonRuntime` per [`Mesh`]; clone the handle freely — the inner
152/// state is `Arc`-shared.
153#[derive(Clone)]
154pub struct DaemonRuntime {
155    inner: Arc<Inner>,
156}
157
/// Shared state behind every clone of a [`DaemonRuntime`].
struct Inner {
    /// Mesh this runtime is attached to. `start` installs the
    /// migration subprotocol handler on it and `shutdown` clears it.
    mesh: Arc<Mesh>,
    /// Lifecycle state, holding a `State` discriminant as a raw byte.
    state: AtomicU8,
    /// SDK-side kind → factory map. The migration target path reaches
    /// into the core `factory_registry` below by `origin_hash`;
    /// `kind` is SDK sugar so the *caller* can spawn without knowing
    /// the underlying keypair.
    factories: RwLock<HashMap<String, FactoryFn>>,
    /// Core registry — shared with the migration target handler so
    /// daemons restored from an inbound snapshot land in the same
    /// map a local `spawn` uses.
    registry: Arc<DaemonRegistry>,
    /// Core scheduler — built once at `new()` from the mesh's
    /// shared `CapabilityIndex`. Used by the `groups` feature's
    /// `ReplicaGroup` / `ForkGroup` / `StandbyGroup` for
    /// capability-based placement.
    ///
    /// `local_caps` is `CapabilitySet::default()` because the mesh
    /// doesn't currently expose the locally-announced caps as a
    /// readable snapshot. The only behavioral impact is that
    /// capability-filtered daemons which could run locally won't
    /// get `LocalPreferred` short-circuit placement — they fall
    /// through to the `CapabilityIndex::query` path, which still
    /// returns the local node (with the right announcements) and
    /// places correctly. Revisit when a `MeshNode::local_caps()`
    /// getter lands.
    #[cfg_attr(not(feature = "groups"), allow(dead_code))]
    scheduler: Arc<Scheduler>,
    /// Core factory registry, keyed by `origin_hash`. `spawn` mirrors
    /// each SDK-side kind registration into this map with the
    /// concrete keypair attached so the migration target restores
    /// through the existing `DaemonFactoryRegistry::construct` path.
    factory_registry: Arc<DaemonFactoryRegistry>,
    /// Migration orchestrator, owned by this node. Orchestrates the
    /// 6-phase state machine when this node initiates a migration.
    orchestrator: Arc<MigrationOrchestrator>,
    /// Migration source handler — drives the source side when THIS
    /// node is the source of a migration.
    source_handler: Arc<MigrationSourceHandler>,
    /// Migration target handler — drives the target side when THIS
    /// node is the target of a migration.
    target_handler: Arc<MigrationTargetHandler>,
    /// Most recent `MigrationFailureReason` observed on the source
    /// side for each migration, keyed by `daemon_origin`. Populated
    /// by the dispatcher's failure callback; consumed by
    /// `MigrationHandle::wait` to surface the reason to the caller.
    /// Mutex because the SDK's dependency set doesn't include
    /// `dashmap` directly, and this map sees low write frequency
    /// (one entry per failed migration).
    recent_failures: Mutex<HashMap<u64, MigrationFailureReason>>,
    /// Test-only knob: when set to `true`, the readiness callback
    /// reports "not ready" even when the runtime is in `Ready`.
    /// Lets integration tests drive the `NotReady` retry path
    /// without racing against runtime startup. Defaults to `false`;
    /// production code should not touch it.
    simulate_not_ready: AtomicBool,
    /// Test-only stall injected into `spawn` between the
    /// `require_ready` check and the registry inserts. Measured
    /// in milliseconds; `0` = no stall (production default). See
    /// [`DaemonRuntime::set_spawn_stall_ms`].
    spawn_stall_ms: std::sync::atomic::AtomicU32,
    /// Test-only stall injected into `start` between installing
    /// the migration handler and the Registering→Ready CAS.
    /// Measured in milliseconds; `0` = no stall.
    start_stall_ms: std::sync::atomic::AtomicU32,
    /// Per-origin post-delivery observers. Fired on every successful
    /// `deliver(origin_hash, event)`, inside the registry call and
    /// after the daemon's `process` returns Ok.
    ///
    /// **Why this exists.** `StandbyGroup` needs every event routed
    /// to its active member captured in a replay buffer so a
    /// promoted standby can reapply the window between the last
    /// `sync_standbys` and the failure. A manual `on_event_delivered`
    /// hook relied on the caller pairing every `deliver` with the
    /// buffering call, which silently lost events on omission.
    /// Observers close that gap: a group installs a weak-ref
    /// closure that pushes the event into its buffer automatically,
    /// with no contract on the caller.
    ///
    /// Each entry is `(id, observer)`. The `id` is minted from
    /// `observer_id_counter` so an `ObserverHandle` can remove exactly
    /// its own entry on drop. The field itself is private:
    /// registration goes through the `pub(crate)` method
    /// `DaemonRuntime::register_deliver_observer`, which is how
    /// `sdk/src/groups/*` reach it. It is not a stable public API.
    /// The per-origin list is a `Vec` so unrelated subsystems (future
    /// audit / metrics hooks) can coexist on the same origin without
    /// one overwriting another.
    observers: Mutex<HashMap<u64, Vec<(u64, DeliverObserver)>>>,
    /// Monotonic id minted by `register_deliver_observer` (starts at
    /// 1), used by the returned `ObserverHandle` to identify its
    /// entry on `Drop`.
    #[cfg_attr(not(feature = "groups"), allow(dead_code))]
    observer_id_counter: std::sync::atomic::AtomicU64,
}
248
249/// A post-delivery observer closure registered against an
250/// `origin_hash`. Fires on every successful `DaemonRuntime::deliver`
251/// for that origin.
252///
253/// Observers MUST be cheap — they run on the delivery thread, after
254/// the daemon's `process` but before `deliver` returns. A slow or
255/// panicking observer stalls delivery. The in-tree use (standby
256/// replay buffer) is a single `VecDeque::push_back` under a
257/// short-lived lock; new observers should aim for similar.
258pub(crate) type DeliverObserver = Arc<dyn Fn(&CausalEvent) + Send + Sync>;
259
260/// Handle returned by
261/// [`DaemonRuntime::register_deliver_observer`]. Dropping the
262/// handle removes the observer from the runtime's per-origin
263/// observer list. Safe to drop after the runtime itself has been
264/// dropped: the `Weak` upgrade in `Drop` silently no-ops.
265#[cfg_attr(not(feature = "groups"), allow(dead_code))]
266pub(crate) struct ObserverHandle {
267    runtime: std::sync::Weak<Inner>,
268    origin: u64,
269    id: u64,
270}
271
272impl Drop for ObserverHandle {
273    fn drop(&mut self) {
274        if let Some(inner) = self.runtime.upgrade() {
275            if let Ok(mut map) = inner.observers.lock() {
276                if let Some(v) = map.get_mut(&self.origin) {
277                    v.retain(|(id, _)| *id != self.id);
278                    if v.is_empty() {
279                        map.remove(&self.origin);
280                    }
281                }
282            }
283        }
284    }
285}
286
287impl DaemonRuntime {
288    /// Attach a runtime to an existing [`Mesh`]. Stage 1 does not
289    /// consume the `Mesh` — users keep their `Arc<Mesh>` for channel
290    /// registration, subscription, and the rest of the non-compute
291    /// surface. Stage 2 will install the migration subprotocol
292    /// handler when [`Self::start`] runs; until then inbound
293    /// migration messages (if any) are silently dropped by the core,
294    /// same as today.
295    pub fn new(mesh: Arc<Mesh>) -> Self {
296        let local_node_id = mesh.inner().node_id();
297        let registry = Arc::new(DaemonRegistry::new());
298        let factory_registry = Arc::new(DaemonFactoryRegistry::new());
299        let source_handler = Arc::new(MigrationSourceHandler::new(registry.clone()));
300        // Wire `source_handler` into the orchestrator so
301        // local-source migrations register the migration in the
302        // source-side handler — without this, post-snapshot events
303        // are silently mutated into the source daemon's state and
304        // lost at cutover. See
305        // `MigrationOrchestrator::with_source_handler`.
306        let orchestrator = Arc::new(
307            MigrationOrchestrator::new(registry.clone(), local_node_id)
308                .with_source_handler(source_handler.clone()),
309        );
310        let target_handler = Arc::new(MigrationTargetHandler::new_with_factories(
311            registry.clone(),
312            factory_registry.clone(),
313        ));
314        // Scheduler shares the mesh's `CapabilityIndex`, so
315        // announcements the mesh publishes become visible to
316        // placement queries immediately. `CapabilitySet::default()`
317        // for `local_caps` is a known gap — see the `scheduler`
318        // field's docstring on [`Inner`].
319        let scheduler = Arc::new(Scheduler::new(
320            mesh.inner().capability_index().clone(),
321            local_node_id,
322            CapabilitySet::default(),
323        ));
324        Self {
325            inner: Arc::new(Inner {
326                mesh,
327                state: AtomicU8::new(State::Registering as u8),
328                factories: RwLock::new(HashMap::new()),
329                registry,
330                scheduler,
331                factory_registry,
332                orchestrator,
333                source_handler,
334                target_handler,
335                recent_failures: Mutex::new(HashMap::new()),
336                simulate_not_ready: AtomicBool::new(false),
337                spawn_stall_ms: std::sync::atomic::AtomicU32::new(0),
338                start_stall_ms: std::sync::atomic::AtomicU32::new(0),
339                observers: Mutex::new(HashMap::new()),
340                observer_id_counter: std::sync::atomic::AtomicU64::new(1),
341            }),
342        }
343    }
344
345    /// Register a post-delivery observer for `origin_hash`. Every
346    /// successful `deliver(origin_hash, event)` fires the closure
347    /// after the daemon's `process` returns Ok. Returns an
348    /// [`ObserverHandle`] whose `Drop` unregisters the observer.
349    ///
350    /// `pub(crate)` — only the SDK's own group wrappers use this.
351    /// See the `observers` field on [`Inner`] for the design rationale.
352    #[cfg_attr(not(feature = "groups"), allow(dead_code))]
353    pub(crate) fn register_deliver_observer(
354        &self,
355        origin: u64,
356        cb: DeliverObserver,
357    ) -> ObserverHandle {
358        let id = self
359            .inner
360            .observer_id_counter
361            .fetch_add(1, Ordering::Relaxed);
362        {
363            let mut map = self
364                .inner
365                .observers
366                .lock()
367                .expect("DaemonRuntime observers mutex poisoned");
368            map.entry(origin).or_default().push((id, cb));
369        }
370        ObserverHandle {
371            runtime: Arc::downgrade(&self.inner),
372            origin,
373            id,
374        }
375    }
376
377    /// Register a factory for a daemon type. `kind` is a user-chosen
378    /// string shared across every node that may host this daemon.
379    /// Second registrations of the same `kind` return
380    /// [`DaemonError::FactoryAlreadyRegistered`].
381    ///
382    /// Valid in both `Registering` and `Ready` states; the runtime
383    /// permits new kinds to appear at runtime. Only `ShuttingDown`
384    /// rejects.
385    ///
386    /// # Migration targeting
387    ///
388    /// `register_factory` alone is **not sufficient** to accept
389    /// inbound migrations — it registers the kind-to-closure mapping
390    /// only on the SDK side. The core migration dispatcher looks up
391    /// factories by `origin_hash` (the daemon's identity), not by
392    /// `kind`, because the migration wire protocol doesn't carry a
393    /// kind string; the target couldn't pick the right factory from
394    /// an inbound snapshot without an explicit binding.
395    ///
396    /// To accept migrations for a specific daemon, the target must
397    /// ALSO call one of:
398    ///
399    /// - [`Self::expect_migration`] `(kind, origin_hash, config)` —
400    ///   placeholder factory keyed by `origin_hash`; the envelope
401    ///   on the snapshot supplies the keypair at restore time.
402    ///   This is the common case.
403    /// - [`Self::register_migration_target_identity`] `(kind,
404    ///   identity, config)` — pre-provisions the keypair as a
405    ///   fallback when the source migrates with
406    ///   `transport_identity: false`.
407    ///
408    /// Or spawn the daemon locally first (via [`Self::spawn`]);
409    /// spawn seeds both the SDK map and the core registry, so a
410    /// daemon that migrated out and migrates back in on the same
411    /// node is covered without extra calls.
412    pub fn register_factory<F>(&self, kind: &str, factory: F) -> Result<(), DaemonError>
413    where
414        F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
415    {
416        if self.state() == State::ShuttingDown {
417            return Err(DaemonError::ShuttingDown);
418        }
419        // `contains_key` + `insert` would be atomic under the
420        // single `write` guard, but the `entry` API makes
421        // atomicity self-evident in one expression — no opening
422        // for a future reviewer to wonder whether the check and
423        // the insert can drift across two separately-acquired
424        // guards.
425        let mut map = self.inner.factories.write().expect("factory map poisoned");
426        match map.entry(kind.to_string()) {
427            std::collections::hash_map::Entry::Occupied(_) => {
428                Err(DaemonError::FactoryAlreadyRegistered(kind.to_string()))
429            }
430            std::collections::hash_map::Entry::Vacant(slot) => {
431                slot.insert(Arc::new(factory));
432                Ok(())
433            }
434        }
435    }
436
    /// Promote to `Ready`. Idempotent: a call on an already-`Ready`
    /// runtime returns `Ok(())` immediately; a call on a
    /// `ShuttingDown` runtime returns [`DaemonError::ShuttingDown`].
    ///
    /// Wires the migration subprotocol (`0x0500`) handler into the
    /// mesh so inbound `TakeSnapshot` / `SnapshotReady` / etc.
    /// messages reach the orchestrator / source / target handlers
    /// owned by this runtime. The handler is installed *before* the
    /// state flips to `Ready`.
    ///
    /// Only a call that observes `Registering` installs anything. It
    /// builds a fresh handler (functionally equivalent to any other
    /// call's) and stores it on the mesh. A call that observes `Ready`
    /// does not reinstall.
    ///
    /// # Errors
    ///
    /// [`DaemonError::ShuttingDown`] if the runtime is, or concurrently
    /// becomes, `ShuttingDown`. In the concurrent case this call
    /// removes the handler it just installed before returning.
    pub async fn start(&self) -> Result<(), DaemonError> {
        loop {
            match self.state() {
                State::Registering => {
                    // Install the migration subprotocol handler
                    // **before** publishing `Ready`. Other threads
                    // that observe `Ready` must be able to rely on
                    // the handler being live: the previous ordering
                    // (CAS → install) left a window where a
                    // concurrent caller read `Ready`, began a
                    // migration, and sent `SnapshotReady` onto a
                    // mesh whose handler slot was still empty —
                    // the dispatcher's no-handler fallback would
                    // synthesise `ComputeNotSupported`, aborting
                    // the migration nondeterministically during
                    // startup.
                    //
                    // Double-install is safe: `set_migration_handler`
                    // is an `ArcSwap` store, so if two concurrent
                    // `start()`s both reach this point the later
                    // store just wins and the CAS picks one caller
                    // to return first. Both built handlers are
                    // functionally equivalent (same registry,
                    // orchestrator, hooks).
                    let handler = Arc::new(self.build_migration_handler());
                    self.inner.mesh.inner().set_migration_handler(handler);

                    // Test-only stall between install and CAS so
                    // integration tests can race `shutdown` in. In
                    // production the atomic load is 0 and this is
                    // a no-op.
                    let stall_ms = self.inner.start_stall_ms.load(Ordering::Acquire);
                    if stall_ms > 0 {
                        tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
                    }

                    // Only a runtime still in `Registering` may be
                    // promoted; `ShuttingDown` is never overwritten.
                    let swap = self.inner.state.compare_exchange(
                        State::Registering as u8,
                        State::Ready as u8,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    );
                    if swap.is_ok() {
                        return Ok(());
                    }
                    // Lost the CAS. State is now either `Ready`
                    // (another `start` caller beat us) or
                    // `ShuttingDown` (a concurrent `shutdown`
                    // raced past our install). Handle the second
                    // case explicitly — otherwise a torn-down
                    // runtime would leave a live handler on the
                    // mesh, accepting migration traffic and firing
                    // callbacks against stale registry state.
                    //
                    // Under the `Ready` case we leave the handler
                    // installed: the winning `start` caller also
                    // installed an equivalent handler (same
                    // registry, orchestrator, hooks), so whichever
                    // `ArcSwap` store lands last is indistinguishable
                    // from the winner's.
                    if self.state() == State::ShuttingDown {
                        self.inner.mesh.inner().clear_migration_handler();
                        return Err(DaemonError::ShuttingDown);
                    }
                    // CAS lost to another `start` that won the
                    // race — loop to re-classify and return Ok on
                    // the `Ready` arm.
                }
                State::Ready => return Ok(()),
                State::ShuttingDown => return Err(DaemonError::ShuttingDown),
            }
        }
    }
520
521    /// Construct the migration subprotocol handler with every
522    /// hook wired — identity context, channel-rebind replay,
523    /// unsubscribe teardown, readiness predicate, failure
524    /// observer. Extracted so [`Self::start`] can install it
525    /// before the atomic state flip (race fix) without burying
526    /// the construction inline.
527    fn build_migration_handler(&self) -> MigrationSubprotocolHandler {
528        let local_node_id = self.inner.mesh.inner().node_id();
529        // Ask the core crate to build the context. The Noise static
530        // private key is captured inside closures the core owns and
531        // never crosses this boundary as raw bytes — see
532        // `MeshNode::migration_identity_context`.
533        let ctx = self.inner.mesh.inner().migration_identity_context();
534        let inner_for_rebind = self.inner.clone();
535        let post_restore: PostRestoreCallback = Arc::new(move |origin_hash: u64| {
536            let inner = inner_for_rebind.clone();
537            tokio::spawn(async move {
538                replay_subscriptions(inner, origin_hash).await;
539            });
540        });
541        let inner_for_teardown = self.inner.clone();
542        let pre_cleanup: PreCleanupCallback = Arc::new(move |origin_hash: u64| {
543            // Snapshot the ledger BEFORE cleanup drops the host —
544            // after that, the ledger is gone. Spawn async
545            // unsubscribes so the dispatcher thread returns
546            // immediately.
547            let bindings = inner_for_teardown
548                .registry
549                .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
550                .unwrap_or_default();
551            if bindings.is_empty() {
552                return;
553            }
554            let inner = inner_for_teardown.clone();
555            tokio::spawn(async move {
556                teardown_subscriptions(inner, bindings).await;
557            });
558        });
559        let inner_for_readiness = self.inner.clone();
560        let readiness: ReadinessCallback = Arc::new(move || {
561            // Test-only: `simulate_not_ready` flips the predicate
562            // to false regardless of the underlying lifecycle
563            // state. Honour it first so integration tests can
564            // drive the NotReady retry path.
565            if inner_for_readiness
566                .simulate_not_ready
567                .load(Ordering::Acquire)
568            {
569                return false;
570            }
571            inner_for_readiness.state.load(Ordering::Acquire) == State::Ready as u8
572        });
573        let inner_for_failure = self.inner.clone();
574        let failure: FailureCallback =
575            Arc::new(move |origin_hash: u64, reason: MigrationFailureReason| {
576                if let Ok(mut map) = inner_for_failure.recent_failures.lock() {
577                    map.insert(origin_hash, reason);
578                }
579            });
580        MigrationSubprotocolHandler::with_hooks(
581            self.inner.orchestrator.clone(),
582            self.inner.source_handler.clone(),
583            self.inner.target_handler.clone(),
584            local_node_id,
585            MigrationHandlerHooks {
586                identity: Some(ctx),
587                post_restore: Some(post_restore),
588                pre_cleanup: Some(pre_cleanup),
589                readiness: Some(readiness),
590                failure: Some(failure),
591            },
592        )
593    }
594
595    /// Tear down the runtime. Unregisters every local daemon host,
596    /// clears the factory registry, and transitions state to
597    /// `ShuttingDown`. Subsequent calls on this runtime fail with
598    /// [`DaemonError::ShuttingDown`]. A second `shutdown` is a no-op.
599    pub async fn shutdown(&self) -> Result<(), DaemonError> {
600        // Mark ShuttingDown first so new spawns / registrations
601        // immediately short-circuit. The store isn't a CAS — a
602        // `ShuttingDown → ShuttingDown` re-store is cheap and
603        // benign.
604        self.inner
605            .state
606            .store(State::ShuttingDown as u8, Ordering::Release);
607
608        // Drain the registry. `list()` snapshots origin_hashes under
609        // its internal read guard; iterating is safe because we own
610        // the unregister path.
611        let origins: Vec<u64> = self
612            .inner
613            .registry
614            .list()
615            .into_iter()
616            .map(|(origin, _)| origin)
617            .collect();
618        for origin in origins {
619            let _ = self.inner.registry.unregister(origin);
620            self.inner.factory_registry.remove(origin);
621        }
622        // Drop any leftover migration-failure entries so they
623        // don't count against the process's memory footprint after
624        // shutdown. The runtime is permanently non-functional at
625        // this point, so no one will consume them.
626        if let Ok(mut map) = self.inner.recent_failures.lock() {
627            map.clear();
628        }
629        // Uninstall the migration subprotocol handler. The
630        // handler carries `Arc` clones into our `Inner` — leaving
631        // it installed keeps the runtime's internals alive via
632        // the mesh even after we've drained every registry, and
633        // would accept inbound migration traffic that now
634        // unconditionally fails (empty registry). The happy-path
635        // teardown should leave the mesh in the same shape it
636        // had before `start()`.
637        self.inner.mesh.inner().clear_migration_handler();
638        Ok(())
639    }
640
641    /// **Test-only.** Force the readiness predicate seen by the
642    /// migration dispatcher to return `false` regardless of
643    /// lifecycle state — simulates a target that's still in
644    /// `Registering` even after `start()` has run. Lets
645    /// integration tests exercise the `NotReady` retry path
646    /// without racing against runtime startup.
647    ///
648    /// No effect on `is_ready()` or `spawn` / `stop` — those use
649    /// the underlying `state` directly. Only the dispatcher's
650    /// readiness predicate is affected.
651    pub fn simulate_not_ready(&self, flag: bool) {
652        self.inner.simulate_not_ready.store(flag, Ordering::Release);
653    }
654
655    /// **Test-only.** Inject a sleep inside `spawn` between the
656    /// initial `require_ready` check and the registry inserts,
657    /// giving integration tests a deterministic window to race
658    /// `shutdown` against. Duration is stored as millis; `0`
659    /// disables.
660    #[doc(hidden)]
661    pub fn set_spawn_stall_ms(&self, millis: u32) {
662        self.inner.spawn_stall_ms.store(millis, Ordering::Release);
663    }
664
665    /// **Test-only.** Inject a sleep inside `start` between the
666    /// handler install and the Registering→Ready CAS. Used to
667    /// deterministically race `shutdown` against a mid-flight
668    /// `start` so tests can verify the handler-cleanup path.
669    #[doc(hidden)]
670    pub fn set_start_stall_ms(&self, millis: u32) {
671        self.inner.start_stall_ms.store(millis, Ordering::Release);
672    }
673
674    /// Readiness accessor for tests + operators. `true` iff the
675    /// runtime has transitioned to `Ready` and has not yet begun
676    /// shutting down.
677    pub fn is_ready(&self) -> bool {
678        self.state() == State::Ready
679    }
680
    /// Spawn a daemon of `kind` under the caller-provided
    /// [`Identity`]. The identity's keypair seeds the daemon's
    /// `origin_hash` + `entity_id`; the runtime registers both the
    /// live host and a kind-keyed factory in the core registry so a
    /// future migration target can reconstruct the daemon through
    /// the existing `DaemonFactoryRegistry::construct` path.
    ///
    /// The returned [`DaemonHandle`] is clone-safe; dropping it does
    /// not stop the daemon. Call [`Self::stop`] explicitly.
    ///
    /// # Errors
    ///
    /// - Whatever `require_ready` returns when the runtime is not
    ///   `Ready` (NOTE(review): presumably `NotReady` /
    ///   `ShuttingDown` — confirm against `require_ready`).
    /// - Whatever `factory_for_kind` returns when no factory is
    ///   registered for `kind`.
    /// - [`DaemonError::Core`] when another daemon already owns this
    ///   identity's `origin_hash` in the core factory registry or
    ///   in the host registry. This call leaves no entries behind
    ///   in either case.
    /// - [`DaemonError::ShuttingDown`] when `shutdown` began while
    ///   this call was in flight. The host and factory inserts are
    ///   rolled back before returning.
    ///
    /// The daemon instance is built from the kind factory before
    /// either collision check. On a collision it is constructed and
    /// then dropped without ever being registered.
    pub async fn spawn(
        &self,
        kind: &str,
        identity: Identity,
        config: DaemonHostConfig,
    ) -> Result<DaemonHandle, DaemonError> {
        self.require_ready()?;
        // Test-only stall between the readiness check and the
        // registry inserts. In production `spawn_stall_ms` is
        // always 0 and this branch is a no-op.
        let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
        if stall_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
        }
        let factory = self.factory_for_kind(kind)?;
        let daemon = (factory)();
        let keypair = identity.keypair().as_ref().clone();
        let origin_hash = keypair.origin_hash();
        let entity_id = keypair.entity_id().clone();

        // Mirror the factory into the core registry BEFORE registering
        // the host, so a migration-target handler that catches up on
        // this origin mid-spawn always sees a consistent view. Atomic
        // on collision: if another daemon already claims this
        // `origin_hash`, bail without mutating either registry —
        // otherwise the rollback below would strip the factory entry
        // for the *existing* daemon and silently break its future
        // migratability.
        let factory_for_core = factory.clone();
        self.inner
            .factory_registry
            .register(keypair.clone(), config.clone(), move || {
                (factory_for_core)()
            })
            .map_err(DaemonError::Core)?;

        let host = DaemonHost::new(daemon, keypair, config);
        // `DaemonRegistry::register` errors on origin_hash collisions
        // — two daemons can't share the same identity.
        //
        // Rolling back our `factory_registry` insert is safe
        // because `factory_registry::register` is atomic on
        // collision: since our call above *succeeded*, the slot
        // was empty and we exclusively own it. No other code path
        // replaces an occupied slot (`register` /
        // `register_placeholder` both error on collision;
        // `remove` / `take` only remove their caller's own
        // entries), so the entry we're about to remove is still
        // ours. The rollback cannot affect a pre-existing
        // placeholder or another daemon's factory entry.
        if let Err(e) = self.inner.registry.register(host) {
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::Core(e));
        }

        // Post-insert fence: `shutdown` may have raced past our
        // initial `require_ready()` and already swept the
        // registries. In that case our entries are either already
        // gone (shutdown saw them in its sweep) or will outlive
        // shutdown's sweep (we inserted after `list()`). Roll back
        // unconditionally under the ShuttingDown branch so no
        // zombie daemon survives the torn-down runtime; the
        // rollback's `unregister` / `remove` calls are idempotent
        // no-ops if shutdown already drained our slot.
        if self.state() == State::ShuttingDown {
            let _ = self.inner.registry.unregister(origin_hash);
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::ShuttingDown);
        }

        // Both registries now hold this daemon; the handle only
        // carries identifiers plus a shared pointer to runtime state.
        Ok(DaemonHandle {
            origin_hash,
            entity_id,
            inner: self.inner.clone(),
        })
    }
766
    /// Spawn a daemon with a caller-supplied `MeshDaemon` instance,
    /// bypassing the SDK-side kind-factory lookup.
    ///
    /// Used by language-binding layers (currently: the NAPI `compute`
    /// module) that build daemon instances via cross-FFI dispatch —
    /// the factory closure for such a daemon can't be a plain
    /// `Fn() -> Box<dyn MeshDaemon>` because constructing the
    /// daemon requires an awaitable call into the host language.
    /// The binding does the await itself, hands in the resulting
    /// `Box<dyn MeshDaemon>`, and this method does the rest of
    /// what [`Self::spawn`] does: register the `(origin_hash →
    /// kind-factory)` mirror in the core registry so future
    /// migrations can reconstruct the daemon, insert the host,
    /// and run the same shutdown-race fence.
    ///
    /// `kind_factory` is the closure the core registry stores for
    /// migration-target reconstruction; it must be re-callable
    /// (migration targets call it when they restore the daemon
    /// on another node). Bindings typically build this by cloning
    /// the same TSFN used for the initial spawn.
    ///
    /// `daemon` and `kind_factory` are taken by value and are
    /// consumed even when this call returns an error.
    ///
    /// # Errors
    ///
    /// - Whatever `require_ready` returns when the runtime is not
    ///   `Ready`.
    /// - [`DaemonError::Core`] when another daemon already owns this
    ///   identity's `origin_hash` in the core factory registry or
    ///   in the host registry. This call leaves no entries behind
    ///   in either case.
    /// - [`DaemonError::ShuttingDown`] when `shutdown` began while
    ///   this call was in flight. The inserts are rolled back before
    ///   returning.
    pub async fn spawn_with_daemon<F>(
        &self,
        identity: Identity,
        config: DaemonHostConfig,
        daemon: Box<dyn MeshDaemon>,
        kind_factory: F,
    ) -> Result<DaemonHandle, DaemonError>
    where
        F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
    {
        self.require_ready()?;
        // Same test-only stall as `spawn` — tests race shutdown
        // against this path too.
        let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
        if stall_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
        }
        let keypair = identity.keypair().as_ref().clone();
        let origin_hash = keypair.origin_hash();
        let entity_id = keypair.entity_id().clone();

        // Mirror the caller-supplied kind_factory into the core
        // registry. Same atomic-register semantics as `spawn` —
        // if another daemon already claims this origin_hash, bail
        // without touching state.
        self.inner
            .factory_registry
            .register(keypair.clone(), config.clone(), kind_factory)
            .map_err(DaemonError::Core)?;

        let host = DaemonHost::new(daemon, keypair, config);
        // Host-registry collision: undo only our own factory insert.
        // The register call above succeeded, so that slot is ours —
        // see the rollback-safety comment in `spawn`.
        if let Err(e) = self.inner.registry.register(host) {
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::Core(e));
        }

        // Post-insert shutdown fence, matching `spawn`. Without
        // this, a concurrent `shutdown()` that raced past our
        // `require_ready` check and already swept the registries
        // would leave a zombie daemon live in the torn-down
        // runtime.
        if self.state() == State::ShuttingDown {
            let _ = self.inner.registry.unregister(origin_hash);
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::ShuttingDown);
        }

        Ok(DaemonHandle {
            origin_hash,
            entity_id,
            inner: self.inner.clone(),
        })
    }
840
    /// Spawn a daemon from a caller-supplied instance and restore its
    /// state from `snapshot`, bypassing the SDK-side kind-factory
    /// lookup. Parallels [`Self::spawn_with_daemon`] for restore.
    ///
    /// Used by language-binding layers whose daemons are built via
    /// cross-FFI dispatch — construction goes through the host
    /// language, then the binding hands the built `Box<dyn MeshDaemon>`
    /// (already wired to its TSFN bridge) plus the `kind_factory`
    /// closure used by the core registry for migration-target
    /// reconstruction.
    ///
    /// Unlike `spawn_with_daemon`, this path does not honor the
    /// test-only `spawn_stall_ms` knob.
    ///
    /// # Errors
    ///
    /// - Whatever `require_ready` returns when the runtime is not
    ///   `Ready`.
    /// - [`DaemonError::SnapshotIdentityMismatch`] when the
    ///   snapshot's full `entity_id` differs from `identity`'s.
    ///   This is checked before any registry mutation.
    /// - [`DaemonError::Core`] when the `origin_hash` is already
    ///   claimed, or when `DaemonHost::from_snapshot` rejects the
    ///   snapshot. Our factory insert is rolled back in both cases.
    /// - [`DaemonError::ShuttingDown`] when `shutdown` began while
    ///   this call was in flight. The inserts are rolled back.
    pub async fn spawn_from_snapshot_with_daemon<F>(
        &self,
        identity: Identity,
        snapshot: StateSnapshot,
        config: DaemonHostConfig,
        daemon: Box<dyn MeshDaemon>,
        kind_factory: F,
    ) -> Result<DaemonHandle, DaemonError>
    where
        F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
    {
        self.require_ready()?;
        let keypair = identity.keypair().as_ref().clone();
        let origin_hash = keypair.origin_hash();
        let entity_id = keypair.entity_id().clone();

        // Full `entity_id` comparison — see `spawn_from_snapshot` for
        // the birthday-collision rationale.
        if snapshot.entity_id != entity_id {
            return Err(DaemonError::SnapshotIdentityMismatch {
                snapshot: snapshot.entity_id.origin_hash(),
                identity: origin_hash,
            });
        }

        // Atomic on collision: a failure here leaves both registries
        // untouched, and a success means we own the slot exclusively.
        self.inner
            .factory_registry
            .register(keypair.clone(), config.clone(), kind_factory)
            .map_err(DaemonError::Core)?;

        // Restore failure: undo only our own factory insert.
        let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
            Ok(h) => h,
            Err(e) => {
                self.inner.factory_registry.remove(origin_hash);
                return Err(DaemonError::Core(e));
            }
        };

        // Host-registry collision: same ownership argument as above.
        if let Err(e) = self.inner.registry.register(host) {
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::Core(e));
        }

        // Post-insert fence against a concurrent `shutdown` — see the
        // matching comment in `spawn`.
        if self.state() == State::ShuttingDown {
            let _ = self.inner.registry.unregister(origin_hash);
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::ShuttingDown);
        }

        Ok(DaemonHandle {
            origin_hash,
            entity_id,
            inner: self.inner.clone(),
        })
    }
906
    /// Spawn a daemon of `kind` and restore its state from `snapshot`.
    /// The snapshot's `entity_id` must match the caller's
    /// [`Identity`]; mismatch returns
    /// [`DaemonError::SnapshotIdentityMismatch`] before any side
    /// effects land.
    ///
    /// # Errors
    ///
    /// - Whatever `require_ready` returns when the runtime is not
    ///   `Ready`.
    /// - Whatever `factory_for_kind` returns when no factory is
    ///   registered for `kind`. This is checked before the identity
    ///   comparison.
    /// - [`DaemonError::SnapshotIdentityMismatch`] on an `entity_id`
    ///   mismatch.
    /// - [`DaemonError::Core`] when the `origin_hash` is already
    ///   claimed, or when `DaemonHost::from_snapshot` rejects the
    ///   snapshot. Our factory insert is rolled back.
    /// - [`DaemonError::ShuttingDown`] when `shutdown` began while
    ///   this call was in flight. The inserts are rolled back.
    ///
    /// Unlike [`Self::spawn`], this path does not honor the
    /// test-only `spawn_stall_ms` knob.
    pub async fn spawn_from_snapshot(
        &self,
        kind: &str,
        identity: Identity,
        snapshot: StateSnapshot,
        config: DaemonHostConfig,
    ) -> Result<DaemonHandle, DaemonError> {
        self.require_ready()?;
        let factory = self.factory_for_kind(kind)?;
        let keypair = identity.keypair().as_ref().clone();
        let origin_hash = keypair.origin_hash();
        let entity_id = keypair.entity_id().clone();

        // Compare the **full** 32-byte `entity_id`, not just the
        // 64-bit `origin_hash` projection. `origin_hash` is a
        // birthday-bounded 64-bit hash of the ed25519 public key;
        // two legitimately-different identities can collide on
        // `origin_hash` with probability ~2^-32 after ~4B daemons.
        // A collision would let the *wrong* identity restore the
        // snapshot, producing a daemon signed under one pubkey
        // but claiming to be another — downstream signatures then
        // verify against the wrong key and the daemon silently
        // produces outputs no peer accepts.
        if snapshot.entity_id != entity_id {
            return Err(DaemonError::SnapshotIdentityMismatch {
                snapshot: snapshot.entity_id.origin_hash(),
                identity: origin_hash,
            });
        }

        let daemon = (factory)();
        // Atomic register: collision here means some other daemon
        // (live or a previous spawn_from_snapshot in-flight) already
        // owns this `origin_hash`. Bail without touching the other
        // daemon's state — the later rollback would otherwise remove
        // the victim's factory entry and silently break its future
        // migratability.
        let factory_for_core = factory.clone();
        self.inner
            .factory_registry
            .register(keypair.clone(), config.clone(), move || {
                (factory_for_core)()
            })
            .map_err(DaemonError::Core)?;

        let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
            Ok(h) => h,
            Err(e) => {
                // Same rollback-safety argument as in `spawn`:
                // atomic `factory_registry::register` above means
                // we exclusively own the slot, and nothing else
                // can replace an occupied slot. Removing here
                // cleans up strictly our insert — an adjacent
                // pre-existing placeholder (e.g. from
                // `expect_migration`) would have made our
                // register fail atomically upstream, and we'd
                // never have reached this branch.
                self.inner.factory_registry.remove(origin_hash);
                return Err(DaemonError::Core(e));
            }
        };

        if let Err(e) = self.inner.registry.register(host) {
            // Same ownership argument — the atomic register
            // above means we own this slot exclusively.
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::Core(e));
        }

        // Post-insert fence against a concurrent `shutdown` — see
        // the matching comment in `spawn`.
        if self.state() == State::ShuttingDown {
            let _ = self.inner.registry.unregister(origin_hash);
            self.inner.factory_registry.remove(origin_hash);
            return Err(DaemonError::ShuttingDown);
        }

        Ok(DaemonHandle {
            origin_hash,
            entity_id,
            inner: self.inner.clone(),
        })
    }
995
996    /// Stop a daemon, removing it from the runtime's registry. Valid
997    /// while `Ready` and (idempotently) during `ShuttingDown` —
998    /// `ShuttingDown` paths through here are a no-op because the
999    /// shutdown sweep has already drained the registry.
1000    pub async fn stop(&self, origin_hash: u64) -> Result<(), DaemonError> {
1001        if self.state() == State::Registering {
1002            return Err(DaemonError::NotReady);
1003        }
1004        // Treat a missing daemon as success during ShuttingDown —
1005        // the shutdown sweep drained it.
1006        match self.inner.registry.unregister(origin_hash) {
1007            Ok(_) => {
1008                self.inner.factory_registry.remove(origin_hash);
1009                Ok(())
1010            }
1011            Err(CoreDaemonError::NotFound(_)) if self.state() == State::ShuttingDown => Ok(()),
1012            Err(e) => Err(DaemonError::Core(e)),
1013        }
1014    }
1015
1016    /// Take a snapshot of a running daemon by `origin_hash`. Returns
1017    /// `Ok(None)` when the daemon is stateless.
1018    pub async fn snapshot(&self, origin_hash: u64) -> Result<Option<StateSnapshot>, DaemonError> {
1019        self.require_ready()?;
1020        self.inner
1021            .registry
1022            .snapshot(origin_hash)
1023            .map_err(DaemonError::Core)
1024    }
1025
1026    /// Deliver one causal event to the daemon identified by
1027    /// `origin_hash`, returning the daemon's outputs wrapped in the
1028    /// host's causal chain.
1029    ///
1030    /// Stage 1 convenience — Stage 2 adds mesh-dispatched delivery
1031    /// via the causal subprotocol, and this direct path becomes
1032    /// testing sugar rather than the primary ingress.
1033    pub fn deliver(
1034        &self,
1035        origin_hash: u64,
1036        event: &CausalEvent,
1037    ) -> Result<Vec<CausalEvent>, DaemonError> {
1038        self.require_ready()?;
1039        let outputs = self
1040            .inner
1041            .registry
1042            .deliver(origin_hash, event)
1043            .map_err(DaemonError::Core)?;
1044
1045        // Fire post-delivery observers (e.g. `StandbyGroup`'s replay
1046        // buffer). Observers run AFTER a successful `process` so
1047        // replay on promotion doesn't re-apply events the active
1048        // rejected. Snapshot the observer list into a local `Vec`
1049        // so each callback runs without the map's mutex held —
1050        // prevents a misbehaving observer from blocking unrelated
1051        // deliveries and rules out re-entrant-lock deadlocks if an
1052        // observer ever calls back into `deliver`.
1053        let to_fire: Vec<DeliverObserver> = {
1054            let map = self
1055                .inner
1056                .observers
1057                .lock()
1058                .expect("DaemonRuntime observers mutex poisoned");
1059            map.get(&origin_hash)
1060                .map(|v| v.iter().map(|(_, cb)| cb.clone()).collect())
1061                .unwrap_or_default()
1062        };
1063        for cb in to_fire {
1064            cb(event);
1065        }
1066        Ok(outputs)
1067    }
1068
1069    /// Number of daemons currently registered.
1070    pub fn daemon_count(&self) -> usize {
1071        self.inner.registry.count()
1072    }
1073
1074    /// Current orchestrator-side migration phase for `origin_hash`,
1075    /// or `None` when no migration record exists (either never
1076    /// started here or already reached its terminal state and was
1077    /// removed). Useful for tests that assert the migration reached
1078    /// true completion (record gone via `ActivateAck`) rather than
1079    /// simply advancing to the `Complete` phase.
1080    pub fn migration_phase(&self, origin_hash: u64) -> Option<MigrationPhase> {
1081        self.inner.orchestrator.status(origin_hash)
1082    }
1083
1084    /// **Test-only.** Peek at the cached failure reason for
1085    /// `origin_hash` without consuming it.
1086    ///
1087    /// Normal `MigrationHandle::wait` code path pops the entry when
1088    /// it hits status=None; this accessor exists so regression tests
1089    /// can observe the cache's lifecycle directly (e.g. assert the
1090    /// cache is cleared by `start_migration_with`).
1091    ///
1092    /// Exposed publicly because SDK integration tests live in a
1093    /// separate crate; not part of the stable surface.
1094    #[doc(hidden)]
1095    pub fn peek_migration_failure(&self, origin_hash: u64) -> Option<MigrationFailureReason> {
1096        self.inner
1097            .recent_failures
1098            .lock()
1099            .ok()
1100            .and_then(|m| m.get(&origin_hash).cloned())
1101    }
1102
1103    /// **Test-only.** Inject a failure reason into the cache for
1104    /// `origin_hash`. Lets tests stage a "stale entry from a prior
1105    /// attempt" scenario deterministically, without having to run
1106    /// a whole losing migration to populate it.
1107    ///
1108    /// Same caveat as [`Self::peek_migration_failure`] — visible
1109    /// because SDK integration tests live out-of-crate; not part of
1110    /// the stable surface.
1111    #[doc(hidden)]
1112    pub fn inject_migration_failure(&self, origin_hash: u64, reason: MigrationFailureReason) {
1113        if let Ok(mut m) = self.inner.recent_failures.lock() {
1114            m.insert(origin_hash, reason);
1115        }
1116    }
1117
1118    /// Snapshot the daemon's subscription ledger — a cloned view of
1119    /// every `(publisher, channel)` pair the daemon has subscribed
1120    /// to via [`Self::subscribe_channel`]. Used by the migration
1121    /// target path to drive replay and by tests / operators to
1122    /// observe what a daemon is subscribed to.
1123    pub fn subscriptions(&self, origin_hash: u64) -> Result<Vec<SubscriptionBinding>, DaemonError> {
1124        self.inner
1125            .registry
1126            .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1127            .map_err(DaemonError::Core)
1128    }
1129
    /// Subscribe a specific daemon to a channel on a remote
    /// publisher. Routes through the mesh's membership subprotocol
    /// and **records the subscription in the daemon's ledger** so
    /// a migration target can replay it after cutover. Users should
    /// use this method (rather than reaching through
    /// `rt.mesh().inner().subscribe_channel_*`) for daemon-owned
    /// subscriptions; otherwise the subscription travels with the
    /// node, not the daemon, and silently drops on migration.
    ///
    /// Flow:
    /// 1. Hit the publisher's membership endpoint via
    ///    `Mesh::subscribe_channel_with_token` (or the
    ///    no-token variant).
    /// 2. On success, record
    ///    `(publisher, channel) → SubscriptionBinding` in the
    ///    host's ledger.
    /// 3. On wire failure, no ledger mutation.
    ///
    /// `token` is the caller-owned [`PermissionToken`] for
    /// token-gated channels; `None` for open channels.
    ///
    /// # Errors
    ///
    /// - Whatever `require_ready` returns when the runtime is not
    ///   `Ready`.
    /// - `DaemonError::Core(CoreDaemonError::NotFound(origin_hash))`
    ///   when no daemon is registered under `origin_hash` at call
    ///   time.
    /// - `DaemonError::Core(CoreDaemonError::ProcessFailed(..))`
    ///   wrapping the mesh error when the subscribe call fails.
    /// - `DaemonError::Core(..)` from `with_host` when the ledger
    ///   write fails, e.g. because the daemon was stopped while the
    ///   mesh call was awaited. NOTE(review): in that case the
    ///   mesh-level subscription has already been made and is not
    ///   undone here.
    pub async fn subscribe_channel(
        &self,
        origin_hash: u64,
        publisher: u64,
        channel: ChannelName,
        token: Option<PermissionToken>,
    ) -> Result<(), DaemonError> {
        self.require_ready()?;
        if !self.inner.registry.contains(origin_hash) {
            return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
        }
        // Capture serialized token bytes for the ledger BEFORE
        // handing ownership to the mesh. The mesh call consumes the
        // token by value on the token-path.
        let token_bytes = token.as_ref().map(|t| t.to_bytes().to_vec());
        let result = match token {
            Some(tok) => {
                self.inner
                    .mesh
                    .inner()
                    .subscribe_channel_with_token(publisher, channel.clone(), tok)
                    .await
            }
            None => {
                self.inner
                    .mesh
                    .inner()
                    .subscribe_channel(publisher, channel.clone())
                    .await
            }
        };
        result.map_err(|e| {
            DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
                "subscribe_channel failed: {e}"
            )))
        })?;

        // Mesh accepted the subscribe — record in the ledger.
        // `with_host` reaches into the registry's per-daemon mutex;
        // since this is SDK-level code we do the minimum work
        // under the lock (a single DashMap insert).
        //
        // The `contains` check above ran before the `.await`, so the
        // daemon may have been stopped in between; `with_host` then
        // fails and its error is returned as-is, leaving the mesh
        // subscription from above in place.
        if let Err(e) = self.inner.registry.with_host(origin_hash, |host| {
            host.record_subscription(publisher, channel, token_bytes);
        }) {
            return Err(DaemonError::Core(e));
        }
        Ok(())
    }
1198
1199    /// Unsubscribe a specific daemon from a channel. Symmetric to
1200    /// [`Self::subscribe_channel`]: mesh wire call first, then
1201    /// ledger update.
1202    pub async fn unsubscribe_channel(
1203        &self,
1204        origin_hash: u64,
1205        publisher: u64,
1206        channel: ChannelName,
1207    ) -> Result<(), DaemonError> {
1208        self.require_ready()?;
1209        if !self.inner.registry.contains(origin_hash) {
1210            return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1211        }
1212        self.inner
1213            .mesh
1214            .inner()
1215            .unsubscribe_channel(publisher, channel.clone())
1216            .await
1217            .map_err(|e| {
1218                DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1219                    "unsubscribe_channel failed: {e}"
1220                )))
1221            })?;
1222        let _ = self.inner.registry.with_host(origin_hash, |host| {
1223            host.forget_subscription(publisher, &channel);
1224        });
1225        Ok(())
1226    }
1227
1228    /// Pre-register a factory on the target node keyed by the
1229    /// daemon's `origin_hash`, using the caller-supplied `Identity`
1230    /// as the **fallback** keypair.
1231    ///
1232    /// Use this when:
1233    /// - The caller genuinely has the daemon's keypair on hand
1234    ///   (typical: test harnesses that share the same
1235    ///   `Identity` between source and target runtimes).
1236    /// - Migration runs with `transport_identity = false`, so the
1237    ///   snapshot carries no envelope and the target needs a
1238    ///   matching keypair pre-provisioned.
1239    ///
1240    /// For the common envelope-transport case where the target
1241    /// doesn't know the daemon's private key ahead of time,
1242    /// prefer [`Self::expect_migration`] — it registers a
1243    /// placeholder factory keyed only on `origin_hash`, and the
1244    /// envelope in the migration snapshot supplies the real
1245    /// keypair at restore time.
1246    pub fn register_migration_target_identity(
1247        &self,
1248        kind: &str,
1249        identity: Identity,
1250        config: DaemonHostConfig,
1251    ) -> Result<(), DaemonError> {
1252        if self.state() == State::ShuttingDown {
1253            return Err(DaemonError::ShuttingDown);
1254        }
1255        let factory = self.factory_for_kind(kind)?;
1256        let keypair = identity.keypair().as_ref().clone();
1257        let origin_hash = keypair.origin_hash();
1258        let factory_clone = factory.clone();
1259        self.inner
1260            .factory_registry
1261            .register(keypair, config, move || (factory_clone)())
1262            .map_err(DaemonError::Core)?;
1263        // Post-insert fence — a concurrent `shutdown` may have
1264        // raced past the initial state check. Roll back so no
1265        // factory entry outlives the torn-down runtime.
1266        if self.state() == State::ShuttingDown {
1267            self.inner.factory_registry.remove(origin_hash);
1268            return Err(DaemonError::ShuttingDown);
1269        }
1270        Ok(())
1271    }
1272
1273    /// Declare on the target that this node expects a migration
1274    /// for `origin_hash` of the given `kind`. Registers a
1275    /// **placeholder** factory in the core registry — no matching
1276    /// keypair required, because the migration snapshot's
1277    /// [`IdentityEnvelope`](::net::adapter::net::identity::IdentityEnvelope)
1278    /// carries the real keypair and the dispatcher overrides the
1279    /// placeholder at restore time.
1280    ///
1281    /// Fails cleanly if the source migrates without an envelope
1282    /// (e.g., `MigrationOpts { transport_identity: false }`) —
1283    /// the target's factory has no keypair and the dispatcher
1284    /// emits `IdentityTransportFailed`. Use
1285    /// [`Self::register_migration_target_identity`] with a shared
1286    /// identity for the explicit public-identity-migration case.
1287    ///
1288    /// Landing this method closes the seam documented in the
1289    /// `envelope_overrides_target_placeholder_keypair` test of
1290    /// Stage 5b of the identity-migration plan — targets can now
1291    /// pre-register for a migration by `origin_hash` alone.
1292    pub fn expect_migration(
1293        &self,
1294        kind: &str,
1295        origin_hash: u64,
1296        config: DaemonHostConfig,
1297    ) -> Result<(), DaemonError> {
1298        if self.state() == State::ShuttingDown {
1299            return Err(DaemonError::ShuttingDown);
1300        }
1301        let factory = self.factory_for_kind(kind)?;
1302        let factory_clone = factory.clone();
1303        self.inner
1304            .factory_registry
1305            .register_placeholder(origin_hash, config, move || (factory_clone)())
1306            .map_err(DaemonError::Core)?;
1307        // Post-insert fence against a concurrent `shutdown` — see
1308        // the matching comment in `register_migration_target_identity`.
1309        if self.state() == State::ShuttingDown {
1310            self.inner.factory_registry.remove(origin_hash);
1311            return Err(DaemonError::ShuttingDown);
1312        }
1313        Ok(())
1314    }
1315
1316    /// Start migrating a daemon from `source_node` to `target_node`.
1317    /// The orchestrator runs on this node regardless of who owns
1318    /// the daemon — call this on whichever node wants to drive the
1319    /// migration state machine.
1320    ///
1321    /// Returns a [`MigrationHandle`] whose [`MigrationHandle::wait`]
1322    /// resolves when the migration reaches a terminal state
1323    /// (`Complete` on success, `MigrationError` on abort / failure).
1324    ///
1325    /// For the common local-source case (`source_node ==
1326    /// mesh.node_id()`), the snapshot is taken synchronously inside
1327    /// this call and `SnapshotReady` is shipped to the target. For
1328    /// a remote source, the orchestrator sends `TakeSnapshot` to
1329    /// the source and drives the rest of the state machine from
1330    /// inbound wire messages.
1331    pub async fn start_migration(
1332        &self,
1333        origin_hash: u64,
1334        source_node: u64,
1335        target_node: u64,
1336    ) -> Result<MigrationHandle, DaemonError> {
1337        self.start_migration_with(
1338            origin_hash,
1339            source_node,
1340            target_node,
1341            MigrationOpts::default(),
1342        )
1343        .await
1344    }
1345
1346    /// `start_migration` with caller-supplied options. Stage 6 of
1347    /// [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md):
1348    /// lets the caller opt out of identity transport when the daemon
1349    /// doesn't need to sign anything on the target.
1350    pub async fn start_migration_with(
1351        &self,
1352        origin_hash: u64,
1353        source_node: u64,
1354        target_node: u64,
1355        opts: MigrationOpts,
1356    ) -> Result<MigrationHandle, DaemonError> {
1357        self.require_ready()?;
1358        // Clear any stale failure reason from a prior migration
1359        // attempt for this same `origin_hash`. Without this, if the
1360        // previous attempt's `MigrationHandle` was dropped before
1361        // `wait()` ran (or never `wait()`ed at all), the dispatcher's
1362        // failure callback left an entry in `recent_failures` that
1363        // would leak into THIS attempt's `wait()` — a successful
1364        // new migration would incorrectly surface the old reason
1365        // when `wait_one_attempt` hits its None-status branch and
1366        // pops `recent_failures`.
1367        if let Ok(mut map) = self.inner.recent_failures.lock() {
1368            map.remove(&origin_hash);
1369        }
1370        let msgs = self
1371            .inner
1372            .orchestrator
1373            .start_migration(origin_hash, source_node, target_node)
1374            .map_err(DaemonError::Migration)?;
1375
1376        // Local-source path: `start_migration` builds chunked
1377        // `SnapshotReady` messages synchronously from the local
1378        // registry. If the caller asked for identity transport, seal
1379        // the envelope HERE — the dispatcher's source-side seal only
1380        // fires on the TakeSnapshot path (remote source). Sealing
1381        // operates on the whole snapshot, so reassemble the chunks,
1382        // seal, and rechunk.
1383        let msgs = if opts.transport_identity {
1384            self.maybe_seal_chunked_snapshot(origin_hash, target_node, msgs)
1385                .await?
1386        } else {
1387            msgs
1388        };
1389
1390        // Determine dest_node from the first message variant and
1391        // send all messages in order. `start_migration` returns
1392        // `TakeSnapshot` (single message) when source is remote, or
1393        // a non-empty run of `SnapshotReady` chunks when source is
1394        // local.
1395        let dest_node = match msgs.first() {
1396            Some(MigrationMessage::TakeSnapshot { .. }) => source_node,
1397            Some(MigrationMessage::SnapshotReady { .. }) => target_node,
1398            Some(other) => {
1399                let _ = self
1400                    .inner
1401                    .orchestrator
1402                    .abort_migration(origin_hash, "unexpected initial message".into());
1403                return Err(DaemonError::Migration(MigrationError::StateFailed(
1404                    format!(
1405                        "orchestrator returned unexpected initial migration message: {:?}",
1406                        other
1407                    ),
1408                )));
1409            }
1410            None => {
1411                let _ = self
1412                    .inner
1413                    .orchestrator
1414                    .abort_migration(origin_hash, "orchestrator returned no messages".into());
1415                return Err(DaemonError::Migration(MigrationError::StateFailed(
1416                    "orchestrator returned no migration messages".into(),
1417                )));
1418            }
1419        };
1420
1421        for msg in &msgs {
1422            if let Err(e) = self.send_migration_message(dest_node, msg).await {
1423                let _ = self
1424                    .inner
1425                    .orchestrator
1426                    .abort_migration(origin_hash, format!("initial send failed: {e}"));
1427                return Err(e);
1428            }
1429        }
1430
1431        Ok(MigrationHandle {
1432            origin_hash,
1433            source_node,
1434            target_node,
1435            runtime: self.clone(),
1436            opts,
1437        })
1438    }
1439
1440    /// Decode `snapshot_bytes`, seal an identity envelope using
1441    /// the local daemon's keypair + target's X25519 static pubkey,
1442    /// and re-encode.
1443    ///
1444    /// Called only when the caller opted into envelope transport
1445    /// via `MigrationOpts { transport_identity: true }`. The
1446    /// caller committed to the stronger guarantee at that opt-in
1447    /// point; this helper must never silently downgrade.
1448    ///
1449    /// Resolution:
1450    /// - `Ok(None)`: the snapshot already carries an envelope
1451    ///   (e.g. pre-sealed upstream). Caller proceeds with the
1452    ///   existing bytes — this is not a downgrade, the envelope
1453    ///   is already there.
1454    /// - `Ok(Some(new_bytes))`: sealed successfully; caller should
1455    ///   replace the snapshot payload.
1456    /// - `Err(_)`: any missing prerequisite (peer X25519 static
1457    ///   unknown, daemon keypair absent) or seal-crypto failure.
1458    ///   The caller **must** abort — silently falling back to
1459    ///   unsealed transport would break the caller's opt-in
1460    ///   guarantee, and the target would restore under whatever
1461    ///   pre-provisioned keypair the factory registry carries
1462    ///   (possibly stale, possibly absent).
1463    ///
1464    /// The NKpsk0-responder case (target was the handshake
1465    /// responder, its peer static is not surfaced by `snow`) is a
1466    /// concrete prerequisite-missing scenario that now fails
1467    /// here. Callers in that topology should use `transport_identity:
1468    /// false` explicitly, which signals "I know identity transport
1469    /// isn't reachable; proceed unsealed."
1470    fn maybe_seal_local_snapshot(
1471        &self,
1472        daemon_origin: u64,
1473        target_node: u64,
1474        snapshot_bytes: &[u8],
1475    ) -> Result<Option<Vec<u8>>, DaemonError> {
1476        let snapshot = StateSnapshot::from_bytes(snapshot_bytes).ok_or_else(|| {
1477            DaemonError::Migration(MigrationError::StateFailed(
1478                "failed to decode local snapshot for envelope sealing".into(),
1479            ))
1480        })?;
1481        if snapshot.identity_envelope.is_some() {
1482            // Upstream already sealed — not a downgrade, the
1483            // envelope is present.
1484            return Ok(None);
1485        }
1486        let Some(target_pub) = self.inner.mesh.inner().peer_static_x25519(target_node) else {
1487            return Err(DaemonError::Migration(MigrationError::StateFailed(
1488                format!(
1489                    "identity transport requested but peer X25519 static for \
1490                 {target_node:#x} is unknown (e.g. NKpsk0-responder \
1491                 side) — cannot seal envelope; use \
1492                 `transport_identity: false` to proceed unsealed"
1493                ),
1494            )));
1495        };
1496        let Some(kp) = self.inner.registry.daemon_keypair(daemon_origin) else {
1497            return Err(DaemonError::Migration(MigrationError::StateFailed(
1498                format!(
1499                    "identity transport requested but daemon {daemon_origin:#x} has \
1500                 no local keypair to seal with"
1501                ),
1502            )));
1503        };
1504        snapshot
1505            .with_identity_envelope(&kp, target_pub)
1506            .map(|sealed| Some(sealed.to_bytes()))
1507            .map_err(|e| {
1508                DaemonError::Migration(MigrationError::StateFailed(format!(
1509                    "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
1510                )))
1511            })
1512    }
1513
1514    /// Reassemble chunked `SnapshotReady` messages into the full
1515    /// snapshot, seal the identity envelope, and rechunk.
1516    ///
1517    /// `start_migration` returns chunked `SnapshotReady` messages on
1518    /// the local-source path; sealing operates on the whole snapshot
1519    /// so chunks must be reassembled first. If `msgs` does not start
1520    /// with `SnapshotReady` (i.e. remote-source `TakeSnapshot` path)
1521    /// the messages are returned unchanged — the dispatcher seals
1522    /// on that path.
1523    ///
1524    /// On any seal failure the orchestrator record is aborted so a
1525    /// retry starts from phase 0.
1526    async fn maybe_seal_chunked_snapshot(
1527        &self,
1528        origin_hash: u64,
1529        target_node: u64,
1530        mut msgs: Vec<MigrationMessage>,
1531    ) -> Result<Vec<MigrationMessage>, DaemonError> {
1532        if !matches!(msgs.first(), Some(MigrationMessage::SnapshotReady { .. })) {
1533            return Ok(msgs);
1534        }
1535
1536        let seq_through = match msgs.first() {
1537            Some(MigrationMessage::SnapshotReady { seq_through, .. }) => *seq_through,
1538            _ => unreachable!("checked by matches! above"),
1539        };
1540
1541        // `chunk_snapshot` emits chunks in `chunk_index` order, but
1542        // sort defensively in case a future caller hands us a
1543        // pre-mixed Vec.
1544        msgs.sort_by_key(|m| match m {
1545            MigrationMessage::SnapshotReady { chunk_index, .. } => *chunk_index,
1546            _ => 0,
1547        });
1548        let mut reassembled: Vec<u8> = Vec::new();
1549        for m in &msgs {
1550            if let MigrationMessage::SnapshotReady { snapshot_bytes, .. } = m {
1551                reassembled.extend_from_slice(snapshot_bytes);
1552            }
1553        }
1554
1555        // `transport_identity: true` is a strict opt-in:
1556        // prerequisites-missing (e.g. NKpsk0-responder) surfaces as
1557        // `Err` and aborts the migration, not a silent downgrade to
1558        // unsealed. `Ok(None)` is reserved for "snapshot already
1559        // carries an envelope" — return the original chunks.
1560        match self.maybe_seal_local_snapshot(origin_hash, target_node, &reassembled) {
1561            Ok(Some(sealed)) => chunk_snapshot(origin_hash, sealed, seq_through).map_err(|e| {
1562                let _ = self
1563                    .inner
1564                    .orchestrator
1565                    .abort_migration(origin_hash, format!("rechunk after seal failed: {e}"));
1566                DaemonError::Migration(e)
1567            }),
1568            Ok(None) => Ok(msgs),
1569            Err(e) => {
1570                let _ = self
1571                    .inner
1572                    .orchestrator
1573                    .abort_migration(origin_hash, format!("envelope seal failed: {e}"));
1574                Err(e)
1575            }
1576        }
1577    }
1578
1579    async fn send_migration_message(
1580        &self,
1581        dest_node: u64,
1582        msg: &MigrationMessage,
1583    ) -> Result<(), DaemonError> {
1584        let addr = self
1585            .inner
1586            .mesh
1587            .inner()
1588            .peer_addr(dest_node)
1589            .ok_or(DaemonError::Migration(MigrationError::TargetUnavailable(
1590                dest_node,
1591            )))?;
1592        let bytes = migration_wire::encode(msg).map_err(DaemonError::Migration)?;
1593        self.inner
1594            .mesh
1595            .inner()
1596            .send_subprotocol(addr, SUBPROTOCOL_MIGRATION, &bytes)
1597            .await
1598            .map_err(|e| {
1599                DaemonError::Migration(MigrationError::StateFailed(format!(
1600                    "send_subprotocol failed: {e}"
1601                )))
1602            })
1603    }
1604
1605    /// Underlying mesh. Exposed read-only so the caller can still
1606    /// reach the channel / subscribe / publish surface without
1607    /// reaching around the runtime.
1608    pub fn mesh(&self) -> &Arc<Mesh> {
1609        &self.inner.mesh
1610    }
1611
1612    // ------------ internal helpers ------------
1613
1614    fn state(&self) -> State {
1615        State::from_u8(self.inner.state.load(Ordering::Acquire))
1616    }
1617
1618    fn require_ready(&self) -> Result<(), DaemonError> {
1619        match self.state() {
1620            State::Ready => Ok(()),
1621            State::Registering => Err(DaemonError::NotReady),
1622            State::ShuttingDown => Err(DaemonError::ShuttingDown),
1623        }
1624    }
1625
1626    fn factory_for_kind(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1627        self.inner
1628            .factories
1629            .read()
1630            .expect("factory map poisoned")
1631            .get(kind)
1632            .cloned()
1633            .ok_or_else(|| DaemonError::FactoryNotFound(kind.to_string()))
1634    }
1635
1636    // ─── `groups` feature accessors ─────────────────────────────────
1637    //
1638    // These surface the internal `Scheduler` / `DaemonRegistry` /
1639    // factory closures that the `groups` module (ReplicaGroup /
1640    // ForkGroup / StandbyGroup) needs to wire into core group
1641    // constructors. Kept `pub(crate)` so they don't leak into the
1642    // SDK's public API — group types wrap them in their own
1643    // ergonomic surfaces and callers stay on the clean side of the
1644    // boundary.
1645
1646    /// Shared scheduler for capability-based placement.
1647    #[cfg(feature = "groups")]
1648    pub(crate) fn scheduler_arc(&self) -> Arc<Scheduler> {
1649        self.inner.scheduler.clone()
1650    }
1651
1652    /// Shared migration orchestrator. The
1653    /// `OrchestratorMigrationSnapshotSource` adapter wraps this
1654    /// `Arc` to bridge the compute-layer orchestrator's
1655    /// in-flight state into the MeshOS snapshot's
1656    /// `in_flight_migrations` field. Exposed `pub(crate)` so
1657    /// the in-crate `testing` harness builds the source
1658    /// out-of-band; the adapter type itself lives in
1659    /// `net::adapter::net::behavior::meshos::migration_snapshot_source`.
1660    #[cfg(feature = "testing")]
1661    pub(crate) fn migration_orchestrator_arc(&self) -> Arc<MigrationOrchestrator> {
1662        self.inner.orchestrator.clone()
1663    }
1664
1665    /// Shared daemon registry (group members register/unregister
1666    /// here alongside direct-spawn daemons).
1667    #[cfg(feature = "groups")]
1668    pub(crate) fn registry_arc(&self) -> Arc<DaemonRegistry> {
1669        self.inner.registry.clone()
1670    }
1671
1672    /// Look up a factory closure by kind. Used by group constructors
1673    /// to build members via the same factory the caller registered
1674    /// with `register_factory`.
1675    #[cfg(feature = "groups")]
1676    pub(crate) fn factory_for_kind_pub(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1677        self.factory_for_kind(kind)
1678    }
1679
1680    /// `true` iff the runtime is in the `Ready` state. Groups use
1681    /// this to gate `spawn` (rejecting early-spawn calls with a
1682    /// typed `GroupError::NotReady`).
1683    #[cfg(feature = "groups")]
1684    pub(crate) fn is_ready_pub(&self) -> bool {
1685        self.state() == State::Ready
1686    }
1687}
1688
1689/// Handle to a running daemon. Clone-safe; dropping does not stop
1690/// the daemon — call [`DaemonRuntime::stop`] explicitly.
1691#[derive(Clone)]
1692pub struct DaemonHandle {
1693    /// Daemon's 64-bit origin hash. Stable for the daemon's lifetime
1694    /// and across migrations.
1695    pub origin_hash: u64,
1696    /// Daemon's full 32-byte entity id.
1697    pub entity_id: EntityId,
1698    inner: Arc<Inner>,
1699}
1700
1701impl DaemonHandle {
1702    /// Read the daemon's current stats.
1703    pub fn stats(&self) -> Result<DaemonStats, DaemonError> {
1704        self.inner
1705            .registry
1706            .stats(self.origin_hash)
1707            .map_err(DaemonError::Core)
1708    }
1709
1710    /// Take a snapshot of the daemon's current state. `Ok(None)` for
1711    /// stateless daemons.
1712    pub async fn snapshot(&self) -> Result<Option<StateSnapshot>, DaemonError> {
1713        self.inner
1714            .registry
1715            .snapshot(self.origin_hash)
1716            .map_err(DaemonError::Core)
1717    }
1718}
1719
1720impl std::fmt::Debug for DaemonRuntime {
1721    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1722        let factory_count = self
1723            .inner
1724            .factories
1725            .read()
1726            .map(|m| m.len())
1727            .unwrap_or_default();
1728        f.debug_struct("DaemonRuntime")
1729            .field("state", &self.state())
1730            .field("factories", &factory_count)
1731            .field("daemons", &self.inner.registry.count())
1732            .finish()
1733    }
1734}
1735
1736impl std::fmt::Debug for DaemonHandle {
1737    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1738        f.debug_struct("DaemonHandle")
1739            .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
1740            .field("entity_id", &self.entity_id)
1741            .finish()
1742    }
1743}
1744
1745/// Stage 3 of `DAEMON_CHANNEL_REBIND_PLAN.md` — after a migration
1746/// target restores a daemon, walk its subscription ledger and
1747/// re-send each `subscribe_channel` to the matching publisher so
1748/// messages flow to the target without waiting for the source's
1749/// entry to age out of the publisher's roster. Errors are
1750/// per-subscription; one publisher being offline doesn't fail the
1751/// rest.
1752///
1753/// Runs in a tokio task spawned by the post-restore callback
1754/// installed on `MigrationSubprotocolHandler`.
1755async fn replay_subscriptions(inner: Arc<Inner>, origin_hash: u64) {
1756    let bindings = match inner
1757        .registry
1758        .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1759    {
1760        Ok(list) => list,
1761        Err(_) => return,
1762    };
1763    for sub in bindings {
1764        let token = sub
1765            .token_bytes
1766            .as_deref()
1767            .and_then(|bytes| PermissionToken::from_bytes(bytes).ok());
1768        let result = match token {
1769            Some(tok) => {
1770                inner
1771                    .mesh
1772                    .inner()
1773                    .subscribe_channel_with_token(sub.publisher, sub.channel.clone(), tok)
1774                    .await
1775            }
1776            None => {
1777                inner
1778                    .mesh
1779                    .inner()
1780                    .subscribe_channel(sub.publisher, sub.channel.clone())
1781                    .await
1782            }
1783        };
1784        if let Err(e) = result {
1785            // Non-fatal: one subscription failing (publisher
1786            // offline, token expired, etc.) must not take down the
1787            // rest of the ledger's replay. The SDK doesn't depend
1788            // on `tracing`; drop the failure to stderr via
1789            // `eprintln!` so operators running `RUST_LOG=warn`
1790            // still see it. Future work can add a `ReplayPartial`
1791            // event on the migration phase stream (plan §
1792            // *Error surface*) to surface failures programmatically.
1793            eprintln!(
1794                "channel re-bind replay failed: daemon={:#x} channel={} publisher={:#x} error={}",
1795                origin_hash, sub.channel, sub.publisher, e,
1796            );
1797        }
1798    }
1799}
1800
1801/// Stage 4 of `DAEMON_CHANNEL_REBIND_PLAN.md` — fires at `Cutover`
1802/// on the source node (just before daemon cleanup). Walks the
1803/// daemon's ledger and sends `unsubscribe_channel` to each
1804/// publisher so rosters drop the source without waiting for
1805/// session-timeout (~30 s). Fire-and-forget: we don't block the
1806/// cutover dispatch on acks.
1807async fn teardown_subscriptions(inner: Arc<Inner>, bindings: Vec<SubscriptionBinding>) {
1808    for sub in bindings {
1809        if let Err(e) = inner
1810            .mesh
1811            .inner()
1812            .unsubscribe_channel(sub.publisher, sub.channel.clone())
1813            .await
1814        {
1815            // Non-fatal; the publisher's session timeout will
1816            // eventually clean up our stale roster entry even
1817            // without an explicit Unsubscribe.
1818            eprintln!(
1819                "channel re-bind teardown failed: channel={} publisher={:#x} error={}",
1820                sub.channel, sub.publisher, e,
1821            );
1822        }
1823    }
1824}
1825
/// Options for [`DaemonRuntime::start_migration_with`].
///
/// - Stage 6 of
///   [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md):
///   the `transport_identity` flag. Default `true`.
/// - Stages 3 + 4 of
///   [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md):
///   the `retry_not_ready` budget. When the migration target responds
///   `NotReady` (its runtime is still in `Registering`), the source backs
///   off and re-initiates, up to this total elapsed time. `None`
///   disables retry, and the first `NotReady` surfaces immediately.
///
/// Implements `PartialEq`/`Eq` so option sets can be compared directly,
/// for example in tests or when deduplicating configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationOpts {
    /// If `true` (default), the source node seals its daemon's
    /// ed25519 seed into the outbound snapshot using the target's
    /// X25519 static pubkey. The target unseals on arrival and the
    /// migrated daemon keeps its full signing capability.
    ///
    /// If `false`, the envelope is omitted and the target rebuilds the
    /// daemon with a `public_only` keypair. Identity queries
    /// (`entity_id`, `origin_hash`) still work, but `sign` calls fail
    /// with `EntityError::ReadOnly`. This suits pure compute daemons that
    /// only consume events and emit payloads, and do NOT need to mint
    /// capability announcements or issue permission tokens on the target.
    pub transport_identity: bool,

    /// Retry budget for [`MigrationFailureReason::NotReady`].
    ///
    /// `Some(d)` (default 30 s): on `NotReady`, the source backs off
    /// (500 ms → 1 s → 2 s → 4 s → 8 s, capped at 16 s) and re-initiates
    /// the migration. The total retry clock is capped at `d`. After that,
    /// the caller sees [`MigrationFailureReason::NotReadyTimeout`].
    ///
    /// `None`: no retry. The first `NotReady` surfaces to the caller as
    /// a terminal failure.
    pub retry_not_ready: Option<std::time::Duration>,
}
1866
1867impl Default for MigrationOpts {
1868    fn default() -> Self {
1869        Self {
1870            transport_identity: true,
1871            retry_not_ready: Some(std::time::Duration::from_secs(30)),
1872        }
1873    }
1874}
1875
/// Delay to wait before the retry that follows the `attempt`-th
/// `NotReady` failure.
///
/// Attempts 0 and 1 both wait 500 ms. Each later attempt doubles the
/// delay, capped at 16 s (500 ms → 1 s → 2 s → 4 s → 8 s → 16 s). The
/// overall retry budget is controlled separately by
/// [`MigrationOpts::retry_not_ready`]. The schedule matches
/// `DAEMON_RUNTIME_READINESS_PLAN.md` § *Source-side retry*.
fn not_ready_backoff(attempt: u8) -> std::time::Duration {
    const BASE: std::time::Duration = std::time::Duration::from_millis(500);
    const CAP: std::time::Duration = std::time::Duration::from_millis(16_000);
    // Five doublings of 500 ms reach the 16 s cap exactly.
    let doublings = u32::from(attempt.max(1) - 1).min(5);
    (BASE * 2u32.pow(doublings)).min(CAP)
}
1886
1887/// Handle to an in-flight migration. Drop the handle and the
1888/// orchestrator continues driving the migration to completion in the
1889/// background; keep it to observe phase transitions or request abort.
1890///
1891/// Cheap to clone — the backing state is shared with the
1892/// [`DaemonRuntime`] that produced it.
1893#[derive(Clone)]
1894pub struct MigrationHandle {
1895    /// Daemon being migrated.
1896    pub origin_hash: u64,
1897    /// Source node that currently hosts the daemon.
1898    pub source_node: u64,
1899    /// Target node that will host the daemon after cutover.
1900    pub target_node: u64,
1901    /// Runtime the orchestrator lives on. Used by [`Self::phase`]
1902    /// and [`Self::wait`] to poll migration state.
1903    runtime: DaemonRuntime,
1904    /// Options the migration was initiated with. Drives retry
1905    /// policy on `NotReady` + the identity-transport flag for
1906    /// re-initiated attempts.
1907    opts: MigrationOpts,
1908}
1909
1910impl MigrationHandle {
1911    /// Current migration phase, or `None` once the migration has
1912    /// left the orchestrator's records (either via `Complete` → auto
1913    /// cleanup, or via explicit abort). Callers distinguish the two
1914    /// by remembering the last non-None phase.
1915    pub fn phase(&self) -> Option<MigrationPhase> {
1916        self.runtime.inner.orchestrator.status(self.origin_hash)
1917    }
1918
1919    /// Block until the migration reaches a terminal state.
1920    ///
1921    /// Returns `Ok(())` on normal completion (saw `Complete`, then
1922    /// the orchestrator cleaned up). Returns `Err(MigrationError)`
1923    /// if the orchestrator's record disappeared without the caller
1924    /// ever having seen `Complete` — either an explicit abort or a
1925    /// failure at some upstream stage.
1926    ///
1927    /// This method does **not** enforce any wall-clock timeout. A
1928    /// migration that stalls waiting on an unresponsive peer will
1929    /// block indefinitely; callers that want a bound should use
1930    /// [`Self::wait_with_timeout`] instead.
1931    ///
1932    /// Polls every 50 ms. The implementation is deliberately
1933    /// simple — Stage 2 of `DAEMON_IDENTITY_MIGRATION_PLAN.md` and
1934    /// the V2 iteration of this plan will swap this for a
1935    /// broadcast-channel push, but 50 ms polling is plenty for the
1936    /// use cases a migration API sees today.
1937    pub async fn wait(self) -> Result<(), DaemonError> {
1938        self.wait_until(None).await
1939    }
1940
1941    /// Like [`Self::wait`] with a caller-controlled timeout. A
1942    /// timeout aborts the orchestrator-side record and returns
1943    /// `Err(MigrationError::StateFailed)`; a graceful `Complete`
1944    /// returns `Ok`.
1945    pub async fn wait_with_timeout(self, timeout: std::time::Duration) -> Result<(), DaemonError> {
1946        let deadline = tokio::time::Instant::now() + timeout;
1947        self.wait_until(Some(deadline)).await
1948    }
1949
1950    /// Inner wait loop with an optional deadline. `None` = block
1951    /// forever until the migration reaches a terminal state;
1952    /// `Some(d)` = give up and abort at `d`.
1953    ///
1954    /// Centralised here so `wait` and `wait_with_timeout` can't
1955    /// drift on the retry/backoff semantics, and so the
1956    /// "hidden 60 s ceiling" cannot reappear under `wait`.
1957    async fn wait_until(
1958        self,
1959        overall_deadline: Option<tokio::time::Instant>,
1960    ) -> Result<(), DaemonError> {
1961        let start = tokio::time::Instant::now();
1962        let retry_deadline = self.opts.retry_not_ready.map(|b| start + b);
1963        let mut attempts: u8 = 1; // first attempt initiated by start_migration_with
1964        loop {
1965            match self.wait_one_attempt(overall_deadline).await {
1966                Ok(()) => return Ok(()),
1967                Err(DaemonError::MigrationFailed(reason)) if reason.is_retriable() => {
1968                    // Retry decision.
1969                    let Some(retry_d) = retry_deadline else {
1970                        // Opts explicitly disable retry — surface
1971                        // the NotReady verbatim.
1972                        return Err(DaemonError::MigrationFailed(reason));
1973                    };
1974                    let now = tokio::time::Instant::now();
1975                    let overall_exhausted = overall_deadline.map(|d| now >= d).unwrap_or(false);
1976                    if now >= retry_d || overall_exhausted {
1977                        // Budget exhausted. `NotReadyTimeout` carries
1978                        // the attempt count for operator diagnosis.
1979                        return Err(DaemonError::MigrationFailed(
1980                            MigrationFailureReason::NotReadyTimeout { attempts },
1981                        ));
1982                    }
1983                    // Back off and re-initiate.
1984                    let backoff = not_ready_backoff(attempts);
1985                    tokio::time::sleep(backoff).await;
1986                    attempts = attempts.saturating_add(1);
1987                    // Re-initiate the migration by calling the
1988                    // orchestrator fresh. The previous record has
1989                    // been cleaned up by the dispatcher's
1990                    // MigrationFailed handler, so this starts a new
1991                    // attempt from phase 0.
1992                    self.reinitiate_attempt().await?;
1993                    // Loop; poll this new attempt's outcome.
1994                }
1995                Err(e) => return Err(e),
1996            }
1997        }
1998    }
1999
2000    /// Poll a single migration attempt's outcome. Returns:
2001    /// - `Ok(())` on Complete.
2002    /// - `Err(MigrationFailed(reason))` when the dispatcher
2003    ///   observed a structured failure.
2004    /// - `Err(Migration(_))` on overall-timeout or unknown abort.
2005    ///
2006    /// `overall_deadline = None` disables the deadline check — the
2007    /// poll loop only returns via a terminal status transition.
2008    async fn wait_one_attempt(
2009        &self,
2010        overall_deadline: Option<tokio::time::Instant>,
2011    ) -> Result<(), DaemonError> {
2012        loop {
2013            let current_phase = self.runtime.inner.orchestrator.status(self.origin_hash);
2014            match current_phase {
2015                Some(phase) => {
2016                    if phase == MigrationPhase::Complete {
2017                        // Give the dispatcher a beat to finish
2018                        // cleanup, then surface success.
2019                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2020                        return Ok(());
2021                    }
2022                }
2023                None => {
2024                    // A recorded failure is authoritative — the
2025                    // dispatcher populates `recent_failures` before
2026                    // the orchestrator removes the record, so
2027                    // status=None + recorded-reason unambiguously
2028                    // means abort.
2029                    if let Some(reason) = self.take_recent_failure() {
2030                        return Err(DaemonError::MigrationFailed(reason));
2031                    }
2032                    // No recorded failure. The orchestrator removes
2033                    // records via two paths:
2034                    //   1. `on_activate_ack` — success.
2035                    //   2. `abort_migration_with_reason` — failure,
2036                    //      which rides through the dispatcher's
2037                    //      `MigrationFailed` handler *before* the
2038                    //      record is dropped, so `recent_failures`
2039                    //      is populated first.
2040                    // Therefore status=None + no-recorded-failure is
2041                    // unambiguously success, regardless of what
2042                    // phase we last observed. This matters when the
2043                    // dispatcher runs the tail of the migration
2044                    // (Cutover → Complete → ActivateAck) entirely
2045                    // between two 50 ms polls — we may never observe
2046                    // `Complete` explicitly.
2047                    //
2048                    // Synchronous abort paths that bypass the
2049                    // dispatcher (`wait_one_attempt`'s own timeout,
2050                    // `start_migration_with`'s send-failure path)
2051                    // return `Err` to the caller *before* the wait
2052                    // loop can observe the None status, so they
2053                    // don't trip this branch.
2054                    return Ok(());
2055                }
2056            }
2057            if let Some(d) = overall_deadline {
2058                if tokio::time::Instant::now() >= d {
2059                    let _ = self
2060                        .runtime
2061                        .inner
2062                        .orchestrator
2063                        .abort_migration(self.origin_hash, "timeout".into());
2064                    return Err(DaemonError::Migration(MigrationError::StateFailed(
2065                        format!("migration timed out in phase {:?}", current_phase),
2066                    )));
2067                }
2068            }
2069            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2070        }
2071    }
2072
2073    /// Re-initiate a migration attempt for the same daemon after a
2074    /// retriable failure. Calls the orchestrator fresh + sends the
2075    /// first wire message; mirrors the tail of
2076    /// [`DaemonRuntime::start_migration_with`] but without building
2077    /// a new handle (we keep the existing one).
2078    async fn reinitiate_attempt(&self) -> Result<(), DaemonError> {
2079        let msgs = self
2080            .runtime
2081            .inner
2082            .orchestrator
2083            .start_migration(self.origin_hash, self.source_node, self.target_node)
2084            .map_err(DaemonError::Migration)?;
2085
2086        let msgs = if self.opts.transport_identity {
2087            self.runtime
2088                .maybe_seal_chunked_snapshot(self.origin_hash, self.target_node, msgs)
2089                .await?
2090        } else {
2091            msgs
2092        };
2093
2094        let dest_node = match msgs.first() {
2095            Some(MigrationMessage::TakeSnapshot { .. }) => self.source_node,
2096            Some(MigrationMessage::SnapshotReady { .. }) => self.target_node,
2097            Some(other) => {
2098                let _ = self
2099                    .runtime
2100                    .inner
2101                    .orchestrator
2102                    .abort_migration(self.origin_hash, "unexpected retry message".into());
2103                return Err(DaemonError::Migration(MigrationError::StateFailed(
2104                    format!("unexpected retry initial message: {other:?}"),
2105                )));
2106            }
2107            None => {
2108                let _ = self.runtime.inner.orchestrator.abort_migration(
2109                    self.origin_hash,
2110                    "orchestrator returned no retry messages".into(),
2111                );
2112                return Err(DaemonError::Migration(MigrationError::StateFailed(
2113                    "orchestrator returned no migration messages on retry".into(),
2114                )));
2115            }
2116        };
2117
2118        for msg in &msgs {
2119            self.runtime.send_migration_message(dest_node, msg).await?;
2120        }
2121        Ok(())
2122    }
2123
2124    /// Pop the most recent `MigrationFailureReason` the dispatcher
2125    /// observed for this migration (if any). Consumed: subsequent
2126    /// calls see `None` until the next failure arrives.
2127    fn take_recent_failure(&self) -> Option<MigrationFailureReason> {
2128        self.runtime
2129            .inner
2130            .recent_failures
2131            .lock()
2132            .ok()?
2133            .remove(&self.origin_hash)
2134    }
2135
2136    /// Request abort. The orchestrator emits a `MigrationFailed`
2137    /// message to involved nodes and clears its record; the target
2138    /// rolls back via its own handler. Best-effort — a migration
2139    /// past `Cutover` cannot be undone cleanly because routing has
2140    /// already flipped.
2141    pub async fn cancel(&self) -> Result<(), DaemonError> {
2142        let msg = self
2143            .runtime
2144            .inner
2145            .orchestrator
2146            .abort_migration(self.origin_hash, "cancel requested".into())
2147            .map_err(DaemonError::Migration)?;
2148        // Best-effort notify — ignore send errors, the orchestrator
2149        // record is already gone on our side.
2150        let _ = self
2151            .runtime
2152            .send_migration_message(self.source_node, &msg)
2153            .await;
2154        let _ = self
2155            .runtime
2156            .send_migration_message(self.target_node, &msg)
2157            .await;
2158        Ok(())
2159    }
2160}
2161
2162impl std::fmt::Debug for MigrationHandle {
2163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2164        f.debug_struct("MigrationHandle")
2165            .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
2166            .field("source_node", &format_args!("{:#x}", self.source_node))
2167            .field("target_node", &format_args!("{:#x}", self.target_node))
2168            .field("phase", &self.phase())
2169            .finish()
2170    }
2171}