Skip to main content

net_sdk/
compute.rs

1//! Compute surface — `MeshDaemon` + `DaemonRuntime`.
2//!
3//! Users implement [`MeshDaemon`] and hand it to a [`DaemonRuntime`]
4//! tied to a [`Mesh`] node. The runtime holds the
5//! kind-keyed factory table, the per-daemon host registry, and the
6//! lifecycle gate that decides when inbound migrations may land.
7//!
8//! This file is Stage 1 of
9//! [`SDK_COMPUTE_SURFACE_PLAN.md`](../../../docs/SDK_COMPUTE_SURFACE_PLAN.md)
10//! plus the lifecycle half of
11//! [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md):
12//! local spawn / snapshot / stop, with an explicit
13//! `Registering → Ready → ShuttingDown` fence. Migration is Stage 2;
14//! the wire-level half of the readiness plan
15//! (`MigrationFailureReason`) ships alongside it.
16//!
17//! # Example
18//!
19//! ```no_run
20//! use std::sync::Arc;
21//! use bytes::Bytes;
22//! use net_sdk::{Identity, Mesh};
23//! use net_sdk::compute::{
24//!     CausalEvent, DaemonHostConfig, DaemonRuntime, MeshDaemon,
25//! };
26//! use net_sdk::capabilities::CapabilityFilter;
27//! use net::adapter::net::compute::DaemonError as CoreDaemonError;
28//!
29//! struct EchoDaemon;
30//! impl MeshDaemon for EchoDaemon {
31//!     fn name(&self) -> &str { "echo" }
32//!     fn requirements(&self) -> CapabilityFilter { CapabilityFilter::default() }
33//!     fn process(&mut self, event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
34//!         Ok(vec![event.payload.clone()])
35//!     }
36//! }
37//!
38//! # async fn example(mesh: Arc<Mesh>) -> Result<(), Box<dyn std::error::Error>> {
39//! let rt = DaemonRuntime::new(mesh);
40//! rt.register_factory("echo", || Box::new(EchoDaemon))?;
41//! rt.start().await?;
42//! let handle = rt.spawn("echo", Identity::generate(), DaemonHostConfig::default()).await?;
43//! # Ok(())
44//! # }
45//! ```
46
47use parking_lot::{Mutex, RwLock};
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
50use std::sync::Arc;
51
52use thiserror::Error;
53
54pub use ::net::adapter::net::compute::{
55    DaemonBindings, DaemonError as CoreDaemonError, DaemonHostConfig, DaemonStats, MeshDaemon,
56    MigrationError, MigrationFailureReason, MigrationPhase, PlacementDecision, SchedulerError,
57    SubscriptionBinding, SUBPROTOCOL_MIGRATION,
58};
59pub use ::net::adapter::net::state::causal::{CausalEvent, CausalLink};
60pub use ::net::adapter::net::state::snapshot::StateSnapshot;
61
62use ::net::adapter::net::channel::ChannelName;
63use ::net::adapter::net::identity::PermissionToken;
64
65use ::net::adapter::net::behavior::capability::CapabilitySet;
66use ::net::adapter::net::compute::{
67    chunk_snapshot, orchestrator::wire as migration_wire, DaemonFactoryRegistry, DaemonHost,
68    DaemonRegistry, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
69    MigrationTargetHandler, Scheduler,
70};
71use ::net::adapter::net::identity::EntityId;
72use ::net::adapter::net::subprotocol::{
73    FailureCallback, MigrationHandlerHooks, MigrationSubprotocolHandler, PostRestoreCallback,
74    PreCleanupCallback, ReadinessCallback,
75};
76
77use crate::identity::Identity;
78use crate::mesh::Mesh;
79
/// Arc-wrapped factory closure: each call produces a fresh boxed
/// [`MeshDaemon`].
///
/// Kind-keyed at the SDK layer (see the `factories` map on `Inner`); cloned
/// into the core `DaemonFactoryRegistry` at `spawn` time so a future
/// migration target can reconstruct the daemon by `origin_hash`.
///
/// The `Send + Sync` bounds let the same closure be shared across threads
/// behind the `Arc` without extra synchronisation.
type FactoryFn = Arc<dyn Fn() -> Box<dyn MeshDaemon> + Send + Sync>;
84
/// Errors from the SDK daemon runtime.
///
/// Variants fall into three groups:
///
/// - lifecycle-gate rejections ([`NotReady`](Self::NotReady),
///   [`ShuttingDown`](Self::ShuttingDown));
/// - factory / snapshot bookkeeping failures
///   ([`FactoryAlreadyRegistered`](Self::FactoryAlreadyRegistered),
///   [`FactoryNotFound`](Self::FactoryNotFound),
///   [`SnapshotIdentityMismatch`](Self::SnapshotIdentityMismatch));
/// - pass-throughs from the core compute and migration layers
///   ([`Core`](Self::Core), [`Migration`](Self::Migration),
///   [`MigrationFailed`](Self::MigrationFailed)).
///
/// Note that `Migration` and `MigrationFailed` render with the same
/// `"migration failed: "` prefix; match on the variant, not the message,
/// to tell them apart.
#[derive(Debug, Error)]
pub enum DaemonError {
    /// `start()` has not been called yet; the runtime is still in
    /// `Registering` and will not accept spawns or migrations.
    #[error("daemon runtime is not ready — call DaemonRuntime::start() first")]
    NotReady,
    /// `shutdown()` has been called; the runtime is permanently
    /// non-functional.
    #[error("daemon runtime has been shut down")]
    ShuttingDown,
    /// Two `register_factory` calls used the same `kind` string.
    /// The payload is the offending `kind`.
    #[error("factory for kind '{0}' is already registered")]
    FactoryAlreadyRegistered(String),
    /// `spawn` / `spawn_from_snapshot` referenced an unregistered kind.
    /// The payload is the requested `kind`.
    #[error("no factory registered for kind '{0}'")]
    FactoryNotFound(String),
    /// The snapshot's `entity_id.origin_hash` does not match the
    /// identity handed to `spawn_from_snapshot`.
    ///
    /// `snapshot` is the origin hash carried by the snapshot and
    /// `identity` is the origin hash of the supplied identity; both are
    /// rendered in hexadecimal (`{:#x}`) by the error message.
    #[error(
        "snapshot/identity mismatch: snapshot origin {snapshot:#x} != identity origin {identity:#x}"
    )]
    SnapshotIdentityMismatch { snapshot: u64, identity: u64 },
    /// Pass-through for errors surfaced by the core compute layer.
    /// `transparent`: the display text and source are those of the
    /// wrapped [`CoreDaemonError`]; `?` converts automatically via `From`.
    #[error(transparent)]
    Core(#[from] CoreDaemonError),
    /// Pass-through for migration-layer errors. `?` converts
    /// automatically via `From<MigrationError>`.
    #[error("migration failed: {0}")]
    Migration(#[from] MigrationError),
    /// Structured failure reason surfaced by the migration
    /// dispatcher on the source side. Use
    /// [`MigrationFailureReason::is_retriable`] to decide whether
    /// the caller should back off and retry rather than propagating.
    ///
    /// There is no `From` conversion for this variant; it has to be
    /// constructed explicitly.
    #[error("migration failed: {0}")]
    MigrationFailed(MigrationFailureReason),
}
121
/// Lifecycle gate of a `DaemonRuntime`.
///
/// The runtime keeps the current value as a raw byte inside an `AtomicU8`,
/// so every variant has a fixed `u8` discriminant. Those discriminants are
/// what compare-exchange operations compare against; they must stay stable
/// for the lifetime of a runtime and must not be renumbered.
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum State {
    Registering = 0,
    Ready = 1,
    ShuttingDown = 2,
}

impl State {
    /// Decode a byte previously produced by `State::* as u8`.
    ///
    /// # Panics
    ///
    /// Panics on any byte that is not a valid discriminant. The atomic is
    /// only ever written from `State` variants, so an unknown byte means
    /// the memory was corrupted; failing loudly beats guessing a state.
    fn from_u8(v: u8) -> Self {
        // Derive the match patterns from the enum itself so the decoder
        // cannot drift from the declared discriminants.
        const REGISTERING: u8 = State::Registering as u8;
        const READY: u8 = State::Ready as u8;
        const SHUTTING_DOWN: u8 = State::ShuttingDown as u8;

        match v {
            REGISTERING => Self::Registering,
            READY => Self::Ready,
            SHUTTING_DOWN => Self::ShuttingDown,
            other => panic!("daemon runtime: corrupt state byte {other}"),
        }
    }
}
147
/// Per-mesh compute runtime.
///
/// Holds the kind-keyed factory table, the per-daemon host registry,
/// and the `Registering → Ready → ShuttingDown` lifecycle gate. One
/// `DaemonRuntime` per [`Mesh`]; clone the handle freely — the inner
/// state is `Arc`-shared.
///
/// A runtime built by [`DaemonRuntime::new`] starts in `Registering`;
/// [`DaemonRuntime::start`] promotes it to `Ready` and
/// [`DaemonRuntime::shutdown`] moves it to the terminal `ShuttingDown`
/// state.
#[derive(Clone)]
pub struct DaemonRuntime {
    /// Shared state. Cloning the handle clones this `Arc`, so every clone
    /// observes the same lifecycle state, registries and observers.
    inner: Arc<Inner>,
}
158
/// Shared state behind every [`DaemonRuntime`] handle.
///
/// All interior mutability is via atomics or `parking_lot` locks, so the
/// struct is only ever accessed through `&Inner` behind an `Arc`.
struct Inner {
    /// The mesh node this runtime is attached to. Used to read the local
    /// node id and to install / clear the migration subprotocol handler.
    mesh: Arc<Mesh>,
    /// Current lifecycle `State`, stored as its `u8` discriminant.
    /// Initialised to `State::Registering` by `DaemonRuntime::new`.
    state: AtomicU8,
    /// SDK-side kind → factory map. The migration target path reaches
    /// into the core `factory_registry` below by `origin_hash`;
    /// `kind` is SDK sugar so the *caller* can spawn without knowing
    /// the underlying keypair.
    factories: RwLock<HashMap<String, FactoryFn>>,
    /// Core registry — shared with the migration target handler so
    /// daemons restored from an inbound snapshot land in the same
    /// map a local `spawn` uses.
    registry: Arc<DaemonRegistry>,
    /// Core scheduler — built once at `new()` from the mesh's
    /// shared `CapabilityIndex`. Used by the `groups` feature's
    /// `ReplicaGroup` / `ForkGroup` / `StandbyGroup` for
    /// capability-based placement.
    ///
    /// NOTE(review): `DaemonRuntime::new` obtains the shared handle via
    /// `capability_fold()`; confirm whether `CapabilityIndex` in this
    /// comment is the same structure under an older name.
    ///
    /// `local_caps` is `CapabilitySet::default()` because the mesh
    /// doesn't currently expose the locally-announced caps as a
    /// readable snapshot. The only behavioral impact is that
    /// capability-filtered daemons which could run locally won't
    /// get `LocalPreferred` short-circuit placement — they fall
    /// through to the `CapabilityIndex::query` path, which still
    /// returns the local node (with the right announcements) and
    /// places correctly. Revisit when a `MeshNode::local_caps()`
    /// getter lands.
    #[cfg_attr(not(feature = "groups"), allow(dead_code))]
    scheduler: Arc<Scheduler>,
    /// Core factory registry, keyed by `origin_hash`. `spawn` mirrors
    /// each SDK-side kind registration into this map with the
    /// concrete keypair attached so the migration target restores
    /// through the existing `DaemonFactoryRegistry::construct` path.
    factory_registry: Arc<DaemonFactoryRegistry>,
    /// Migration orchestrator, owned by this node. Orchestrates the
    /// 6-phase state machine when this node initiates a migration.
    orchestrator: Arc<MigrationOrchestrator>,
    /// Migration source handler — drives the source side when THIS
    /// node is the source of a migration.
    source_handler: Arc<MigrationSourceHandler>,
    /// Migration target handler — drives the target side when THIS
    /// node is the target of a migration.
    target_handler: Arc<MigrationTargetHandler>,
    /// Most recent `MigrationFailureReason` observed on the source
    /// side for each migration, keyed by `daemon_origin`. Populated
    /// by the dispatcher's failure callback; consumed by
    /// `MigrationHandle::wait` to surface the reason to the caller.
    /// Mutex because the SDK's dependency set doesn't include
    /// `dashmap` directly, and this map sees low write frequency
    /// (one entry per failed migration). Emptied by `shutdown`.
    recent_failures: Mutex<HashMap<u64, MigrationFailureReason>>,
    /// Test-only knob: when set to `true`, the readiness callback
    /// reports "not ready" even when the runtime is in `Ready`.
    /// Lets integration tests drive the `NotReady` retry path
    /// without racing against runtime startup. Defaults to `false`;
    /// production code should not touch it.
    simulate_not_ready: AtomicBool,
    /// Test-only stall injected into `spawn` between the
    /// `require_ready` check and the registry inserts. Measured
    /// in milliseconds; `0` = no stall (production default). See
    /// [`DaemonRuntime::set_spawn_stall_ms`].
    spawn_stall_ms: std::sync::atomic::AtomicU32,
    /// Test-only stall injected into `start` between installing
    /// the migration handler and the Registering→Ready CAS.
    /// Measured in milliseconds; `0` = no stall. See
    /// [`DaemonRuntime::set_start_stall_ms`].
    start_stall_ms: std::sync::atomic::AtomicU32,
    /// Per-origin post-delivery observers. Fired on every successful
    /// `deliver(origin_hash, event)`, inside the registry call and
    /// after the daemon's `process` returns Ok.
    ///
    /// Map key is the daemon's `origin_hash`; each list entry pairs the
    /// observer's registration id (used for removal) with its closure.
    ///
    /// **Why this exists.** `StandbyGroup` needs every event routed
    /// to its active member captured in a replay buffer so a
    /// promoted standby can reapply the window between the last
    /// `sync_standbys` and the failure. A manual `on_event_delivered`
    /// hook relied on the caller pairing every `deliver` with the
    /// buffering call, which silently lost events on omission.
    /// Observers close that gap: a group installs a weak-ref
    /// closure that pushes the event into its buffer automatically,
    /// with no contract on the caller.
    ///
    /// `pub(crate)` so `sdk/src/groups/*` can register; not a
    /// stable public API. The observer list is a `Vec` so unrelated
    /// subsystems (future audit / metrics hooks) could coexist on
    /// the same origin without one overwriting another.
    observers: Mutex<HashMap<u64, Vec<(u64, DeliverObserver)>>>,
    /// Monotonic id minted by `register_deliver_observer`, used by
    /// the returned `ObserverHandle` to identify its entry on
    /// `Drop`/`unregister`. Starts at `1`.
    #[cfg_attr(not(feature = "groups"), allow(dead_code))]
    observer_id_counter: std::sync::atomic::AtomicU64,
}
249
/// A post-delivery observer closure registered against an
/// `origin_hash`. Fires on every successful `DaemonRuntime::deliver`
/// for that origin.
///
/// The closure receives the delivered event by shared reference and has
/// no return value, so it has no channel through which to report failure
/// back to the delivery path.
///
/// Observers MUST be cheap — they run on the delivery thread, after
/// the daemon's `process` but before `deliver` returns. A slow or
/// panicking observer stalls delivery. The in-tree use (standby
/// replay buffer) is a single `VecDeque::push_back` under a
/// short-lived lock; new observers should aim for similar.
pub(crate) type DeliverObserver = Arc<dyn Fn(&CausalEvent) + Send + Sync>;
260
/// Handle returned by
/// [`DaemonRuntime::register_deliver_observer`]. Dropping the
/// handle removes the observer from the runtime's per-origin
/// observer list. Safe to drop after the runtime itself has been
/// dropped: the `Weak` upgrade in `Drop` silently no-ops.
#[cfg_attr(not(feature = "groups"), allow(dead_code))]
pub(crate) struct ObserverHandle {
    /// Weak back-reference to the runtime state, so an outstanding handle
    /// does not keep the runtime alive.
    runtime: std::sync::Weak<Inner>,
    /// `origin_hash` the observer was registered under (the key into the
    /// runtime's observer map).
    origin: u64,
    /// Registration id minted at registration time; identifies this
    /// handle's entry within the per-origin list.
    id: u64,
}
272
273impl Drop for ObserverHandle {
274    fn drop(&mut self) {
275        if let Some(inner) = self.runtime.upgrade() {
276            let mut map = inner.observers.lock();
277            if let Some(v) = map.get_mut(&self.origin) {
278                v.retain(|(id, _)| *id != self.id);
279                if v.is_empty() {
280                    map.remove(&self.origin);
281                }
282            }
283        }
284    }
285}
286
287impl DaemonRuntime {
288    /// Attach a runtime to an existing [`Mesh`]. Stage 1 does not
289    /// consume the `Mesh` — users keep their `Arc<Mesh>` for channel
290    /// registration, subscription, and the rest of the non-compute
291    /// surface. Stage 2 will install the migration subprotocol
292    /// handler when [`Self::start`] runs; until then inbound
293    /// migration messages (if any) are silently dropped by the core,
294    /// same as today.
295    pub fn new(mesh: Arc<Mesh>) -> Self {
296        let local_node_id = mesh.inner().node_id();
297        let registry = Arc::new(DaemonRegistry::new());
298        let factory_registry = Arc::new(DaemonFactoryRegistry::new());
299        let source_handler = Arc::new(MigrationSourceHandler::new(registry.clone()));
300        // Wire `source_handler` into the orchestrator so
301        // local-source migrations register the migration in the
302        // source-side handler — without this, post-snapshot events
303        // are silently mutated into the source daemon's state and
304        // lost at cutover. See
305        // `MigrationOrchestrator::with_source_handler`.
306        let orchestrator = Arc::new(
307            MigrationOrchestrator::new(registry.clone(), local_node_id)
308                .with_source_handler(source_handler.clone()),
309        );
310        let target_handler = Arc::new(MigrationTargetHandler::new_with_factories(
311            registry.clone(),
312            factory_registry.clone(),
313        ));
314        // Scheduler shares the mesh's `CapabilityFold`, so
315        // announcements the mesh publishes become visible to
316        // placement queries immediately. `CapabilitySet::default()`
317        // for `local_caps` is a known gap — see the `scheduler`
318        // field's docstring on [`Inner`].
319        let scheduler = Arc::new(Scheduler::new(
320            mesh.inner().capability_fold().clone(),
321            local_node_id,
322            CapabilitySet::default(),
323        ));
324        Self {
325            inner: Arc::new(Inner {
326                mesh,
327                state: AtomicU8::new(State::Registering as u8),
328                factories: RwLock::new(HashMap::new()),
329                registry,
330                scheduler,
331                factory_registry,
332                orchestrator,
333                source_handler,
334                target_handler,
335                recent_failures: Mutex::new(HashMap::new()),
336                simulate_not_ready: AtomicBool::new(false),
337                spawn_stall_ms: std::sync::atomic::AtomicU32::new(0),
338                start_stall_ms: std::sync::atomic::AtomicU32::new(0),
339                observers: Mutex::new(HashMap::new()),
340                observer_id_counter: std::sync::atomic::AtomicU64::new(1),
341            }),
342        }
343    }
344
345    /// Register a post-delivery observer for `origin_hash`. Every
346    /// successful `deliver(origin_hash, event)` fires the closure
347    /// after the daemon's `process` returns Ok. Returns an
348    /// [`ObserverHandle`] whose `Drop` unregisters the observer.
349    ///
350    /// `pub(crate)` — only the SDK's own group wrappers use this.
351    /// See the `observers` field on [`Inner`] for the design rationale.
352    #[cfg_attr(not(feature = "groups"), allow(dead_code))]
353    pub(crate) fn register_deliver_observer(
354        &self,
355        origin: u64,
356        cb: DeliverObserver,
357    ) -> ObserverHandle {
358        let id = self
359            .inner
360            .observer_id_counter
361            .fetch_add(1, Ordering::Relaxed);
362        {
363            let mut map = self.inner.observers.lock();
364            map.entry(origin).or_default().push((id, cb));
365        }
366        ObserverHandle {
367            runtime: Arc::downgrade(&self.inner),
368            origin,
369            id,
370        }
371    }
372
373    /// Register a factory for a daemon type. `kind` is a user-chosen
374    /// string shared across every node that may host this daemon.
375    /// Second registrations of the same `kind` return
376    /// [`DaemonError::FactoryAlreadyRegistered`].
377    ///
378    /// Valid in both `Registering` and `Ready` states; the runtime
379    /// permits new kinds to appear at runtime. Only `ShuttingDown`
380    /// rejects.
381    ///
382    /// # Migration targeting
383    ///
384    /// `register_factory` alone is **not sufficient** to accept
385    /// inbound migrations — it registers the kind-to-closure mapping
386    /// only on the SDK side. The core migration dispatcher looks up
387    /// factories by `origin_hash` (the daemon's identity), not by
388    /// `kind`, because the migration wire protocol doesn't carry a
389    /// kind string; the target couldn't pick the right factory from
390    /// an inbound snapshot without an explicit binding.
391    ///
392    /// To accept migrations for a specific daemon, the target must
393    /// ALSO call one of:
394    ///
395    /// - [`Self::expect_migration`] `(kind, origin_hash, config)` —
396    ///   placeholder factory keyed by `origin_hash`; the envelope
397    ///   on the snapshot supplies the keypair at restore time.
398    ///   This is the common case.
399    /// - [`Self::register_migration_target_identity`] `(kind,
400    ///   identity, config)` — pre-provisions the keypair as a
401    ///   fallback when the source migrates with
402    ///   `transport_identity: false`.
403    ///
404    /// Or spawn the daemon locally first (via [`Self::spawn`]);
405    /// spawn seeds both the SDK map and the core registry, so a
406    /// daemon that migrated out and migrates back in on the same
407    /// node is covered without extra calls.
408    pub fn register_factory<F>(&self, kind: &str, factory: F) -> Result<(), DaemonError>
409    where
410        F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
411    {
412        if self.state() == State::ShuttingDown {
413            return Err(DaemonError::ShuttingDown);
414        }
415        // `contains_key` + `insert` would be atomic under the
416        // single `write` guard, but the `entry` API makes
417        // atomicity self-evident in one expression — no opening
418        // for a future reviewer to wonder whether the check and
419        // the insert can drift across two separately-acquired
420        // guards.
421        let mut map = self.inner.factories.write();
422        match map.entry(kind.to_string()) {
423            std::collections::hash_map::Entry::Occupied(_) => {
424                Err(DaemonError::FactoryAlreadyRegistered(kind.to_string()))
425            }
426            std::collections::hash_map::Entry::Vacant(slot) => {
427                slot.insert(Arc::new(factory));
428                Ok(())
429            }
430        }
431    }
432
433    /// Promote to `Ready`. Idempotent — a second call on an already-
434    /// `Ready` runtime is a no-op; a call on a `ShuttingDown` runtime
435    /// returns [`DaemonError::ShuttingDown`].
436    ///
437    /// Wires the migration subprotocol (`0x0500`) handler into the
438    /// mesh so inbound `TakeSnapshot` / `SnapshotReady` / etc.
439    /// messages reach the orchestrator / source / target handlers
440    /// owned by this runtime. Installing is idempotent w.r.t.
441    /// multiple `start` calls — the `ArcSwapOption` on the mesh
442    /// swaps the same handler in on each call.
443    pub async fn start(&self) -> Result<(), DaemonError> {
444        loop {
445            match self.state() {
446                State::Registering => {
447                    // Install the migration subprotocol handler
448                    // **before** publishing `Ready`. Other threads
449                    // that observe `Ready` must be able to rely on
450                    // the handler being live: the previous ordering
451                    // (CAS → install) left a window where a
452                    // concurrent caller read `Ready`, began a
453                    // migration, and sent `SnapshotReady` onto a
454                    // mesh whose handler slot was still empty —
455                    // the dispatcher's no-handler fallback would
456                    // synthesise `ComputeNotSupported`, aborting
457                    // the migration nondeterministically during
458                    // startup.
459                    //
460                    // Double-install is safe: `set_migration_handler`
461                    // is an `ArcSwap` store, so if two concurrent
462                    // `start()`s both reach this point the later
463                    // store just wins and the CAS picks one caller
464                    // to return first. Both built handlers are
465                    // functionally equivalent (same registry,
466                    // orchestrator, hooks).
467                    let handler = Arc::new(self.build_migration_handler());
468                    self.inner.mesh.inner().set_migration_handler(handler);
469
470                    // Test-only stall between install and CAS so
471                    // integration tests can race `shutdown` in. In
472                    // production the atomic load is 0 and this is
473                    // a no-op.
474                    let stall_ms = self.inner.start_stall_ms.load(Ordering::Acquire);
475                    if stall_ms > 0 {
476                        tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
477                    }
478
479                    let swap = self.inner.state.compare_exchange(
480                        State::Registering as u8,
481                        State::Ready as u8,
482                        Ordering::AcqRel,
483                        Ordering::Acquire,
484                    );
485                    if swap.is_ok() {
486                        return Ok(());
487                    }
488                    // Lost the CAS. State is now either `Ready`
489                    // (another `start` caller beat us) or
490                    // `ShuttingDown` (a concurrent `shutdown`
491                    // raced past our install). Handle the second
492                    // case explicitly — otherwise a torn-down
493                    // runtime would leave a live handler on the
494                    // mesh, accepting migration traffic and firing
495                    // callbacks against stale registry state.
496                    //
497                    // Under the `Ready` case we leave the handler
498                    // installed: the winning `start` caller also
499                    // installed an equivalent handler (same
500                    // registry, orchestrator, hooks), so whichever
501                    // `ArcSwap` store lands last is indistinguishable
502                    // from the winner's.
503                    if self.state() == State::ShuttingDown {
504                        self.inner.mesh.inner().clear_migration_handler();
505                        return Err(DaemonError::ShuttingDown);
506                    }
507                    // CAS lost to another `start` that won the
508                    // race — loop to re-classify and return Ok on
509                    // the `Ready` arm.
510                }
511                State::Ready => return Ok(()),
512                State::ShuttingDown => return Err(DaemonError::ShuttingDown),
513            }
514        }
515    }
516
517    /// Construct the migration subprotocol handler with every
518    /// hook wired — identity context, channel-rebind replay,
519    /// unsubscribe teardown, readiness predicate, failure
520    /// observer. Extracted so [`Self::start`] can install it
521    /// before the atomic state flip (race fix) without burying
522    /// the construction inline.
523    fn build_migration_handler(&self) -> MigrationSubprotocolHandler {
524        let local_node_id = self.inner.mesh.inner().node_id();
525        // Ask the core crate to build the context. The Noise static
526        // private key is captured inside closures the core owns and
527        // never crosses this boundary as raw bytes — see
528        // `MeshNode::migration_identity_context`.
529        let ctx = self.inner.mesh.inner().migration_identity_context();
530        // Capture the active tokio Handle so the substrate callbacks
531        // can spawn even when fired from a thread that doesn't have a
532        // current runtime (e.g. an FFI trampoline driving the
533        // migration handler synchronously from a Go test). The
534        // substrate explicitly calls these hooks synchronously and
535        // expects them to `tokio::spawn` the actual work — without an
536        // explicit Handle, `tokio::spawn` panics with "no reactor
537        // running" on non-runtime threads.
538        //
539        // Safe to call `Handle::current()` here: `build_migration_handler`
540        // is only invoked from `pub async fn start`, which always
541        // runs inside a tokio runtime.
542        let rt = tokio::runtime::Handle::current();
543        let inner_for_rebind = self.inner.clone();
544        let rt_for_rebind = rt.clone();
545        let post_restore: PostRestoreCallback = Arc::new(move |origin_hash: u64| {
546            let inner = inner_for_rebind.clone();
547            rt_for_rebind.spawn(async move {
548                replay_subscriptions(inner, origin_hash).await;
549            });
550        });
551        let inner_for_teardown = self.inner.clone();
552        let rt_for_teardown = rt.clone();
553        let pre_cleanup: PreCleanupCallback = Arc::new(move |origin_hash: u64| {
554            // Snapshot the ledger BEFORE cleanup drops the host —
555            // after that, the ledger is gone. Spawn async
556            // unsubscribes so the dispatcher thread returns
557            // immediately.
558            let bindings = inner_for_teardown
559                .registry
560                .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
561                .unwrap_or_default();
562            if bindings.is_empty() {
563                return;
564            }
565            let inner = inner_for_teardown.clone();
566            rt_for_teardown.spawn(async move {
567                teardown_subscriptions(inner, bindings).await;
568            });
569        });
570        let inner_for_readiness = self.inner.clone();
571        let readiness: ReadinessCallback = Arc::new(move || {
572            // Test-only: `simulate_not_ready` flips the predicate
573            // to false regardless of the underlying lifecycle
574            // state. Honour it first so integration tests can
575            // drive the NotReady retry path.
576            if inner_for_readiness
577                .simulate_not_ready
578                .load(Ordering::Acquire)
579            {
580                return false;
581            }
582            inner_for_readiness.state.load(Ordering::Acquire) == State::Ready as u8
583        });
584        let inner_for_failure = self.inner.clone();
585        let failure: FailureCallback =
586            Arc::new(move |origin_hash: u64, reason: MigrationFailureReason| {
587                inner_for_failure
588                    .recent_failures
589                    .lock()
590                    .insert(origin_hash, reason);
591            });
592        MigrationSubprotocolHandler::with_hooks(
593            self.inner.orchestrator.clone(),
594            self.inner.source_handler.clone(),
595            self.inner.target_handler.clone(),
596            local_node_id,
597            MigrationHandlerHooks {
598                identity: Some(ctx),
599                post_restore: Some(post_restore),
600                pre_cleanup: Some(pre_cleanup),
601                readiness: Some(readiness),
602                failure: Some(failure),
603            },
604        )
605    }
606
607    /// Tear down the runtime. Unregisters every local daemon host,
608    /// clears the factory registry, and transitions state to
609    /// `ShuttingDown`. Subsequent calls on this runtime fail with
610    /// [`DaemonError::ShuttingDown`]. A second `shutdown` is a no-op.
611    pub async fn shutdown(&self) -> Result<(), DaemonError> {
612        // Mark ShuttingDown first so new spawns / registrations
613        // immediately short-circuit. The store isn't a CAS — a
614        // `ShuttingDown → ShuttingDown` re-store is cheap and
615        // benign.
616        self.inner
617            .state
618            .store(State::ShuttingDown as u8, Ordering::Release);
619
620        // Drain the registry. `list()` snapshots origin_hashes under
621        // its internal read guard; iterating is safe because we own
622        // the unregister path.
623        let origins: Vec<u64> = self
624            .inner
625            .registry
626            .list()
627            .into_iter()
628            .map(|(origin, _)| origin)
629            .collect();
630        for origin in origins {
631            let _ = self.inner.registry.unregister(origin);
632            self.inner.factory_registry.remove(origin);
633        }
634        // Drop any leftover migration-failure entries so they
635        // don't count against the process's memory footprint after
636        // shutdown. The runtime is permanently non-functional at
637        // this point, so no one will consume them.
638        self.inner.recent_failures.lock().clear();
639        // Uninstall the migration subprotocol handler. The
640        // handler carries `Arc` clones into our `Inner` — leaving
641        // it installed keeps the runtime's internals alive via
642        // the mesh even after we've drained every registry, and
643        // would accept inbound migration traffic that now
644        // unconditionally fails (empty registry). The happy-path
645        // teardown should leave the mesh in the same shape it
646        // had before `start()`.
647        self.inner.mesh.inner().clear_migration_handler();
648        Ok(())
649    }
650
    /// **Test-only.** Force the readiness predicate seen by the
    /// migration dispatcher to return `false` regardless of
    /// lifecycle state — simulates a target that's still in
    /// `Registering` even after `start()` has run. Lets
    /// integration tests exercise the `NotReady` retry path
    /// without racing against runtime startup.
    ///
    /// Pass `true` to enable the override and `false` to restore normal
    /// behaviour.
    ///
    /// No effect on `is_ready()` or `spawn` / `stop` — those use
    /// the underlying `state` directly. Only the dispatcher's
    /// readiness predicate is affected.
    pub fn simulate_not_ready(&self, flag: bool) {
        // `Release` pairs with the `Acquire` load in the readiness callback.
        self.inner.simulate_not_ready.store(flag, Ordering::Release);
    }
664
665    /// **Test-only.** Inject a sleep inside `spawn` between the
666    /// initial `require_ready` check and the registry inserts,
667    /// giving integration tests a deterministic window to race
668    /// `shutdown` against. Duration is stored as millis; `0`
669    /// disables.
670    #[doc(hidden)]
671    pub fn set_spawn_stall_ms(&self, millis: u32) {
672        self.inner.spawn_stall_ms.store(millis, Ordering::Release);
673    }
674
675    /// **Test-only.** Inject a sleep inside `start` between the
676    /// handler install and the Registering→Ready CAS. Used to
677    /// deterministically race `shutdown` against a mid-flight
678    /// `start` so tests can verify the handler-cleanup path.
679    #[doc(hidden)]
680    pub fn set_start_stall_ms(&self, millis: u32) {
681        self.inner.start_stall_ms.store(millis, Ordering::Release);
682    }
683
684    /// Readiness accessor for tests + operators. `true` iff the
685    /// runtime has transitioned to `Ready` and has not yet begun
686    /// shutting down.
687    pub fn is_ready(&self) -> bool {
688        self.state() == State::Ready
689    }
690
691    /// Spawn a daemon of `kind` under the caller-provided
692    /// [`Identity`]. The identity's keypair seeds the daemon's
693    /// `origin_hash` + `entity_id`; the runtime registers both the
694    /// live host and a kind-keyed factory in the core registry so a
695    /// future migration target can reconstruct the daemon through
696    /// the existing `DaemonFactoryRegistry::construct` path.
697    ///
698    /// The returned [`DaemonHandle`] is clone-safe; dropping it does
699    /// not stop the daemon. Call [`Self::stop`] explicitly.
700    pub async fn spawn(
701        &self,
702        kind: &str,
703        identity: Identity,
704        config: DaemonHostConfig,
705    ) -> Result<DaemonHandle, DaemonError> {
706        self.require_ready()?;
707        // Test-only stall between the readiness check and the
708        // registry inserts. In production `spawn_stall_ms` is
709        // always 0 and this branch is a no-op.
710        let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
711        if stall_ms > 0 {
712            tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
713        }
714        let factory = self.factory_for_kind(kind)?;
715        let daemon = (factory)();
716        let keypair = identity.keypair().as_ref().clone();
717        let origin_hash = keypair.origin_hash();
718        let entity_id = keypair.entity_id().clone();
719
720        // Mirror the factory into the core registry BEFORE registering
721        // the host, so a migration-target handler that catches up on
722        // this origin mid-spawn always sees a consistent view. Atomic
723        // on collision: if another daemon already claims this
724        // `origin_hash`, bail without mutating either registry —
725        // otherwise the rollback below would strip the factory entry
726        // for the *existing* daemon and silently break its future
727        // migratability.
728        let factory_for_core = factory.clone();
729        self.inner
730            .factory_registry
731            .register(keypair.clone(), config.clone(), move || {
732                (factory_for_core)()
733            })
734            .map_err(DaemonError::Core)?;
735
736        let host = DaemonHost::new(daemon, keypair, config);
737        // `DaemonRegistry::register` errors on origin_hash collisions
738        // — two daemons can't share the same identity.
739        //
740        // Rolling back our `factory_registry` insert is safe
741        // because `factory_registry::register` is atomic on
742        // collision: since our call above *succeeded*, the slot
743        // was empty and we exclusively own it. No other code path
744        // replaces an occupied slot (`register` /
745        // `register_placeholder` both error on collision;
746        // `remove` / `take` only remove their caller's own
747        // entries), so the entry we're about to remove is still
748        // ours. The rollback cannot affect a pre-existing
749        // placeholder or another daemon's factory entry.
750        if let Err(e) = self.inner.registry.register(host) {
751            self.inner.factory_registry.remove(origin_hash);
752            return Err(DaemonError::Core(e));
753        }
754
755        // Post-insert fence: `shutdown` may have raced past our
756        // initial `require_ready()` and already swept the
757        // registries. In that case our entries are either already
758        // gone (shutdown saw them in its sweep) or will outlive
759        // shutdown's sweep (we inserted after `list()`). Roll back
760        // unconditionally under the ShuttingDown branch so no
761        // zombie daemon survives the torn-down runtime; the
762        // rollback's `unregister` / `remove` calls are idempotent
763        // no-ops if shutdown already drained our slot.
764        if self.state() == State::ShuttingDown {
765            let _ = self.inner.registry.unregister(origin_hash);
766            self.inner.factory_registry.remove(origin_hash);
767            return Err(DaemonError::ShuttingDown);
768        }
769
770        Ok(DaemonHandle {
771            origin_hash,
772            entity_id,
773            inner: self.inner.clone(),
774        })
775    }
776
777    /// Spawn a daemon with a caller-supplied `MeshDaemon` instance,
778    /// bypassing the SDK-side kind-factory lookup.
779    ///
780    /// Used by language-binding layers (currently: the NAPI `compute`
781    /// module) that build daemon instances via cross-FFI dispatch —
782    /// the factory closure for such a daemon can't be a plain
783    /// `Fn() -> Box<dyn MeshDaemon>` because constructing the
784    /// daemon requires an awaitable call into the host language.
785    /// The binding does the await itself, hands in the resulting
786    /// `Box<dyn MeshDaemon>`, and this method does the rest of
787    /// what [`Self::spawn`] does: register the `(origin_hash →
788    /// kind-factory)` mirror in the core registry so future
789    /// migrations can reconstruct the daemon, insert the host,
790    /// and run the same shutdown-race fence.
791    ///
792    /// `kind_factory` is the closure the core registry stores for
793    /// migration-target reconstruction; it must be re-callable
794    /// (migration targets call it when they restore the daemon
795    /// on another node). Bindings typically build this by cloning
796    /// the same TSFN used for the initial spawn.
797    pub async fn spawn_with_daemon<F>(
798        &self,
799        identity: Identity,
800        config: DaemonHostConfig,
801        daemon: Box<dyn MeshDaemon>,
802        kind_factory: F,
803    ) -> Result<DaemonHandle, DaemonError>
804    where
805        F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
806    {
807        self.require_ready()?;
808        // Same test-only stall as `spawn` — tests race shutdown
809        // against this path too.
810        let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
811        if stall_ms > 0 {
812            tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
813        }
814        let keypair = identity.keypair().as_ref().clone();
815        let origin_hash = keypair.origin_hash();
816        let entity_id = keypair.entity_id().clone();
817
818        // Mirror the caller-supplied kind_factory into the core
819        // registry. Same atomic-register semantics as `spawn` —
820        // if another daemon already claims this origin_hash, bail
821        // without touching state.
822        self.inner
823            .factory_registry
824            .register(keypair.clone(), config.clone(), kind_factory)
825            .map_err(DaemonError::Core)?;
826
827        let host = DaemonHost::new(daemon, keypair, config);
828        if let Err(e) = self.inner.registry.register(host) {
829            self.inner.factory_registry.remove(origin_hash);
830            return Err(DaemonError::Core(e));
831        }
832
833        // Post-insert shutdown fence, matching `spawn`. Without
834        // this, a concurrent `shutdown()` that raced past our
835        // `require_ready` check and already swept the registries
836        // would leave a zombie daemon live in the torn-down
837        // runtime.
838        if self.state() == State::ShuttingDown {
839            let _ = self.inner.registry.unregister(origin_hash);
840            self.inner.factory_registry.remove(origin_hash);
841            return Err(DaemonError::ShuttingDown);
842        }
843
844        Ok(DaemonHandle {
845            origin_hash,
846            entity_id,
847            inner: self.inner.clone(),
848        })
849    }
850
851    /// Spawn a daemon from a caller-supplied instance and restore its
852    /// state from `snapshot`, bypassing the SDK-side kind-factory
853    /// lookup. Parallels [`Self::spawn_with_daemon`] for restore.
854    ///
855    /// Used by language-binding layers whose daemons are built via
856    /// cross-FFI dispatch — construction goes through the host
857    /// language, then the binding hands the built `Box<dyn MeshDaemon>`
858    /// (already wired to its TSFN bridge) plus the `kind_factory`
859    /// closure used by the core registry for migration-target
860    /// reconstruction.
861    pub async fn spawn_from_snapshot_with_daemon<F>(
862        &self,
863        identity: Identity,
864        snapshot: StateSnapshot,
865        config: DaemonHostConfig,
866        daemon: Box<dyn MeshDaemon>,
867        kind_factory: F,
868    ) -> Result<DaemonHandle, DaemonError>
869    where
870        F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
871    {
872        self.require_ready()?;
873        let keypair = identity.keypair().as_ref().clone();
874        let origin_hash = keypair.origin_hash();
875        let entity_id = keypair.entity_id().clone();
876
877        // Full `entity_id` comparison — see `spawn_from_snapshot` for
878        // the birthday-collision rationale.
879        if snapshot.entity_id != entity_id {
880            return Err(DaemonError::SnapshotIdentityMismatch {
881                snapshot: snapshot.entity_id.origin_hash(),
882                identity: origin_hash,
883            });
884        }
885
886        self.inner
887            .factory_registry
888            .register(keypair.clone(), config.clone(), kind_factory)
889            .map_err(DaemonError::Core)?;
890
891        let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
892            Ok(h) => h,
893            Err(e) => {
894                self.inner.factory_registry.remove(origin_hash);
895                return Err(DaemonError::Core(e));
896            }
897        };
898
899        if let Err(e) = self.inner.registry.register(host) {
900            self.inner.factory_registry.remove(origin_hash);
901            return Err(DaemonError::Core(e));
902        }
903
904        if self.state() == State::ShuttingDown {
905            let _ = self.inner.registry.unregister(origin_hash);
906            self.inner.factory_registry.remove(origin_hash);
907            return Err(DaemonError::ShuttingDown);
908        }
909
910        Ok(DaemonHandle {
911            origin_hash,
912            entity_id,
913            inner: self.inner.clone(),
914        })
915    }
916
917    /// Spawn a daemon of `kind` and restore its state from `snapshot`.
918    /// The snapshot's `entity_id` must match the caller's
919    /// [`Identity`]; mismatch returns
920    /// [`DaemonError::SnapshotIdentityMismatch`] before any side
921    /// effects land.
922    pub async fn spawn_from_snapshot(
923        &self,
924        kind: &str,
925        identity: Identity,
926        snapshot: StateSnapshot,
927        config: DaemonHostConfig,
928    ) -> Result<DaemonHandle, DaemonError> {
929        self.require_ready()?;
930        let factory = self.factory_for_kind(kind)?;
931        let keypair = identity.keypair().as_ref().clone();
932        let origin_hash = keypair.origin_hash();
933        let entity_id = keypair.entity_id().clone();
934
935        // Compare the **full** 32-byte `entity_id`, not just the
936        // 64-bit `origin_hash` projection. `origin_hash` is a
937        // birthday-bounded 64-bit hash of the ed25519 public key;
938        // two legitimately-different identities can collide on
939        // `origin_hash` with probability ~2^-32 after ~4B daemons.
940        // A collision would let the *wrong* identity restore the
941        // snapshot, producing a daemon signed under one pubkey
942        // but claiming to be another — downstream signatures then
943        // verify against the wrong key and the daemon silently
944        // produces outputs no peer accepts.
945        if snapshot.entity_id != entity_id {
946            return Err(DaemonError::SnapshotIdentityMismatch {
947                snapshot: snapshot.entity_id.origin_hash(),
948                identity: origin_hash,
949            });
950        }
951
952        let daemon = (factory)();
953        // Atomic register: collision here means some other daemon
954        // (live or a previous spawn_from_snapshot in-flight) already
955        // owns this `origin_hash`. Bail without touching the other
956        // daemon's state — the later rollback would otherwise remove
957        // the victim's factory entry and silently break its future
958        // migratability.
959        let factory_for_core = factory.clone();
960        self.inner
961            .factory_registry
962            .register(keypair.clone(), config.clone(), move || {
963                (factory_for_core)()
964            })
965            .map_err(DaemonError::Core)?;
966
967        let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
968            Ok(h) => h,
969            Err(e) => {
970                // Same rollback-safety argument as in `spawn`:
971                // atomic `factory_registry::register` above means
972                // we exclusively own the slot, and nothing else
973                // can replace an occupied slot. Removing here
974                // cleans up strictly our insert — an adjacent
975                // pre-existing placeholder (e.g. from
976                // `expect_migration`) would have made our
977                // register fail atomically upstream, and we'd
978                // never have reached this branch.
979                self.inner.factory_registry.remove(origin_hash);
980                return Err(DaemonError::Core(e));
981            }
982        };
983
984        if let Err(e) = self.inner.registry.register(host) {
985            // Same ownership argument — the atomic register
986            // above means we own this slot exclusively.
987            self.inner.factory_registry.remove(origin_hash);
988            return Err(DaemonError::Core(e));
989        }
990
991        // Post-insert fence against a concurrent `shutdown` — see
992        // the matching comment in `spawn`.
993        if self.state() == State::ShuttingDown {
994            let _ = self.inner.registry.unregister(origin_hash);
995            self.inner.factory_registry.remove(origin_hash);
996            return Err(DaemonError::ShuttingDown);
997        }
998
999        Ok(DaemonHandle {
1000            origin_hash,
1001            entity_id,
1002            inner: self.inner.clone(),
1003        })
1004    }
1005
1006    /// Stop a daemon, removing it from the runtime's registry. Valid
1007    /// while `Ready` and (idempotently) during `ShuttingDown` —
1008    /// `ShuttingDown` paths through here are a no-op because the
1009    /// shutdown sweep has already drained the registry.
1010    pub async fn stop(&self, origin_hash: u64) -> Result<(), DaemonError> {
1011        if self.state() == State::Registering {
1012            return Err(DaemonError::NotReady);
1013        }
1014        // Treat a missing daemon as success during ShuttingDown —
1015        // the shutdown sweep drained it.
1016        match self.inner.registry.unregister(origin_hash) {
1017            Ok(_) => {
1018                self.inner.factory_registry.remove(origin_hash);
1019                Ok(())
1020            }
1021            Err(CoreDaemonError::NotFound(_)) if self.state() == State::ShuttingDown => Ok(()),
1022            Err(e) => Err(DaemonError::Core(e)),
1023        }
1024    }
1025
1026    /// Take a snapshot of a running daemon by `origin_hash`. Returns
1027    /// `Ok(None)` when the daemon is stateless.
1028    pub async fn snapshot(&self, origin_hash: u64) -> Result<Option<StateSnapshot>, DaemonError> {
1029        self.require_ready()?;
1030        self.inner
1031            .registry
1032            .snapshot(origin_hash)
1033            .map_err(DaemonError::Core)
1034    }
1035
1036    /// Deliver one causal event to the daemon identified by
1037    /// `origin_hash`, returning the daemon's outputs wrapped in the
1038    /// host's causal chain.
1039    ///
1040    /// Stage 1 convenience — Stage 2 adds mesh-dispatched delivery
1041    /// via the causal subprotocol, and this direct path becomes
1042    /// testing sugar rather than the primary ingress.
1043    pub fn deliver(
1044        &self,
1045        origin_hash: u64,
1046        event: &CausalEvent,
1047    ) -> Result<Vec<CausalEvent>, DaemonError> {
1048        self.require_ready()?;
1049        let outputs = self
1050            .inner
1051            .registry
1052            .deliver(origin_hash, event)
1053            .map_err(DaemonError::Core)?;
1054
1055        // Fire post-delivery observers (e.g. `StandbyGroup`'s replay
1056        // buffer). Observers run AFTER a successful `process` so
1057        // replay on promotion doesn't re-apply events the active
1058        // rejected. Snapshot the observer list into a local `Vec`
1059        // so each callback runs without the map's mutex held —
1060        // prevents a misbehaving observer from blocking unrelated
1061        // deliveries and rules out re-entrant-lock deadlocks if an
1062        // observer ever calls back into `deliver`.
1063        let to_fire: Vec<DeliverObserver> = {
1064            let map = self.inner.observers.lock();
1065            map.get(&origin_hash)
1066                .map(|v| v.iter().map(|(_, cb)| cb.clone()).collect())
1067                .unwrap_or_default()
1068        };
1069        for cb in to_fire {
1070            cb(event);
1071        }
1072        Ok(outputs)
1073    }
1074
1075    /// Number of daemons currently registered.
1076    pub fn daemon_count(&self) -> usize {
1077        self.inner.registry.count()
1078    }
1079
1080    /// Current orchestrator-side migration phase for `origin_hash`,
1081    /// or `None` when no migration record exists (either never
1082    /// started here or already reached its terminal state and was
1083    /// removed). Useful for tests that assert the migration reached
1084    /// true completion (record gone via `ActivateAck`) rather than
1085    /// simply advancing to the `Complete` phase.
1086    pub fn migration_phase(&self, origin_hash: u64) -> Option<MigrationPhase> {
1087        self.inner.orchestrator.status(origin_hash)
1088    }
1089
1090    /// **Test-only.** Peek at the cached failure reason for
1091    /// `origin_hash` without consuming it.
1092    ///
1093    /// Normal `MigrationHandle::wait` code path pops the entry when
1094    /// it hits status=None; this accessor exists so regression tests
1095    /// can observe the cache's lifecycle directly (e.g. assert the
1096    /// cache is cleared by `start_migration_with`).
1097    ///
1098    /// Exposed publicly because SDK integration tests live in a
1099    /// separate crate; not part of the stable surface.
1100    #[doc(hidden)]
1101    pub fn peek_migration_failure(&self, origin_hash: u64) -> Option<MigrationFailureReason> {
1102        self.inner.recent_failures.lock().get(&origin_hash).cloned()
1103    }
1104
1105    /// **Test-only.** Inject a failure reason into the cache for
1106    /// `origin_hash`. Lets tests stage a "stale entry from a prior
1107    /// attempt" scenario deterministically, without having to run
1108    /// a whole losing migration to populate it.
1109    ///
1110    /// Same caveat as [`Self::peek_migration_failure`] — visible
1111    /// because SDK integration tests live out-of-crate; not part of
1112    /// the stable surface.
1113    #[doc(hidden)]
1114    pub fn inject_migration_failure(&self, origin_hash: u64, reason: MigrationFailureReason) {
1115        self.inner
1116            .recent_failures
1117            .lock()
1118            .insert(origin_hash, reason);
1119    }
1120
1121    /// Snapshot the daemon's subscription ledger — a cloned view of
1122    /// every `(publisher, channel)` pair the daemon has subscribed
1123    /// to via [`Self::subscribe_channel`]. Used by the migration
1124    /// target path to drive replay and by tests / operators to
1125    /// observe what a daemon is subscribed to.
1126    pub fn subscriptions(&self, origin_hash: u64) -> Result<Vec<SubscriptionBinding>, DaemonError> {
1127        self.inner
1128            .registry
1129            .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1130            .map_err(DaemonError::Core)
1131    }
1132
1133    /// Subscribe a specific daemon to a channel on a remote
1134    /// publisher. Routes through the mesh's membership subprotocol
1135    /// and **records the subscription in the daemon's ledger** so
1136    /// a migration target can replay it after cutover. Users should
1137    /// use this method (rather than reaching through
1138    /// `rt.mesh().inner().subscribe_channel_*`) for daemon-owned
1139    /// subscriptions; otherwise the subscription travels with the
1140    /// node, not the daemon, and silently drops on migration.
1141    ///
1142    /// Flow:
1143    /// 1. Hit the publisher's membership endpoint via
1144    ///    `Mesh::subscribe_channel_with_token` (or the
1145    ///    no-token variant).
1146    /// 2. On success, record
1147    ///    `(publisher, channel) → SubscriptionBinding` in the
1148    ///    host's ledger.
1149    /// 3. On wire failure, no ledger mutation.
1150    ///
1151    /// `token` is the caller-owned [`PermissionToken`] for
1152    /// token-gated channels; `None` for open channels.
1153    pub async fn subscribe_channel(
1154        &self,
1155        origin_hash: u64,
1156        publisher: u64,
1157        channel: ChannelName,
1158        token: Option<PermissionToken>,
1159    ) -> Result<(), DaemonError> {
1160        self.require_ready()?;
1161        if !self.inner.registry.contains(origin_hash) {
1162            return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1163        }
1164        // Capture serialized token bytes for the ledger BEFORE
1165        // handing ownership to the mesh. The mesh call consumes the
1166        // token by value on the token-path.
1167        let token_bytes = token.as_ref().map(|t| t.to_bytes().to_vec());
1168        let result = match token {
1169            Some(tok) => {
1170                self.inner
1171                    .mesh
1172                    .inner()
1173                    .subscribe_channel_with_token(publisher, channel.clone(), tok)
1174                    .await
1175            }
1176            None => {
1177                self.inner
1178                    .mesh
1179                    .inner()
1180                    .subscribe_channel(publisher, channel.clone())
1181                    .await
1182            }
1183        };
1184        result.map_err(|e| {
1185            DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1186                "subscribe_channel failed: {e}"
1187            )))
1188        })?;
1189
1190        // Mesh accepted the subscribe — record in the ledger.
1191        // `with_host` reaches into the registry's per-daemon mutex;
1192        // since this is SDK-level code we do the minimum work
1193        // under the lock (a single DashMap insert).
1194        if let Err(e) = self.inner.registry.with_host(origin_hash, |host| {
1195            host.record_subscription(publisher, channel, token_bytes);
1196        }) {
1197            return Err(DaemonError::Core(e));
1198        }
1199        Ok(())
1200    }
1201
1202    /// Unsubscribe a specific daemon from a channel. Symmetric to
1203    /// [`Self::subscribe_channel`]: mesh wire call first, then
1204    /// ledger update.
1205    pub async fn unsubscribe_channel(
1206        &self,
1207        origin_hash: u64,
1208        publisher: u64,
1209        channel: ChannelName,
1210    ) -> Result<(), DaemonError> {
1211        self.require_ready()?;
1212        if !self.inner.registry.contains(origin_hash) {
1213            return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1214        }
1215        self.inner
1216            .mesh
1217            .inner()
1218            .unsubscribe_channel(publisher, channel.clone())
1219            .await
1220            .map_err(|e| {
1221                DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1222                    "unsubscribe_channel failed: {e}"
1223                )))
1224            })?;
1225        let _ = self.inner.registry.with_host(origin_hash, |host| {
1226            host.forget_subscription(publisher, &channel);
1227        });
1228        Ok(())
1229    }
1230
1231    /// Pre-register a factory on the target node keyed by the
1232    /// daemon's `origin_hash`, using the caller-supplied `Identity`
1233    /// as the **fallback** keypair.
1234    ///
1235    /// Use this when:
1236    /// - The caller genuinely has the daemon's keypair on hand
1237    ///   (typical: test harnesses that share the same
1238    ///   `Identity` between source and target runtimes).
1239    /// - Migration runs with `transport_identity = false`, so the
1240    ///   snapshot carries no envelope and the target needs a
1241    ///   matching keypair pre-provisioned.
1242    ///
1243    /// For the common envelope-transport case where the target
1244    /// doesn't know the daemon's private key ahead of time,
1245    /// prefer [`Self::expect_migration`] — it registers a
1246    /// placeholder factory keyed only on `origin_hash`, and the
1247    /// envelope in the migration snapshot supplies the real
1248    /// keypair at restore time.
1249    pub fn register_migration_target_identity(
1250        &self,
1251        kind: &str,
1252        identity: Identity,
1253        config: DaemonHostConfig,
1254    ) -> Result<(), DaemonError> {
1255        if self.state() == State::ShuttingDown {
1256            return Err(DaemonError::ShuttingDown);
1257        }
1258        let factory = self.factory_for_kind(kind)?;
1259        let keypair = identity.keypair().as_ref().clone();
1260        let origin_hash = keypair.origin_hash();
1261        let factory_clone = factory.clone();
1262        self.inner
1263            .factory_registry
1264            .register(keypair, config, move || (factory_clone)())
1265            .map_err(DaemonError::Core)?;
1266        // Post-insert fence — a concurrent `shutdown` may have
1267        // raced past the initial state check. Roll back so no
1268        // factory entry outlives the torn-down runtime.
1269        if self.state() == State::ShuttingDown {
1270            self.inner.factory_registry.remove(origin_hash);
1271            return Err(DaemonError::ShuttingDown);
1272        }
1273        Ok(())
1274    }
1275
1276    /// Declare on the target that this node expects a migration
1277    /// for `origin_hash` of the given `kind`. Registers a
1278    /// **placeholder** factory in the core registry — no matching
1279    /// keypair required, because the migration snapshot's
1280    /// [`IdentityEnvelope`](::net::adapter::net::identity::IdentityEnvelope)
1281    /// carries the real keypair and the dispatcher overrides the
1282    /// placeholder at restore time.
1283    ///
1284    /// Fails cleanly if the source migrates without an envelope
1285    /// (e.g., `MigrationOpts { transport_identity: false }`) —
1286    /// the target's factory has no keypair and the dispatcher
1287    /// emits `IdentityTransportFailed`. Use
1288    /// [`Self::register_migration_target_identity`] with a shared
1289    /// identity for the explicit public-identity-migration case.
1290    ///
1291    /// Landing this method closes the seam documented in the
1292    /// `envelope_overrides_target_placeholder_keypair` test of
1293    /// Stage 5b of the identity-migration plan — targets can now
1294    /// pre-register for a migration by `origin_hash` alone.
1295    pub fn expect_migration(
1296        &self,
1297        kind: &str,
1298        origin_hash: u64,
1299        config: DaemonHostConfig,
1300    ) -> Result<(), DaemonError> {
1301        if self.state() == State::ShuttingDown {
1302            return Err(DaemonError::ShuttingDown);
1303        }
1304        let factory = self.factory_for_kind(kind)?;
1305        let factory_clone = factory.clone();
1306        self.inner
1307            .factory_registry
1308            .register_placeholder(origin_hash, config, move || (factory_clone)())
1309            .map_err(DaemonError::Core)?;
1310        // Post-insert fence against a concurrent `shutdown` — see
1311        // the matching comment in `register_migration_target_identity`.
1312        if self.state() == State::ShuttingDown {
1313            self.inner.factory_registry.remove(origin_hash);
1314            return Err(DaemonError::ShuttingDown);
1315        }
1316        Ok(())
1317    }
1318
1319    /// Start migrating a daemon from `source_node` to `target_node`.
1320    /// The orchestrator runs on this node regardless of who owns
1321    /// the daemon — call this on whichever node wants to drive the
1322    /// migration state machine.
1323    ///
1324    /// Returns a [`MigrationHandle`] whose [`MigrationHandle::wait`]
1325    /// resolves when the migration reaches a terminal state
1326    /// (`Complete` on success, `MigrationError` on abort / failure).
1327    ///
1328    /// For the common local-source case (`source_node ==
1329    /// mesh.node_id()`), the snapshot is taken synchronously inside
1330    /// this call and `SnapshotReady` is shipped to the target. For
1331    /// a remote source, the orchestrator sends `TakeSnapshot` to
1332    /// the source and drives the rest of the state machine from
1333    /// inbound wire messages.
1334    pub async fn start_migration(
1335        &self,
1336        origin_hash: u64,
1337        source_node: u64,
1338        target_node: u64,
1339    ) -> Result<MigrationHandle, DaemonError> {
1340        self.start_migration_with(
1341            origin_hash,
1342            source_node,
1343            target_node,
1344            MigrationOpts::default(),
1345        )
1346        .await
1347    }
1348
1349    /// `start_migration` with caller-supplied options. Stage 6 of
1350    /// [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md):
1351    /// lets the caller opt out of identity transport when the daemon
1352    /// doesn't need to sign anything on the target.
1353    pub async fn start_migration_with(
1354        &self,
1355        origin_hash: u64,
1356        source_node: u64,
1357        target_node: u64,
1358        opts: MigrationOpts,
1359    ) -> Result<MigrationHandle, DaemonError> {
1360        self.require_ready()?;
1361        // Clear any stale failure reason from a prior migration
1362        // attempt for this same `origin_hash`. Without this, if the
1363        // previous attempt's `MigrationHandle` was dropped before
1364        // `wait()` ran (or never `wait()`ed at all), the dispatcher's
1365        // failure callback left an entry in `recent_failures` that
1366        // would leak into THIS attempt's `wait()` — a successful
1367        // new migration would incorrectly surface the old reason
1368        // when `wait_one_attempt` hits its None-status branch and
1369        // pops `recent_failures`.
1370        self.inner.recent_failures.lock().remove(&origin_hash);
1371        let msgs = self
1372            .inner
1373            .orchestrator
1374            .start_migration(origin_hash, source_node, target_node)
1375            .map_err(DaemonError::Migration)?;
1376
1377        // Local-source path: `start_migration` builds chunked
1378        // `SnapshotReady` messages synchronously from the local
1379        // registry. If the caller asked for identity transport, seal
1380        // the envelope HERE — the dispatcher's source-side seal only
1381        // fires on the TakeSnapshot path (remote source). Sealing
1382        // operates on the whole snapshot, so reassemble the chunks,
1383        // seal, and rechunk.
1384        let msgs = if opts.transport_identity {
1385            self.maybe_seal_chunked_snapshot(origin_hash, target_node, msgs)
1386                .await?
1387        } else {
1388            msgs
1389        };
1390
1391        // Determine dest_node from the first message variant and
1392        // send all messages in order. `start_migration` returns
1393        // `TakeSnapshot` (single message) when source is remote, or
1394        // a non-empty run of `SnapshotReady` chunks when source is
1395        // local.
1396        let dest_node = match msgs.first() {
1397            Some(MigrationMessage::TakeSnapshot { .. }) => source_node,
1398            Some(MigrationMessage::SnapshotReady { .. }) => target_node,
1399            Some(other) => {
1400                let _ = self
1401                    .inner
1402                    .orchestrator
1403                    .abort_migration(origin_hash, "unexpected initial message".into());
1404                return Err(DaemonError::Migration(MigrationError::StateFailed(
1405                    format!(
1406                        "orchestrator returned unexpected initial migration message: {:?}",
1407                        other
1408                    ),
1409                )));
1410            }
1411            None => {
1412                let _ = self
1413                    .inner
1414                    .orchestrator
1415                    .abort_migration(origin_hash, "orchestrator returned no messages".into());
1416                return Err(DaemonError::Migration(MigrationError::StateFailed(
1417                    "orchestrator returned no migration messages".into(),
1418                )));
1419            }
1420        };
1421
1422        for msg in &msgs {
1423            if let Err(e) = self.send_migration_message(dest_node, msg).await {
1424                let _ = self
1425                    .inner
1426                    .orchestrator
1427                    .abort_migration(origin_hash, format!("initial send failed: {e}"));
1428                return Err(e);
1429            }
1430        }
1431
1432        Ok(MigrationHandle {
1433            origin_hash,
1434            source_node,
1435            target_node,
1436            runtime: self.clone(),
1437            opts,
1438        })
1439    }
1440
1441    /// Decode `snapshot_bytes`, seal an identity envelope using
1442    /// the local daemon's keypair + target's X25519 static pubkey,
1443    /// and re-encode.
1444    ///
1445    /// Called only when the caller opted into envelope transport
1446    /// via `MigrationOpts { transport_identity: true }`. The
1447    /// caller committed to the stronger guarantee at that opt-in
1448    /// point; this helper must never silently downgrade.
1449    ///
1450    /// Resolution:
1451    /// - `Ok(None)`: the snapshot already carries an envelope
1452    ///   (e.g. pre-sealed upstream). Caller proceeds with the
1453    ///   existing bytes — this is not a downgrade, the envelope
1454    ///   is already there.
1455    /// - `Ok(Some(new_bytes))`: sealed successfully; caller should
1456    ///   replace the snapshot payload.
1457    /// - `Err(_)`: any missing prerequisite (peer X25519 static
1458    ///   unknown, daemon keypair absent) or seal-crypto failure.
1459    ///   The caller **must** abort — silently falling back to
1460    ///   unsealed transport would break the caller's opt-in
1461    ///   guarantee, and the target would restore under whatever
1462    ///   pre-provisioned keypair the factory registry carries
1463    ///   (possibly stale, possibly absent).
1464    ///
1465    /// The NKpsk0-responder case (target was the handshake
1466    /// responder, its peer static is not surfaced by `snow`) is a
1467    /// concrete prerequisite-missing scenario that now fails
1468    /// here. Callers in that topology should use `transport_identity:
1469    /// false` explicitly, which signals "I know identity transport
1470    /// isn't reachable; proceed unsealed."
1471    fn maybe_seal_local_snapshot(
1472        &self,
1473        daemon_origin: u64,
1474        target_node: u64,
1475        snapshot_bytes: &[u8],
1476    ) -> Result<Option<Vec<u8>>, DaemonError> {
1477        let snapshot = StateSnapshot::from_bytes(snapshot_bytes).ok_or_else(|| {
1478            DaemonError::Migration(MigrationError::StateFailed(
1479                "failed to decode local snapshot for envelope sealing".into(),
1480            ))
1481        })?;
1482        if snapshot.identity_envelope.is_some() {
1483            // Upstream already sealed — not a downgrade, the
1484            // envelope is present.
1485            return Ok(None);
1486        }
1487        let Some(target_pub) = self.inner.mesh.inner().peer_static_x25519(target_node) else {
1488            return Err(DaemonError::Migration(MigrationError::StateFailed(
1489                format!(
1490                    "identity transport requested but peer X25519 static for \
1491                 {target_node:#x} is unknown (e.g. NKpsk0-responder \
1492                 side) — cannot seal envelope; use \
1493                 `transport_identity: false` to proceed unsealed"
1494                ),
1495            )));
1496        };
1497        let Some(kp) = self.inner.registry.daemon_keypair(daemon_origin) else {
1498            return Err(DaemonError::Migration(MigrationError::StateFailed(
1499                format!(
1500                    "identity transport requested but daemon {daemon_origin:#x} has \
1501                 no local keypair to seal with"
1502                ),
1503            )));
1504        };
1505        snapshot
1506            .with_identity_envelope(&kp, target_pub)
1507            .map(|sealed| Some(sealed.to_bytes()))
1508            .map_err(|e| {
1509                DaemonError::Migration(MigrationError::StateFailed(format!(
1510                    "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
1511                )))
1512            })
1513    }
1514
1515    /// Reassemble chunked `SnapshotReady` messages into the full
1516    /// snapshot, seal the identity envelope, and rechunk.
1517    ///
1518    /// `start_migration` returns chunked `SnapshotReady` messages on
1519    /// the local-source path; sealing operates on the whole snapshot
1520    /// so chunks must be reassembled first. If `msgs` does not start
1521    /// with `SnapshotReady` (i.e. remote-source `TakeSnapshot` path)
1522    /// the messages are returned unchanged — the dispatcher seals
1523    /// on that path.
1524    ///
1525    /// On any seal failure the orchestrator record is aborted so a
1526    /// retry starts from phase 0.
1527    async fn maybe_seal_chunked_snapshot(
1528        &self,
1529        origin_hash: u64,
1530        target_node: u64,
1531        mut msgs: Vec<MigrationMessage>,
1532    ) -> Result<Vec<MigrationMessage>, DaemonError> {
1533        if !matches!(msgs.first(), Some(MigrationMessage::SnapshotReady { .. })) {
1534            return Ok(msgs);
1535        }
1536
1537        let seq_through = match msgs.first() {
1538            Some(MigrationMessage::SnapshotReady { seq_through, .. }) => *seq_through,
1539            _ => unreachable!("checked by matches! above"),
1540        };
1541
1542        // `chunk_snapshot` emits chunks in `chunk_index` order, but
1543        // sort defensively in case a future caller hands us a
1544        // pre-mixed Vec.
1545        msgs.sort_by_key(|m| match m {
1546            MigrationMessage::SnapshotReady { chunk_index, .. } => *chunk_index,
1547            _ => 0,
1548        });
1549        let mut reassembled: Vec<u8> = Vec::new();
1550        for m in &msgs {
1551            if let MigrationMessage::SnapshotReady { snapshot_bytes, .. } = m {
1552                reassembled.extend_from_slice(snapshot_bytes);
1553            }
1554        }
1555
1556        // `transport_identity: true` is a strict opt-in:
1557        // prerequisites-missing (e.g. NKpsk0-responder) surfaces as
1558        // `Err` and aborts the migration, not a silent downgrade to
1559        // unsealed. `Ok(None)` is reserved for "snapshot already
1560        // carries an envelope" — return the original chunks.
1561        match self.maybe_seal_local_snapshot(origin_hash, target_node, &reassembled) {
1562            Ok(Some(sealed)) => chunk_snapshot(origin_hash, sealed, seq_through).map_err(|e| {
1563                let _ = self
1564                    .inner
1565                    .orchestrator
1566                    .abort_migration(origin_hash, format!("rechunk after seal failed: {e}"));
1567                DaemonError::Migration(e)
1568            }),
1569            Ok(None) => Ok(msgs),
1570            Err(e) => {
1571                let _ = self
1572                    .inner
1573                    .orchestrator
1574                    .abort_migration(origin_hash, format!("envelope seal failed: {e}"));
1575                Err(e)
1576            }
1577        }
1578    }
1579
1580    async fn send_migration_message(
1581        &self,
1582        dest_node: u64,
1583        msg: &MigrationMessage,
1584    ) -> Result<(), DaemonError> {
1585        let addr = self
1586            .inner
1587            .mesh
1588            .inner()
1589            .peer_addr(dest_node)
1590            .ok_or(DaemonError::Migration(MigrationError::TargetUnavailable(
1591                dest_node,
1592            )))?;
1593        let bytes = migration_wire::encode(msg).map_err(DaemonError::Migration)?;
1594        self.inner
1595            .mesh
1596            .inner()
1597            .send_subprotocol(addr, SUBPROTOCOL_MIGRATION, &bytes)
1598            .await
1599            .map_err(|e| {
1600                DaemonError::Migration(MigrationError::StateFailed(format!(
1601                    "send_subprotocol failed: {e}"
1602                )))
1603            })
1604    }
1605
1606    /// Underlying mesh. Exposed read-only so the caller can still
1607    /// reach the channel / subscribe / publish surface without
1608    /// reaching around the runtime.
1609    pub fn mesh(&self) -> &Arc<Mesh> {
1610        &self.inner.mesh
1611    }
1612
1613    // ------------ internal helpers ------------
1614
1615    fn state(&self) -> State {
1616        State::from_u8(self.inner.state.load(Ordering::Acquire))
1617    }
1618
1619    fn require_ready(&self) -> Result<(), DaemonError> {
1620        match self.state() {
1621            State::Ready => Ok(()),
1622            State::Registering => Err(DaemonError::NotReady),
1623            State::ShuttingDown => Err(DaemonError::ShuttingDown),
1624        }
1625    }
1626
1627    fn factory_for_kind(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1628        self.inner
1629            .factories
1630            .read()
1631            .get(kind)
1632            .cloned()
1633            .ok_or_else(|| DaemonError::FactoryNotFound(kind.to_string()))
1634    }
1635
1636    // ─── `groups` feature accessors ─────────────────────────────────
1637    //
1638    // These surface the internal `Scheduler` / `DaemonRegistry` /
1639    // factory closures that the `groups` module (ReplicaGroup /
1640    // ForkGroup / StandbyGroup) needs to wire into core group
1641    // constructors. Kept `pub(crate)` so they don't leak into the
1642    // SDK's public API — group types wrap them in their own
1643    // ergonomic surfaces and callers stay on the clean side of the
1644    // boundary.
1645
/// Another handle to the shared scheduler used for
/// capability-based placement.
#[cfg(feature = "groups")]
pub(crate) fn scheduler_arc(&self) -> Arc<Scheduler> {
    Arc::clone(&self.inner.scheduler)
}
1651
/// Another handle to the shared migration orchestrator.
///
/// The `OrchestratorMigrationSnapshotSource` adapter wraps this
/// `Arc` to bridge the compute-layer orchestrator's in-flight
/// state into the MeshOS snapshot's `in_flight_migrations` field.
/// It is `pub(crate)` so the in-crate `testing` harness can build
/// that source out-of-band; the adapter type itself lives in
/// `net::adapter::net::behavior::meshos::migration_snapshot_source`.
#[cfg(feature = "testing")]
pub(crate) fn migration_orchestrator_arc(&self) -> Arc<MigrationOrchestrator> {
    Arc::clone(&self.inner.orchestrator)
}
1664
/// Another handle to the shared daemon registry. Group members
/// register and unregister here next to directly spawned daemons.
#[cfg(feature = "groups")]
pub(crate) fn registry_arc(&self) -> Arc<DaemonRegistry> {
    Arc::clone(&self.inner.registry)
}
1671
/// Crate-visible wrapper around [`Self::factory_for_kind`].
///
/// Group constructors use it to build members through the same
/// factory the caller registered with `register_factory`.
///
/// # Errors
///
/// `DaemonError::FactoryNotFound` when `kind` is not registered.
#[cfg(feature = "groups")]
pub(crate) fn factory_for_kind_pub(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
    let lookup = self.factory_for_kind(kind);
    lookup
}
1679
/// Whether the runtime is currently in the `Ready` state.
///
/// Groups consult this before `spawn` so that an early call is
/// rejected with a typed `GroupError::NotReady`.
#[cfg(feature = "groups")]
pub(crate) fn is_ready_pub(&self) -> bool {
    matches!(self.state(), State::Ready)
}
1687}
1688
1689/// Handle to a running daemon. Clone-safe; dropping does not stop
1690/// the daemon — call [`DaemonRuntime::stop`] explicitly.
1691#[derive(Clone)]
1692pub struct DaemonHandle {
1693    /// Daemon's 64-bit origin hash. Stable for the daemon's lifetime
1694    /// and across migrations.
1695    pub origin_hash: u64,
1696    /// Daemon's full 32-byte entity id.
1697    pub entity_id: EntityId,
1698    inner: Arc<Inner>,
1699}
1700
1701impl DaemonHandle {
1702    /// Read the daemon's current stats.
1703    pub fn stats(&self) -> Result<DaemonStats, DaemonError> {
1704        self.inner
1705            .registry
1706            .stats(self.origin_hash)
1707            .map_err(DaemonError::Core)
1708    }
1709
1710    /// Take a snapshot of the daemon's current state. `Ok(None)` for
1711    /// stateless daemons.
1712    pub async fn snapshot(&self) -> Result<Option<StateSnapshot>, DaemonError> {
1713        self.inner
1714            .registry
1715            .snapshot(self.origin_hash)
1716            .map_err(DaemonError::Core)
1717    }
1718}
1719
1720impl std::fmt::Debug for DaemonRuntime {
1721    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1722        let factory_count = self.inner.factories.read().len();
1723        f.debug_struct("DaemonRuntime")
1724            .field("state", &self.state())
1725            .field("factories", &factory_count)
1726            .field("daemons", &self.inner.registry.count())
1727            .finish()
1728    }
1729}
1730
1731impl std::fmt::Debug for DaemonHandle {
1732    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1733        f.debug_struct("DaemonHandle")
1734            .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
1735            .field("entity_id", &self.entity_id)
1736            .finish()
1737    }
1738}
1739
1740/// Stage 3 of `DAEMON_CHANNEL_REBIND_PLAN.md` — after a migration
1741/// target restores a daemon, walk its subscription ledger and
1742/// re-send each `subscribe_channel` to the matching publisher so
1743/// messages flow to the target without waiting for the source's
1744/// entry to age out of the publisher's roster. Errors are
1745/// per-subscription; one publisher being offline doesn't fail the
1746/// rest.
1747///
1748/// Runs in a tokio task spawned by the post-restore callback
1749/// installed on `MigrationSubprotocolHandler`.
1750async fn replay_subscriptions(inner: Arc<Inner>, origin_hash: u64) {
1751    let bindings = match inner
1752        .registry
1753        .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1754    {
1755        Ok(list) => list,
1756        Err(_) => return,
1757    };
1758    for sub in bindings {
1759        let token = sub
1760            .token_bytes
1761            .as_deref()
1762            .and_then(|bytes| PermissionToken::from_bytes(bytes).ok());
1763        let result = match token {
1764            Some(tok) => {
1765                inner
1766                    .mesh
1767                    .inner()
1768                    .subscribe_channel_with_token(sub.publisher, sub.channel.clone(), tok)
1769                    .await
1770            }
1771            None => {
1772                inner
1773                    .mesh
1774                    .inner()
1775                    .subscribe_channel(sub.publisher, sub.channel.clone())
1776                    .await
1777            }
1778        };
1779        if let Err(e) = result {
1780            // Non-fatal: one subscription failing (publisher
1781            // offline, token expired, etc.) must not take down the
1782            // rest of the ledger's replay. The SDK doesn't depend
1783            // on `tracing`; drop the failure to stderr via
1784            // `eprintln!` so operators running `RUST_LOG=warn`
1785            // still see it. Future work can add a `ReplayPartial`
1786            // event on the migration phase stream (plan §
1787            // *Error surface*) to surface failures programmatically.
1788            eprintln!(
1789                "channel re-bind replay failed: daemon={:#x} channel={} publisher={:#x} error={}",
1790                origin_hash, sub.channel, sub.publisher, e,
1791            );
1792        }
1793    }
1794}
1795
1796/// Stage 4 of `DAEMON_CHANNEL_REBIND_PLAN.md` — fires at `Cutover`
1797/// on the source node (just before daemon cleanup). Walks the
1798/// daemon's ledger and sends `unsubscribe_channel` to each
1799/// publisher so rosters drop the source without waiting for
1800/// session-timeout (~30 s). Fire-and-forget: we don't block the
1801/// cutover dispatch on acks.
1802async fn teardown_subscriptions(inner: Arc<Inner>, bindings: Vec<SubscriptionBinding>) {
1803    for sub in bindings {
1804        if let Err(e) = inner
1805            .mesh
1806            .inner()
1807            .unsubscribe_channel(sub.publisher, sub.channel.clone())
1808            .await
1809        {
1810            // Non-fatal; the publisher's session timeout will
1811            // eventually clean up our stale roster entry even
1812            // without an explicit Unsubscribe.
1813            eprintln!(
1814                "channel re-bind teardown failed: channel={} publisher={:#x} error={}",
1815                sub.channel, sub.publisher, e,
1816            );
1817        }
1818    }
1819}
1820
/// Tunables accepted by [`DaemonRuntime::start_migration_with`].
///
/// - `transport_identity` (default `true`) comes from Stage 6 of
///   [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md).
/// - `retry_not_ready` comes from Stages 3 + 4 of
///   [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md).
///   When the migration target answers `NotReady` (its runtime is
///   still `Registering`), the source backs off and re-initiates
///   until this much total time has elapsed. `None` turns retry
///   off, so the first `NotReady` surfaces immediately.
#[derive(Debug, Clone)]
pub struct MigrationOpts {
    /// When `true` (the default), the source node seals the
    /// daemon's ed25519 seed into the outbound snapshot using the
    /// target's X25519 static public key. The target unseals it on
    /// arrival, and the migrated daemon keeps full signing
    /// capability.
    ///
    /// When `false`, no envelope is sent and the target rebuilds
    /// the daemon with a `public_only` keypair: identity queries
    /// (`entity_id`, `origin_hash`) keep working, but `sign` calls
    /// fail with `EntityError::ReadOnly`. That suits pure compute
    /// daemons which only consume events and emit payloads and do
    /// NOT need to mint capability announcements or issue
    /// permission tokens from the target.
    pub transport_identity: bool,

    /// Retry budget for [`MigrationFailureReason::NotReady`].
    ///
    /// `Some(d)` (default 30 s): after a `NotReady`, the source
    /// backs off (500 ms → 1 s → 2 s → 4 s → 8 s, capped at 16 s)
    /// and re-initiates the migration. The total retry clock is
    /// limited to `d`; beyond it the caller sees
    /// [`MigrationFailureReason::NotReadyTimeout`].
    ///
    /// `None`: never retry. The first `NotReady` is reported to the
    /// caller as a terminal failure.
    pub retry_not_ready: Option<std::time::Duration>,
}
1861
1862impl Default for MigrationOpts {
1863    fn default() -> Self {
1864        Self {
1865            transport_identity: true,
1866            retry_not_ready: Some(std::time::Duration::from_secs(30)),
1867        }
1868    }
1869}
1870
/// Delay to wait before the `attempt`-th `NotReady` retry.
///
/// The schedule starts at 500 ms and doubles per attempt up to a
/// 16 s ceiling (500 ms, 1 s, 2 s, 4 s, 8 s, 16 s, 16 s, ...).
/// `attempt == 0` is treated like the first attempt. The overall
/// retry budget is enforced separately through
/// [`MigrationOpts::retry_not_ready`]. The schedule follows
/// `DAEMON_RUNTIME_READINESS_PLAN.md` § *Source-side retry*.
fn not_ready_backoff(attempt: u8) -> std::time::Duration {
    const BASE_MS: u64 = 500;
    const CEILING_MS: u64 = 16_000;
    const MAX_DOUBLINGS: u8 = 5;

    // Bounding the shift keeps `BASE_MS << doublings` far away from
    // overflow for any `u8` input.
    let doublings = u32::from(attempt.saturating_sub(1).min(MAX_DOUBLINGS));
    let delay_ms = (BASE_MS << doublings).min(CEILING_MS);
    std::time::Duration::from_millis(delay_ms)
}
1881
1882/// Handle to an in-flight migration. Drop the handle and the
1883/// orchestrator continues driving the migration to completion in the
1884/// background; keep it to observe phase transitions or request abort.
1885///
1886/// Cheap to clone — the backing state is shared with the
1887/// [`DaemonRuntime`] that produced it.
1888#[derive(Clone)]
1889pub struct MigrationHandle {
1890    /// Daemon being migrated.
1891    pub origin_hash: u64,
1892    /// Source node that currently hosts the daemon.
1893    pub source_node: u64,
1894    /// Target node that will host the daemon after cutover.
1895    pub target_node: u64,
1896    /// Runtime the orchestrator lives on. Used by [`Self::phase`]
1897    /// and [`Self::wait`] to poll migration state.
1898    runtime: DaemonRuntime,
1899    /// Options the migration was initiated with. Drives retry
1900    /// policy on `NotReady` + the identity-transport flag for
1901    /// re-initiated attempts.
1902    opts: MigrationOpts,
1903}
1904
1905impl MigrationHandle {
1906    /// Current migration phase, or `None` once the migration has
1907    /// left the orchestrator's records (either via `Complete` → auto
1908    /// cleanup, or via explicit abort). Callers distinguish the two
1909    /// by remembering the last non-None phase.
1910    pub fn phase(&self) -> Option<MigrationPhase> {
1911        self.runtime.inner.orchestrator.status(self.origin_hash)
1912    }
1913
1914    /// Block until the migration reaches a terminal state.
1915    ///
1916    /// Returns `Ok(())` on normal completion (saw `Complete`, then
1917    /// the orchestrator cleaned up). Returns `Err(MigrationError)`
1918    /// if the orchestrator's record disappeared without the caller
1919    /// ever having seen `Complete` — either an explicit abort or a
1920    /// failure at some upstream stage.
1921    ///
1922    /// This method does **not** enforce any wall-clock timeout. A
1923    /// migration that stalls waiting on an unresponsive peer will
1924    /// block indefinitely; callers that want a bound should use
1925    /// [`Self::wait_with_timeout`] instead.
1926    ///
1927    /// Polls every 50 ms. The implementation is deliberately
1928    /// simple — Stage 2 of `DAEMON_IDENTITY_MIGRATION_PLAN.md` and
1929    /// the V2 iteration of this plan will swap this for a
1930    /// broadcast-channel push, but 50 ms polling is plenty for the
1931    /// use cases a migration API sees today.
1932    pub async fn wait(self) -> Result<(), DaemonError> {
1933        self.wait_until(None).await
1934    }
1935
1936    /// Like [`Self::wait`] with a caller-controlled timeout. A
1937    /// timeout aborts the orchestrator-side record and returns
1938    /// `Err(MigrationError::StateFailed)`; a graceful `Complete`
1939    /// returns `Ok`.
1940    pub async fn wait_with_timeout(self, timeout: std::time::Duration) -> Result<(), DaemonError> {
1941        let deadline = tokio::time::Instant::now() + timeout;
1942        self.wait_until(Some(deadline)).await
1943    }
1944
1945    /// Inner wait loop with an optional deadline. `None` = block
1946    /// forever until the migration reaches a terminal state;
1947    /// `Some(d)` = give up and abort at `d`.
1948    ///
1949    /// Centralised here so `wait` and `wait_with_timeout` can't
1950    /// drift on the retry/backoff semantics, and so the
1951    /// "hidden 60 s ceiling" cannot reappear under `wait`.
1952    async fn wait_until(
1953        self,
1954        overall_deadline: Option<tokio::time::Instant>,
1955    ) -> Result<(), DaemonError> {
1956        let start = tokio::time::Instant::now();
1957        let retry_deadline = self.opts.retry_not_ready.map(|b| start + b);
1958        let mut attempts: u8 = 1; // first attempt initiated by start_migration_with
1959        loop {
1960            match self.wait_one_attempt(overall_deadline).await {
1961                Ok(()) => return Ok(()),
1962                Err(DaemonError::MigrationFailed(reason)) if reason.is_retriable() => {
1963                    // Retry decision.
1964                    let Some(retry_d) = retry_deadline else {
1965                        // Opts explicitly disable retry — surface
1966                        // the NotReady verbatim.
1967                        return Err(DaemonError::MigrationFailed(reason));
1968                    };
1969                    let now = tokio::time::Instant::now();
1970                    let overall_exhausted = overall_deadline.map(|d| now >= d).unwrap_or(false);
1971                    if now >= retry_d || overall_exhausted {
1972                        // Budget exhausted. `NotReadyTimeout` carries
1973                        // the attempt count for operator diagnosis.
1974                        return Err(DaemonError::MigrationFailed(
1975                            MigrationFailureReason::NotReadyTimeout { attempts },
1976                        ));
1977                    }
1978                    // Back off and re-initiate.
1979                    let backoff = not_ready_backoff(attempts);
1980                    tokio::time::sleep(backoff).await;
1981                    attempts = attempts.saturating_add(1);
1982                    // Re-initiate the migration by calling the
1983                    // orchestrator fresh. The previous record has
1984                    // been cleaned up by the dispatcher's
1985                    // MigrationFailed handler, so this starts a new
1986                    // attempt from phase 0.
1987                    self.reinitiate_attempt().await?;
1988                    // Loop; poll this new attempt's outcome.
1989                }
1990                Err(e) => return Err(e),
1991            }
1992        }
1993    }
1994
1995    /// Poll a single migration attempt's outcome. Returns:
1996    /// - `Ok(())` on Complete.
1997    /// - `Err(MigrationFailed(reason))` when the dispatcher
1998    ///   observed a structured failure.
1999    /// - `Err(Migration(_))` on overall-timeout or unknown abort.
2000    ///
2001    /// `overall_deadline = None` disables the deadline check — the
2002    /// poll loop only returns via a terminal status transition.
2003    async fn wait_one_attempt(
2004        &self,
2005        overall_deadline: Option<tokio::time::Instant>,
2006    ) -> Result<(), DaemonError> {
2007        loop {
2008            let current_phase = self.runtime.inner.orchestrator.status(self.origin_hash);
2009            match current_phase {
2010                Some(phase) => {
2011                    if phase == MigrationPhase::Complete {
2012                        // Give the dispatcher a beat to finish
2013                        // cleanup, then surface success.
2014                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2015                        return Ok(());
2016                    }
2017                }
2018                None => {
2019                    // A recorded failure is authoritative — the
2020                    // dispatcher populates `recent_failures` before
2021                    // the orchestrator removes the record, so
2022                    // status=None + recorded-reason unambiguously
2023                    // means abort.
2024                    if let Some(reason) = self.take_recent_failure() {
2025                        return Err(DaemonError::MigrationFailed(reason));
2026                    }
2027                    // No recorded failure. The orchestrator removes
2028                    // records via two paths:
2029                    //   1. `on_activate_ack` — success.
2030                    //   2. `abort_migration_with_reason` — failure,
2031                    //      which rides through the dispatcher's
2032                    //      `MigrationFailed` handler *before* the
2033                    //      record is dropped, so `recent_failures`
2034                    //      is populated first.
2035                    // Therefore status=None + no-recorded-failure is
2036                    // unambiguously success, regardless of what
2037                    // phase we last observed. This matters when the
2038                    // dispatcher runs the tail of the migration
2039                    // (Cutover → Complete → ActivateAck) entirely
2040                    // between two 50 ms polls — we may never observe
2041                    // `Complete` explicitly.
2042                    //
2043                    // Synchronous abort paths that bypass the
2044                    // dispatcher (`wait_one_attempt`'s own timeout,
2045                    // `start_migration_with`'s send-failure path)
2046                    // return `Err` to the caller *before* the wait
2047                    // loop can observe the None status, so they
2048                    // don't trip this branch.
2049                    return Ok(());
2050                }
2051            }
2052            if let Some(d) = overall_deadline {
2053                if tokio::time::Instant::now() >= d {
2054                    let _ = self
2055                        .runtime
2056                        .inner
2057                        .orchestrator
2058                        .abort_migration(self.origin_hash, "timeout".into());
2059                    return Err(DaemonError::Migration(MigrationError::StateFailed(
2060                        format!("migration timed out in phase {:?}", current_phase),
2061                    )));
2062                }
2063            }
2064            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2065        }
2066    }
2067
2068    /// Re-initiate a migration attempt for the same daemon after a
2069    /// retriable failure. Calls the orchestrator fresh + sends the
2070    /// first wire message; mirrors the tail of
2071    /// [`DaemonRuntime::start_migration_with`] but without building
2072    /// a new handle (we keep the existing one).
2073    async fn reinitiate_attempt(&self) -> Result<(), DaemonError> {
2074        let msgs = self
2075            .runtime
2076            .inner
2077            .orchestrator
2078            .start_migration(self.origin_hash, self.source_node, self.target_node)
2079            .map_err(DaemonError::Migration)?;
2080
2081        let msgs = if self.opts.transport_identity {
2082            self.runtime
2083                .maybe_seal_chunked_snapshot(self.origin_hash, self.target_node, msgs)
2084                .await?
2085        } else {
2086            msgs
2087        };
2088
2089        let dest_node = match msgs.first() {
2090            Some(MigrationMessage::TakeSnapshot { .. }) => self.source_node,
2091            Some(MigrationMessage::SnapshotReady { .. }) => self.target_node,
2092            Some(other) => {
2093                let _ = self
2094                    .runtime
2095                    .inner
2096                    .orchestrator
2097                    .abort_migration(self.origin_hash, "unexpected retry message".into());
2098                return Err(DaemonError::Migration(MigrationError::StateFailed(
2099                    format!("unexpected retry initial message: {other:?}"),
2100                )));
2101            }
2102            None => {
2103                let _ = self.runtime.inner.orchestrator.abort_migration(
2104                    self.origin_hash,
2105                    "orchestrator returned no retry messages".into(),
2106                );
2107                return Err(DaemonError::Migration(MigrationError::StateFailed(
2108                    "orchestrator returned no migration messages on retry".into(),
2109                )));
2110            }
2111        };
2112
2113        for msg in &msgs {
2114            self.runtime.send_migration_message(dest_node, msg).await?;
2115        }
2116        Ok(())
2117    }
2118
2119    /// Pop the most recent `MigrationFailureReason` the dispatcher
2120    /// observed for this migration (if any). Consumed: subsequent
2121    /// calls see `None` until the next failure arrives.
2122    fn take_recent_failure(&self) -> Option<MigrationFailureReason> {
2123        self.runtime
2124            .inner
2125            .recent_failures
2126            .lock()
2127            .remove(&self.origin_hash)
2128    }
2129
2130    /// Request abort. The orchestrator emits a `MigrationFailed`
2131    /// message to involved nodes and clears its record; the target
2132    /// rolls back via its own handler. Best-effort — a migration
2133    /// past `Cutover` cannot be undone cleanly because routing has
2134    /// already flipped.
2135    pub async fn cancel(&self) -> Result<(), DaemonError> {
2136        let msg = self
2137            .runtime
2138            .inner
2139            .orchestrator
2140            .abort_migration(self.origin_hash, "cancel requested".into())
2141            .map_err(DaemonError::Migration)?;
2142        // Best-effort notify — ignore send errors, the orchestrator
2143        // record is already gone on our side.
2144        let _ = self
2145            .runtime
2146            .send_migration_message(self.source_node, &msg)
2147            .await;
2148        let _ = self
2149            .runtime
2150            .send_migration_message(self.target_node, &msg)
2151            .await;
2152        Ok(())
2153    }
2154}
2155
2156impl std::fmt::Debug for MigrationHandle {
2157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2158        f.debug_struct("MigrationHandle")
2159            .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
2160            .field("source_node", &format_args!("{:#x}", self.source_node))
2161            .field("target_node", &format_args!("{:#x}", self.target_node))
2162            .field("phase", &self.phase())
2163            .finish()
2164    }
2165}