net_sdk/compute.rs
1//! Compute surface — `MeshDaemon` + `DaemonRuntime`.
2//!
3//! Users implement [`MeshDaemon`] and hand it to a [`DaemonRuntime`]
4//! tied to a [`Mesh`] node. The runtime holds the
5//! kind-keyed factory table, the per-daemon host registry, and the
6//! lifecycle gate that decides when inbound migrations may land.
7//!
8//! This file is Stage 1 of
9//! [`SDK_COMPUTE_SURFACE_PLAN.md`](../../../docs/SDK_COMPUTE_SURFACE_PLAN.md)
10//! plus the lifecycle half of
11//! [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md):
12//! local spawn / snapshot / stop, with an explicit
13//! `Registering → Ready → ShuttingDown` fence. Migration is Stage 2;
14//! the wire-level half of the readiness plan
15//! (`MigrationFailureReason`) ships alongside it.
16//!
17//! # Example
18//!
19//! ```no_run
20//! use std::sync::Arc;
21//! use bytes::Bytes;
22//! use net_sdk::{Identity, Mesh};
23//! use net_sdk::compute::{
24//! CausalEvent, DaemonHostConfig, DaemonRuntime, MeshDaemon,
25//! };
26//! use net_sdk::capabilities::CapabilityFilter;
27//! use net::adapter::net::compute::DaemonError as CoreDaemonError;
28//!
29//! struct EchoDaemon;
30//! impl MeshDaemon for EchoDaemon {
31//! fn name(&self) -> &str { "echo" }
32//! fn requirements(&self) -> CapabilityFilter { CapabilityFilter::default() }
33//! fn process(&mut self, event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
34//! Ok(vec![event.payload.clone()])
35//! }
36//! }
37//!
38//! # async fn example(mesh: Arc<Mesh>) -> Result<(), Box<dyn std::error::Error>> {
39//! let rt = DaemonRuntime::new(mesh);
40//! rt.register_factory("echo", || Box::new(EchoDaemon))?;
41//! rt.start().await?;
42//! let handle = rt.spawn("echo", Identity::generate(), DaemonHostConfig::default()).await?;
43//! # Ok(())
44//! # }
45//! ```
46
47use parking_lot::{Mutex, RwLock};
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
50use std::sync::Arc;
51
52use thiserror::Error;
53
54pub use ::net::adapter::net::compute::{
55 DaemonBindings, DaemonError as CoreDaemonError, DaemonHostConfig, DaemonStats, MeshDaemon,
56 MigrationError, MigrationFailureReason, MigrationPhase, PlacementDecision, SchedulerError,
57 SubscriptionBinding, SUBPROTOCOL_MIGRATION,
58};
59pub use ::net::adapter::net::state::causal::{CausalEvent, CausalLink};
60pub use ::net::adapter::net::state::snapshot::StateSnapshot;
61
62use ::net::adapter::net::channel::ChannelName;
63use ::net::adapter::net::identity::PermissionToken;
64
65use ::net::adapter::net::behavior::capability::CapabilitySet;
66use ::net::adapter::net::compute::{
67 chunk_snapshot, orchestrator::wire as migration_wire, DaemonFactoryRegistry, DaemonHost,
68 DaemonRegistry, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
69 MigrationTargetHandler, Scheduler,
70};
71use ::net::adapter::net::identity::EntityId;
72use ::net::adapter::net::subprotocol::{
73 FailureCallback, MigrationHandlerHooks, MigrationSubprotocolHandler, PostRestoreCallback,
74 PreCleanupCallback, ReadinessCallback,
75};
76
77use crate::identity::Identity;
78use crate::mesh::Mesh;
79
80/// Arc-wrapped factory closure. Kind-keyed at the SDK layer; cloned
81/// into the core `DaemonFactoryRegistry` at `spawn` time so a future
82/// migration target can reconstruct the daemon by `origin_hash`.
83type FactoryFn = Arc<dyn Fn() -> Box<dyn MeshDaemon> + Send + Sync>;
84
85/// Errors from the SDK daemon runtime.
86#[derive(Debug, Error)]
87pub enum DaemonError {
88 /// `start()` has not been called yet; the runtime is still in
89 /// `Registering` and will not accept spawns or migrations.
90 #[error("daemon runtime is not ready — call DaemonRuntime::start() first")]
91 NotReady,
92 /// `shutdown()` has been called; the runtime is permanently
93 /// non-functional.
94 #[error("daemon runtime has been shut down")]
95 ShuttingDown,
96 /// Two `register_factory` calls used the same `kind` string.
97 #[error("factory for kind '{0}' is already registered")]
98 FactoryAlreadyRegistered(String),
99 /// `spawn` / `spawn_from_snapshot` referenced an unregistered kind.
100 #[error("no factory registered for kind '{0}'")]
101 FactoryNotFound(String),
102 /// The snapshot's `entity_id.origin_hash` does not match the
103 /// identity handed to `spawn_from_snapshot`.
104 #[error(
105 "snapshot/identity mismatch: snapshot origin {snapshot:#x} != identity origin {identity:#x}"
106 )]
107 SnapshotIdentityMismatch { snapshot: u64, identity: u64 },
108 /// Pass-through for errors surfaced by the core compute layer.
109 #[error(transparent)]
110 Core(#[from] CoreDaemonError),
111 /// Pass-through for migration-layer errors.
112 #[error("migration failed: {0}")]
113 Migration(#[from] MigrationError),
114 /// Structured failure reason surfaced by the migration
115 /// dispatcher on the source side. Use
116 /// [`MigrationFailureReason::is_retriable`] to decide whether
117 /// the caller should back off and retry rather than propagating.
118 #[error("migration failed: {0}")]
119 MigrationFailed(MigrationFailureReason),
120}
121
122// Runtime state machine. Encoded as `u8` so it rides in an
123// `AtomicU8` without an extra layer of indirection. Values are
124// stable across the lifetime of a `DaemonRuntime` and must not be
125// reordered (release / acquire cmpxchg compares by value).
126#[repr(u8)]
127#[derive(Copy, Clone, Debug, Eq, PartialEq)]
128enum State {
129 Registering = 0,
130 Ready = 1,
131 ShuttingDown = 2,
132}
133
134impl State {
135 fn from_u8(v: u8) -> Self {
136 match v {
137 0 => State::Registering,
138 1 => State::Ready,
139 2 => State::ShuttingDown,
140 // `AtomicU8` is only written through `State::*` variants,
141 // so any other value means memory corruption. Panic
142 // loudly rather than silently misinterpret.
143 other => panic!("daemon runtime: corrupt state byte {other}"),
144 }
145 }
146}
147
148/// Per-mesh compute runtime.
149///
150/// Holds the kind-keyed factory table, the per-daemon host registry,
151/// and the `Registering → Ready → ShuttingDown` lifecycle gate. One
152/// `DaemonRuntime` per [`Mesh`]; clone the handle freely — the inner
153/// state is `Arc`-shared.
154#[derive(Clone)]
155pub struct DaemonRuntime {
156 inner: Arc<Inner>,
157}
158
159struct Inner {
160 mesh: Arc<Mesh>,
161 state: AtomicU8,
162 /// SDK-side kind → factory map. The migration target path reaches
163 /// into the core `factory_registry` below by `origin_hash`;
164 /// `kind` is SDK sugar so the *caller* can spawn without knowing
165 /// the underlying keypair.
166 factories: RwLock<HashMap<String, FactoryFn>>,
167 /// Core registry — shared with the migration target handler so
168 /// daemons restored from an inbound snapshot land in the same
169 /// map a local `spawn` uses.
170 registry: Arc<DaemonRegistry>,
171 /// Core scheduler — built once at `new()` from the mesh's
172 /// shared `CapabilityIndex`. Used by the `groups` feature's
173 /// `ReplicaGroup` / `ForkGroup` / `StandbyGroup` for
174 /// capability-based placement.
175 ///
176 /// `local_caps` is `CapabilitySet::default()` because the mesh
177 /// doesn't currently expose the locally-announced caps as a
178 /// readable snapshot. The only behavioral impact is that
179 /// capability-filtered daemons which could run locally won't
180 /// get `LocalPreferred` short-circuit placement — they fall
181 /// through to the `CapabilityIndex::query` path, which still
182 /// returns the local node (with the right announcements) and
183 /// places correctly. Revisit when a `MeshNode::local_caps()`
184 /// getter lands.
185 #[cfg_attr(not(feature = "groups"), allow(dead_code))]
186 scheduler: Arc<Scheduler>,
187 /// Core factory registry, keyed by `origin_hash`. `spawn` mirrors
188 /// each SDK-side kind registration into this map with the
189 /// concrete keypair attached so the migration target restores
190 /// through the existing `DaemonFactoryRegistry::construct` path.
191 factory_registry: Arc<DaemonFactoryRegistry>,
192 /// Migration orchestrator, owned by this node. Orchestrates the
193 /// 6-phase state machine when this node initiates a migration.
194 orchestrator: Arc<MigrationOrchestrator>,
195 /// Migration source handler — drives the source side when THIS
196 /// node is the source of a migration.
197 source_handler: Arc<MigrationSourceHandler>,
198 /// Migration target handler — drives the target side when THIS
199 /// node is the target of a migration.
200 target_handler: Arc<MigrationTargetHandler>,
201 /// Most recent `MigrationFailureReason` observed on the source
202 /// side for each migration, keyed by `daemon_origin`. Populated
203 /// by the dispatcher's failure callback; consumed by
204 /// `MigrationHandle::wait` to surface the reason to the caller.
205 /// Mutex because the SDK's dependency set doesn't include
206 /// `dashmap` directly, and this map sees low write frequency
207 /// (one entry per failed migration).
208 recent_failures: Mutex<HashMap<u64, MigrationFailureReason>>,
209 /// Test-only knob: when set to `true`, the readiness callback
210 /// reports "not ready" even when the runtime is in `Ready`.
211 /// Lets integration tests drive the `NotReady` retry path
212 /// without racing against runtime startup. Defaults to `false`;
213 /// production code should not touch it.
214 simulate_not_ready: AtomicBool,
215 /// Test-only stall injected into `spawn` between the
216 /// `require_ready` check and the registry inserts. Measured
217 /// in milliseconds; `0` = no stall (production default). See
218 /// [`DaemonRuntime::set_spawn_stall_ms`].
219 spawn_stall_ms: std::sync::atomic::AtomicU32,
220 /// Test-only stall injected into `start` between installing
221 /// the migration handler and the Registering→Ready CAS.
222 /// Measured in milliseconds; `0` = no stall.
223 start_stall_ms: std::sync::atomic::AtomicU32,
224 /// Per-origin post-delivery observers. Fired on every successful
225 /// `deliver(origin_hash, event)`, inside the registry call and
226 /// after the daemon's `process` returns Ok.
227 ///
228 /// **Why this exists.** `StandbyGroup` needs every event routed
229 /// to its active member captured in a replay buffer so a
230 /// promoted standby can reapply the window between the last
231 /// `sync_standbys` and the failure. A manual `on_event_delivered`
232 /// hook relied on the caller pairing every `deliver` with the
233 /// buffering call, which silently lost events on omission.
234 /// Observers close that gap: a group installs a weak-ref
235 /// closure that pushes the event into its buffer automatically,
236 /// with no contract on the caller.
237 ///
238 /// `pub(crate)` so `sdk/src/groups/*` can register; not a
239 /// stable public API. The observer list is a `Vec` so unrelated
240 /// subsystems (future audit / metrics hooks) could coexist on
241 /// the same origin without one overwriting another.
242 observers: Mutex<HashMap<u64, Vec<(u64, DeliverObserver)>>>,
243 /// Monotonic id minted by `register_deliver_observer`, used by
244 /// the returned `ObserverHandle` to identify its entry on
245 /// `Drop`/`unregister`.
246 #[cfg_attr(not(feature = "groups"), allow(dead_code))]
247 observer_id_counter: std::sync::atomic::AtomicU64,
248}
249
250/// A post-delivery observer closure registered against an
251/// `origin_hash`. Fires on every successful `DaemonRuntime::deliver`
252/// for that origin.
253///
254/// Observers MUST be cheap — they run on the delivery thread, after
255/// the daemon's `process` but before `deliver` returns. A slow or
256/// panicking observer stalls delivery. The in-tree use (standby
257/// replay buffer) is a single `VecDeque::push_back` under a
258/// short-lived lock; new observers should aim for similar.
259pub(crate) type DeliverObserver = Arc<dyn Fn(&CausalEvent) + Send + Sync>;
260
261/// Handle returned by
262/// [`DaemonRuntime::register_deliver_observer`]. Dropping the
263/// handle removes the observer from the runtime's per-origin
264/// observer list. Safe to drop after the runtime itself has been
265/// dropped: the `Weak` upgrade in `Drop` silently no-ops.
266#[cfg_attr(not(feature = "groups"), allow(dead_code))]
267pub(crate) struct ObserverHandle {
268 runtime: std::sync::Weak<Inner>,
269 origin: u64,
270 id: u64,
271}
272
273impl Drop for ObserverHandle {
274 fn drop(&mut self) {
275 if let Some(inner) = self.runtime.upgrade() {
276 let mut map = inner.observers.lock();
277 if let Some(v) = map.get_mut(&self.origin) {
278 v.retain(|(id, _)| *id != self.id);
279 if v.is_empty() {
280 map.remove(&self.origin);
281 }
282 }
283 }
284 }
285}
286
287impl DaemonRuntime {
288 /// Attach a runtime to an existing [`Mesh`]. Stage 1 does not
289 /// consume the `Mesh` — users keep their `Arc<Mesh>` for channel
290 /// registration, subscription, and the rest of the non-compute
291 /// surface. Stage 2 will install the migration subprotocol
292 /// handler when [`Self::start`] runs; until then inbound
293 /// migration messages (if any) are silently dropped by the core,
294 /// same as today.
295 pub fn new(mesh: Arc<Mesh>) -> Self {
296 let local_node_id = mesh.inner().node_id();
297 let registry = Arc::new(DaemonRegistry::new());
298 let factory_registry = Arc::new(DaemonFactoryRegistry::new());
299 let source_handler = Arc::new(MigrationSourceHandler::new(registry.clone()));
300 // Wire `source_handler` into the orchestrator so
301 // local-source migrations register the migration in the
302 // source-side handler — without this, post-snapshot events
303 // are silently mutated into the source daemon's state and
304 // lost at cutover. See
305 // `MigrationOrchestrator::with_source_handler`.
306 let orchestrator = Arc::new(
307 MigrationOrchestrator::new(registry.clone(), local_node_id)
308 .with_source_handler(source_handler.clone()),
309 );
310 let target_handler = Arc::new(MigrationTargetHandler::new_with_factories(
311 registry.clone(),
312 factory_registry.clone(),
313 ));
314 // Scheduler shares the mesh's `CapabilityFold`, so
315 // announcements the mesh publishes become visible to
316 // placement queries immediately. `CapabilitySet::default()`
317 // for `local_caps` is a known gap — see the `scheduler`
318 // field's docstring on [`Inner`].
319 let scheduler = Arc::new(Scheduler::new(
320 mesh.inner().capability_fold().clone(),
321 local_node_id,
322 CapabilitySet::default(),
323 ));
324 Self {
325 inner: Arc::new(Inner {
326 mesh,
327 state: AtomicU8::new(State::Registering as u8),
328 factories: RwLock::new(HashMap::new()),
329 registry,
330 scheduler,
331 factory_registry,
332 orchestrator,
333 source_handler,
334 target_handler,
335 recent_failures: Mutex::new(HashMap::new()),
336 simulate_not_ready: AtomicBool::new(false),
337 spawn_stall_ms: std::sync::atomic::AtomicU32::new(0),
338 start_stall_ms: std::sync::atomic::AtomicU32::new(0),
339 observers: Mutex::new(HashMap::new()),
340 observer_id_counter: std::sync::atomic::AtomicU64::new(1),
341 }),
342 }
343 }
344
345 /// Register a post-delivery observer for `origin_hash`. Every
346 /// successful `deliver(origin_hash, event)` fires the closure
347 /// after the daemon's `process` returns Ok. Returns an
348 /// [`ObserverHandle`] whose `Drop` unregisters the observer.
349 ///
350 /// `pub(crate)` — only the SDK's own group wrappers use this.
351 /// See the `observers` field on [`Inner`] for the design rationale.
352 #[cfg_attr(not(feature = "groups"), allow(dead_code))]
353 pub(crate) fn register_deliver_observer(
354 &self,
355 origin: u64,
356 cb: DeliverObserver,
357 ) -> ObserverHandle {
358 let id = self
359 .inner
360 .observer_id_counter
361 .fetch_add(1, Ordering::Relaxed);
362 {
363 let mut map = self.inner.observers.lock();
364 map.entry(origin).or_default().push((id, cb));
365 }
366 ObserverHandle {
367 runtime: Arc::downgrade(&self.inner),
368 origin,
369 id,
370 }
371 }
372
373 /// Register a factory for a daemon type. `kind` is a user-chosen
374 /// string shared across every node that may host this daemon.
375 /// Second registrations of the same `kind` return
376 /// [`DaemonError::FactoryAlreadyRegistered`].
377 ///
378 /// Valid in both `Registering` and `Ready` states; the runtime
379 /// permits new kinds to appear at runtime. Only `ShuttingDown`
380 /// rejects.
381 ///
382 /// # Migration targeting
383 ///
384 /// `register_factory` alone is **not sufficient** to accept
385 /// inbound migrations — it registers the kind-to-closure mapping
386 /// only on the SDK side. The core migration dispatcher looks up
387 /// factories by `origin_hash` (the daemon's identity), not by
388 /// `kind`, because the migration wire protocol doesn't carry a
389 /// kind string; the target couldn't pick the right factory from
390 /// an inbound snapshot without an explicit binding.
391 ///
392 /// To accept migrations for a specific daemon, the target must
393 /// ALSO call one of:
394 ///
395 /// - [`Self::expect_migration`] `(kind, origin_hash, config)` —
396 /// placeholder factory keyed by `origin_hash`; the envelope
397 /// on the snapshot supplies the keypair at restore time.
398 /// This is the common case.
399 /// - [`Self::register_migration_target_identity`] `(kind,
400 /// identity, config)` — pre-provisions the keypair as a
401 /// fallback when the source migrates with
402 /// `transport_identity: false`.
403 ///
404 /// Or spawn the daemon locally first (via [`Self::spawn`]);
405 /// spawn seeds both the SDK map and the core registry, so a
406 /// daemon that migrated out and migrates back in on the same
407 /// node is covered without extra calls.
408 pub fn register_factory<F>(&self, kind: &str, factory: F) -> Result<(), DaemonError>
409 where
410 F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
411 {
412 if self.state() == State::ShuttingDown {
413 return Err(DaemonError::ShuttingDown);
414 }
415 // `contains_key` + `insert` would be atomic under the
416 // single `write` guard, but the `entry` API makes
417 // atomicity self-evident in one expression — no opening
418 // for a future reviewer to wonder whether the check and
419 // the insert can drift across two separately-acquired
420 // guards.
421 let mut map = self.inner.factories.write();
422 match map.entry(kind.to_string()) {
423 std::collections::hash_map::Entry::Occupied(_) => {
424 Err(DaemonError::FactoryAlreadyRegistered(kind.to_string()))
425 }
426 std::collections::hash_map::Entry::Vacant(slot) => {
427 slot.insert(Arc::new(factory));
428 Ok(())
429 }
430 }
431 }
432
433 /// Promote to `Ready`. Idempotent — a second call on an already-
434 /// `Ready` runtime is a no-op; a call on a `ShuttingDown` runtime
435 /// returns [`DaemonError::ShuttingDown`].
436 ///
437 /// Wires the migration subprotocol (`0x0500`) handler into the
438 /// mesh so inbound `TakeSnapshot` / `SnapshotReady` / etc.
439 /// messages reach the orchestrator / source / target handlers
440 /// owned by this runtime. Installing is idempotent w.r.t.
441 /// multiple `start` calls — the `ArcSwapOption` on the mesh
442 /// swaps the same handler in on each call.
443 pub async fn start(&self) -> Result<(), DaemonError> {
444 loop {
445 match self.state() {
446 State::Registering => {
447 // Install the migration subprotocol handler
448 // **before** publishing `Ready`. Other threads
449 // that observe `Ready` must be able to rely on
450 // the handler being live: the previous ordering
451 // (CAS → install) left a window where a
452 // concurrent caller read `Ready`, began a
453 // migration, and sent `SnapshotReady` onto a
454 // mesh whose handler slot was still empty —
455 // the dispatcher's no-handler fallback would
456 // synthesise `ComputeNotSupported`, aborting
457 // the migration nondeterministically during
458 // startup.
459 //
460 // Double-install is safe: `set_migration_handler`
461 // is an `ArcSwap` store, so if two concurrent
462 // `start()`s both reach this point the later
463 // store just wins and the CAS picks one caller
464 // to return first. Both built handlers are
465 // functionally equivalent (same registry,
466 // orchestrator, hooks).
467 let handler = Arc::new(self.build_migration_handler());
468 self.inner.mesh.inner().set_migration_handler(handler);
469
470 // Test-only stall between install and CAS so
471 // integration tests can race `shutdown` in. In
472 // production the atomic load is 0 and this is
473 // a no-op.
474 let stall_ms = self.inner.start_stall_ms.load(Ordering::Acquire);
475 if stall_ms > 0 {
476 tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
477 }
478
479 let swap = self.inner.state.compare_exchange(
480 State::Registering as u8,
481 State::Ready as u8,
482 Ordering::AcqRel,
483 Ordering::Acquire,
484 );
485 if swap.is_ok() {
486 return Ok(());
487 }
488 // Lost the CAS. State is now either `Ready`
489 // (another `start` caller beat us) or
490 // `ShuttingDown` (a concurrent `shutdown`
491 // raced past our install). Handle the second
492 // case explicitly — otherwise a torn-down
493 // runtime would leave a live handler on the
494 // mesh, accepting migration traffic and firing
495 // callbacks against stale registry state.
496 //
497 // Under the `Ready` case we leave the handler
498 // installed: the winning `start` caller also
499 // installed an equivalent handler (same
500 // registry, orchestrator, hooks), so whichever
501 // `ArcSwap` store lands last is indistinguishable
502 // from the winner's.
503 if self.state() == State::ShuttingDown {
504 self.inner.mesh.inner().clear_migration_handler();
505 return Err(DaemonError::ShuttingDown);
506 }
507 // CAS lost to another `start` that won the
508 // race — loop to re-classify and return Ok on
509 // the `Ready` arm.
510 }
511 State::Ready => return Ok(()),
512 State::ShuttingDown => return Err(DaemonError::ShuttingDown),
513 }
514 }
515 }
516
517 /// Construct the migration subprotocol handler with every
518 /// hook wired — identity context, channel-rebind replay,
519 /// unsubscribe teardown, readiness predicate, failure
520 /// observer. Extracted so [`Self::start`] can install it
521 /// before the atomic state flip (race fix) without burying
522 /// the construction inline.
523 fn build_migration_handler(&self) -> MigrationSubprotocolHandler {
524 let local_node_id = self.inner.mesh.inner().node_id();
525 // Ask the core crate to build the context. The Noise static
526 // private key is captured inside closures the core owns and
527 // never crosses this boundary as raw bytes — see
528 // `MeshNode::migration_identity_context`.
529 let ctx = self.inner.mesh.inner().migration_identity_context();
530 // Capture the active tokio Handle so the substrate callbacks
531 // can spawn even when fired from a thread that doesn't have a
532 // current runtime (e.g. an FFI trampoline driving the
533 // migration handler synchronously from a Go test). The
534 // substrate explicitly calls these hooks synchronously and
535 // expects them to `tokio::spawn` the actual work — without an
536 // explicit Handle, `tokio::spawn` panics with "no reactor
537 // running" on non-runtime threads.
538 //
539 // Safe to call `Handle::current()` here: `build_migration_handler`
540 // is only invoked from `pub async fn start`, which always
541 // runs inside a tokio runtime.
542 let rt = tokio::runtime::Handle::current();
543 let inner_for_rebind = self.inner.clone();
544 let rt_for_rebind = rt.clone();
545 let post_restore: PostRestoreCallback = Arc::new(move |origin_hash: u64| {
546 let inner = inner_for_rebind.clone();
547 rt_for_rebind.spawn(async move {
548 replay_subscriptions(inner, origin_hash).await;
549 });
550 });
551 let inner_for_teardown = self.inner.clone();
552 let rt_for_teardown = rt.clone();
553 let pre_cleanup: PreCleanupCallback = Arc::new(move |origin_hash: u64| {
554 // Snapshot the ledger BEFORE cleanup drops the host —
555 // after that, the ledger is gone. Spawn async
556 // unsubscribes so the dispatcher thread returns
557 // immediately.
558 let bindings = inner_for_teardown
559 .registry
560 .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
561 .unwrap_or_default();
562 if bindings.is_empty() {
563 return;
564 }
565 let inner = inner_for_teardown.clone();
566 rt_for_teardown.spawn(async move {
567 teardown_subscriptions(inner, bindings).await;
568 });
569 });
570 let inner_for_readiness = self.inner.clone();
571 let readiness: ReadinessCallback = Arc::new(move || {
572 // Test-only: `simulate_not_ready` flips the predicate
573 // to false regardless of the underlying lifecycle
574 // state. Honour it first so integration tests can
575 // drive the NotReady retry path.
576 if inner_for_readiness
577 .simulate_not_ready
578 .load(Ordering::Acquire)
579 {
580 return false;
581 }
582 inner_for_readiness.state.load(Ordering::Acquire) == State::Ready as u8
583 });
584 let inner_for_failure = self.inner.clone();
585 let failure: FailureCallback =
586 Arc::new(move |origin_hash: u64, reason: MigrationFailureReason| {
587 inner_for_failure
588 .recent_failures
589 .lock()
590 .insert(origin_hash, reason);
591 });
592 MigrationSubprotocolHandler::with_hooks(
593 self.inner.orchestrator.clone(),
594 self.inner.source_handler.clone(),
595 self.inner.target_handler.clone(),
596 local_node_id,
597 MigrationHandlerHooks {
598 identity: Some(ctx),
599 post_restore: Some(post_restore),
600 pre_cleanup: Some(pre_cleanup),
601 readiness: Some(readiness),
602 failure: Some(failure),
603 },
604 )
605 }
606
607 /// Tear down the runtime. Unregisters every local daemon host,
608 /// clears the factory registry, and transitions state to
609 /// `ShuttingDown`. Subsequent calls on this runtime fail with
610 /// [`DaemonError::ShuttingDown`]. A second `shutdown` is a no-op.
611 pub async fn shutdown(&self) -> Result<(), DaemonError> {
612 // Mark ShuttingDown first so new spawns / registrations
613 // immediately short-circuit. The store isn't a CAS — a
614 // `ShuttingDown → ShuttingDown` re-store is cheap and
615 // benign.
616 self.inner
617 .state
618 .store(State::ShuttingDown as u8, Ordering::Release);
619
620 // Drain the registry. `list()` snapshots origin_hashes under
621 // its internal read guard; iterating is safe because we own
622 // the unregister path.
623 let origins: Vec<u64> = self
624 .inner
625 .registry
626 .list()
627 .into_iter()
628 .map(|(origin, _)| origin)
629 .collect();
630 for origin in origins {
631 let _ = self.inner.registry.unregister(origin);
632 self.inner.factory_registry.remove(origin);
633 }
634 // Drop any leftover migration-failure entries so they
635 // don't count against the process's memory footprint after
636 // shutdown. The runtime is permanently non-functional at
637 // this point, so no one will consume them.
638 self.inner.recent_failures.lock().clear();
639 // Uninstall the migration subprotocol handler. The
640 // handler carries `Arc` clones into our `Inner` — leaving
641 // it installed keeps the runtime's internals alive via
642 // the mesh even after we've drained every registry, and
643 // would accept inbound migration traffic that now
644 // unconditionally fails (empty registry). The happy-path
645 // teardown should leave the mesh in the same shape it
646 // had before `start()`.
647 self.inner.mesh.inner().clear_migration_handler();
648 Ok(())
649 }
650
651 /// **Test-only.** Force the readiness predicate seen by the
652 /// migration dispatcher to return `false` regardless of
653 /// lifecycle state — simulates a target that's still in
654 /// `Registering` even after `start()` has run. Lets
655 /// integration tests exercise the `NotReady` retry path
656 /// without racing against runtime startup.
657 ///
658 /// No effect on `is_ready()` or `spawn` / `stop` — those use
659 /// the underlying `state` directly. Only the dispatcher's
660 /// readiness predicate is affected.
661 pub fn simulate_not_ready(&self, flag: bool) {
662 self.inner.simulate_not_ready.store(flag, Ordering::Release);
663 }
664
665 /// **Test-only.** Inject a sleep inside `spawn` between the
666 /// initial `require_ready` check and the registry inserts,
667 /// giving integration tests a deterministic window to race
668 /// `shutdown` against. Duration is stored as millis; `0`
669 /// disables.
670 #[doc(hidden)]
671 pub fn set_spawn_stall_ms(&self, millis: u32) {
672 self.inner.spawn_stall_ms.store(millis, Ordering::Release);
673 }
674
675 /// **Test-only.** Inject a sleep inside `start` between the
676 /// handler install and the Registering→Ready CAS. Used to
677 /// deterministically race `shutdown` against a mid-flight
678 /// `start` so tests can verify the handler-cleanup path.
679 #[doc(hidden)]
680 pub fn set_start_stall_ms(&self, millis: u32) {
681 self.inner.start_stall_ms.store(millis, Ordering::Release);
682 }
683
684 /// Readiness accessor for tests + operators. `true` iff the
685 /// runtime has transitioned to `Ready` and has not yet begun
686 /// shutting down.
687 pub fn is_ready(&self) -> bool {
688 self.state() == State::Ready
689 }
690
691 /// Spawn a daemon of `kind` under the caller-provided
692 /// [`Identity`]. The identity's keypair seeds the daemon's
693 /// `origin_hash` + `entity_id`; the runtime registers both the
694 /// live host and a kind-keyed factory in the core registry so a
695 /// future migration target can reconstruct the daemon through
696 /// the existing `DaemonFactoryRegistry::construct` path.
697 ///
698 /// The returned [`DaemonHandle`] is clone-safe; dropping it does
699 /// not stop the daemon. Call [`Self::stop`] explicitly.
700 pub async fn spawn(
701 &self,
702 kind: &str,
703 identity: Identity,
704 config: DaemonHostConfig,
705 ) -> Result<DaemonHandle, DaemonError> {
706 self.require_ready()?;
707 // Test-only stall between the readiness check and the
708 // registry inserts. In production `spawn_stall_ms` is
709 // always 0 and this branch is a no-op.
710 let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
711 if stall_ms > 0 {
712 tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
713 }
714 let factory = self.factory_for_kind(kind)?;
715 let daemon = (factory)();
716 let keypair = identity.keypair().as_ref().clone();
717 let origin_hash = keypair.origin_hash();
718 let entity_id = keypair.entity_id().clone();
719
720 // Mirror the factory into the core registry BEFORE registering
721 // the host, so a migration-target handler that catches up on
722 // this origin mid-spawn always sees a consistent view. Atomic
723 // on collision: if another daemon already claims this
724 // `origin_hash`, bail without mutating either registry —
725 // otherwise the rollback below would strip the factory entry
726 // for the *existing* daemon and silently break its future
727 // migratability.
728 let factory_for_core = factory.clone();
729 self.inner
730 .factory_registry
731 .register(keypair.clone(), config.clone(), move || {
732 (factory_for_core)()
733 })
734 .map_err(DaemonError::Core)?;
735
736 let host = DaemonHost::new(daemon, keypair, config);
737 // `DaemonRegistry::register` errors on origin_hash collisions
738 // — two daemons can't share the same identity.
739 //
740 // Rolling back our `factory_registry` insert is safe
741 // because `factory_registry::register` is atomic on
742 // collision: since our call above *succeeded*, the slot
743 // was empty and we exclusively own it. No other code path
744 // replaces an occupied slot (`register` /
745 // `register_placeholder` both error on collision;
746 // `remove` / `take` only remove their caller's own
747 // entries), so the entry we're about to remove is still
748 // ours. The rollback cannot affect a pre-existing
749 // placeholder or another daemon's factory entry.
750 if let Err(e) = self.inner.registry.register(host) {
751 self.inner.factory_registry.remove(origin_hash);
752 return Err(DaemonError::Core(e));
753 }
754
755 // Post-insert fence: `shutdown` may have raced past our
756 // initial `require_ready()` and already swept the
757 // registries. In that case our entries are either already
758 // gone (shutdown saw them in its sweep) or will outlive
759 // shutdown's sweep (we inserted after `list()`). Roll back
760 // unconditionally under the ShuttingDown branch so no
761 // zombie daemon survives the torn-down runtime; the
762 // rollback's `unregister` / `remove` calls are idempotent
763 // no-ops if shutdown already drained our slot.
764 if self.state() == State::ShuttingDown {
765 let _ = self.inner.registry.unregister(origin_hash);
766 self.inner.factory_registry.remove(origin_hash);
767 return Err(DaemonError::ShuttingDown);
768 }
769
770 Ok(DaemonHandle {
771 origin_hash,
772 entity_id,
773 inner: self.inner.clone(),
774 })
775 }
776
777 /// Spawn a daemon with a caller-supplied `MeshDaemon` instance,
778 /// bypassing the SDK-side kind-factory lookup.
779 ///
780 /// Used by language-binding layers (currently: the NAPI `compute`
781 /// module) that build daemon instances via cross-FFI dispatch —
782 /// the factory closure for such a daemon can't be a plain
783 /// `Fn() -> Box<dyn MeshDaemon>` because constructing the
784 /// daemon requires an awaitable call into the host language.
785 /// The binding does the await itself, hands in the resulting
786 /// `Box<dyn MeshDaemon>`, and this method does the rest of
787 /// what [`Self::spawn`] does: register the `(origin_hash →
788 /// kind-factory)` mirror in the core registry so future
789 /// migrations can reconstruct the daemon, insert the host,
790 /// and run the same shutdown-race fence.
791 ///
792 /// `kind_factory` is the closure the core registry stores for
793 /// migration-target reconstruction; it must be re-callable
794 /// (migration targets call it when they restore the daemon
795 /// on another node). Bindings typically build this by cloning
796 /// the same TSFN used for the initial spawn.
797 pub async fn spawn_with_daemon<F>(
798 &self,
799 identity: Identity,
800 config: DaemonHostConfig,
801 daemon: Box<dyn MeshDaemon>,
802 kind_factory: F,
803 ) -> Result<DaemonHandle, DaemonError>
804 where
805 F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
806 {
807 self.require_ready()?;
808 // Same test-only stall as `spawn` — tests race shutdown
809 // against this path too.
810 let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
811 if stall_ms > 0 {
812 tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
813 }
814 let keypair = identity.keypair().as_ref().clone();
815 let origin_hash = keypair.origin_hash();
816 let entity_id = keypair.entity_id().clone();
817
818 // Mirror the caller-supplied kind_factory into the core
819 // registry. Same atomic-register semantics as `spawn` —
820 // if another daemon already claims this origin_hash, bail
821 // without touching state.
822 self.inner
823 .factory_registry
824 .register(keypair.clone(), config.clone(), kind_factory)
825 .map_err(DaemonError::Core)?;
826
827 let host = DaemonHost::new(daemon, keypair, config);
828 if let Err(e) = self.inner.registry.register(host) {
829 self.inner.factory_registry.remove(origin_hash);
830 return Err(DaemonError::Core(e));
831 }
832
833 // Post-insert shutdown fence, matching `spawn`. Without
834 // this, a concurrent `shutdown()` that raced past our
835 // `require_ready` check and already swept the registries
836 // would leave a zombie daemon live in the torn-down
837 // runtime.
838 if self.state() == State::ShuttingDown {
839 let _ = self.inner.registry.unregister(origin_hash);
840 self.inner.factory_registry.remove(origin_hash);
841 return Err(DaemonError::ShuttingDown);
842 }
843
844 Ok(DaemonHandle {
845 origin_hash,
846 entity_id,
847 inner: self.inner.clone(),
848 })
849 }
850
851 /// Spawn a daemon from a caller-supplied instance and restore its
852 /// state from `snapshot`, bypassing the SDK-side kind-factory
853 /// lookup. Parallels [`Self::spawn_with_daemon`] for restore.
854 ///
855 /// Used by language-binding layers whose daemons are built via
856 /// cross-FFI dispatch — construction goes through the host
857 /// language, then the binding hands the built `Box<dyn MeshDaemon>`
858 /// (already wired to its TSFN bridge) plus the `kind_factory`
859 /// closure used by the core registry for migration-target
860 /// reconstruction.
861 pub async fn spawn_from_snapshot_with_daemon<F>(
862 &self,
863 identity: Identity,
864 snapshot: StateSnapshot,
865 config: DaemonHostConfig,
866 daemon: Box<dyn MeshDaemon>,
867 kind_factory: F,
868 ) -> Result<DaemonHandle, DaemonError>
869 where
870 F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
871 {
872 self.require_ready()?;
873 let keypair = identity.keypair().as_ref().clone();
874 let origin_hash = keypair.origin_hash();
875 let entity_id = keypair.entity_id().clone();
876
877 // Full `entity_id` comparison — see `spawn_from_snapshot` for
878 // the birthday-collision rationale.
879 if snapshot.entity_id != entity_id {
880 return Err(DaemonError::SnapshotIdentityMismatch {
881 snapshot: snapshot.entity_id.origin_hash(),
882 identity: origin_hash,
883 });
884 }
885
886 self.inner
887 .factory_registry
888 .register(keypair.clone(), config.clone(), kind_factory)
889 .map_err(DaemonError::Core)?;
890
891 let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
892 Ok(h) => h,
893 Err(e) => {
894 self.inner.factory_registry.remove(origin_hash);
895 return Err(DaemonError::Core(e));
896 }
897 };
898
899 if let Err(e) = self.inner.registry.register(host) {
900 self.inner.factory_registry.remove(origin_hash);
901 return Err(DaemonError::Core(e));
902 }
903
904 if self.state() == State::ShuttingDown {
905 let _ = self.inner.registry.unregister(origin_hash);
906 self.inner.factory_registry.remove(origin_hash);
907 return Err(DaemonError::ShuttingDown);
908 }
909
910 Ok(DaemonHandle {
911 origin_hash,
912 entity_id,
913 inner: self.inner.clone(),
914 })
915 }
916
917 /// Spawn a daemon of `kind` and restore its state from `snapshot`.
918 /// The snapshot's `entity_id` must match the caller's
919 /// [`Identity`]; mismatch returns
920 /// [`DaemonError::SnapshotIdentityMismatch`] before any side
921 /// effects land.
922 pub async fn spawn_from_snapshot(
923 &self,
924 kind: &str,
925 identity: Identity,
926 snapshot: StateSnapshot,
927 config: DaemonHostConfig,
928 ) -> Result<DaemonHandle, DaemonError> {
929 self.require_ready()?;
930 let factory = self.factory_for_kind(kind)?;
931 let keypair = identity.keypair().as_ref().clone();
932 let origin_hash = keypair.origin_hash();
933 let entity_id = keypair.entity_id().clone();
934
935 // Compare the **full** 32-byte `entity_id`, not just the
936 // 64-bit `origin_hash` projection. `origin_hash` is a
937 // birthday-bounded 64-bit hash of the ed25519 public key;
938 // two legitimately-different identities can collide on
939 // `origin_hash` with probability ~2^-32 after ~4B daemons.
940 // A collision would let the *wrong* identity restore the
941 // snapshot, producing a daemon signed under one pubkey
942 // but claiming to be another — downstream signatures then
943 // verify against the wrong key and the daemon silently
944 // produces outputs no peer accepts.
945 if snapshot.entity_id != entity_id {
946 return Err(DaemonError::SnapshotIdentityMismatch {
947 snapshot: snapshot.entity_id.origin_hash(),
948 identity: origin_hash,
949 });
950 }
951
952 let daemon = (factory)();
953 // Atomic register: collision here means some other daemon
954 // (live or a previous spawn_from_snapshot in-flight) already
955 // owns this `origin_hash`. Bail without touching the other
956 // daemon's state — the later rollback would otherwise remove
957 // the victim's factory entry and silently break its future
958 // migratability.
959 let factory_for_core = factory.clone();
960 self.inner
961 .factory_registry
962 .register(keypair.clone(), config.clone(), move || {
963 (factory_for_core)()
964 })
965 .map_err(DaemonError::Core)?;
966
967 let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
968 Ok(h) => h,
969 Err(e) => {
970 // Same rollback-safety argument as in `spawn`:
971 // atomic `factory_registry::register` above means
972 // we exclusively own the slot, and nothing else
973 // can replace an occupied slot. Removing here
974 // cleans up strictly our insert — an adjacent
975 // pre-existing placeholder (e.g. from
976 // `expect_migration`) would have made our
977 // register fail atomically upstream, and we'd
978 // never have reached this branch.
979 self.inner.factory_registry.remove(origin_hash);
980 return Err(DaemonError::Core(e));
981 }
982 };
983
984 if let Err(e) = self.inner.registry.register(host) {
985 // Same ownership argument — the atomic register
986 // above means we own this slot exclusively.
987 self.inner.factory_registry.remove(origin_hash);
988 return Err(DaemonError::Core(e));
989 }
990
991 // Post-insert fence against a concurrent `shutdown` — see
992 // the matching comment in `spawn`.
993 if self.state() == State::ShuttingDown {
994 let _ = self.inner.registry.unregister(origin_hash);
995 self.inner.factory_registry.remove(origin_hash);
996 return Err(DaemonError::ShuttingDown);
997 }
998
999 Ok(DaemonHandle {
1000 origin_hash,
1001 entity_id,
1002 inner: self.inner.clone(),
1003 })
1004 }
1005
1006 /// Stop a daemon, removing it from the runtime's registry. Valid
1007 /// while `Ready` and (idempotently) during `ShuttingDown` —
1008 /// `ShuttingDown` paths through here are a no-op because the
1009 /// shutdown sweep has already drained the registry.
1010 pub async fn stop(&self, origin_hash: u64) -> Result<(), DaemonError> {
1011 if self.state() == State::Registering {
1012 return Err(DaemonError::NotReady);
1013 }
1014 // Treat a missing daemon as success during ShuttingDown —
1015 // the shutdown sweep drained it.
1016 match self.inner.registry.unregister(origin_hash) {
1017 Ok(_) => {
1018 self.inner.factory_registry.remove(origin_hash);
1019 Ok(())
1020 }
1021 Err(CoreDaemonError::NotFound(_)) if self.state() == State::ShuttingDown => Ok(()),
1022 Err(e) => Err(DaemonError::Core(e)),
1023 }
1024 }
1025
1026 /// Take a snapshot of a running daemon by `origin_hash`. Returns
1027 /// `Ok(None)` when the daemon is stateless.
1028 pub async fn snapshot(&self, origin_hash: u64) -> Result<Option<StateSnapshot>, DaemonError> {
1029 self.require_ready()?;
1030 self.inner
1031 .registry
1032 .snapshot(origin_hash)
1033 .map_err(DaemonError::Core)
1034 }
1035
1036 /// Deliver one causal event to the daemon identified by
1037 /// `origin_hash`, returning the daemon's outputs wrapped in the
1038 /// host's causal chain.
1039 ///
1040 /// Stage 1 convenience — Stage 2 adds mesh-dispatched delivery
1041 /// via the causal subprotocol, and this direct path becomes
1042 /// testing sugar rather than the primary ingress.
1043 pub fn deliver(
1044 &self,
1045 origin_hash: u64,
1046 event: &CausalEvent,
1047 ) -> Result<Vec<CausalEvent>, DaemonError> {
1048 self.require_ready()?;
1049 let outputs = self
1050 .inner
1051 .registry
1052 .deliver(origin_hash, event)
1053 .map_err(DaemonError::Core)?;
1054
1055 // Fire post-delivery observers (e.g. `StandbyGroup`'s replay
1056 // buffer). Observers run AFTER a successful `process` so
1057 // replay on promotion doesn't re-apply events the active
1058 // rejected. Snapshot the observer list into a local `Vec`
1059 // so each callback runs without the map's mutex held —
1060 // prevents a misbehaving observer from blocking unrelated
1061 // deliveries and rules out re-entrant-lock deadlocks if an
1062 // observer ever calls back into `deliver`.
1063 let to_fire: Vec<DeliverObserver> = {
1064 let map = self.inner.observers.lock();
1065 map.get(&origin_hash)
1066 .map(|v| v.iter().map(|(_, cb)| cb.clone()).collect())
1067 .unwrap_or_default()
1068 };
1069 for cb in to_fire {
1070 cb(event);
1071 }
1072 Ok(outputs)
1073 }
1074
1075 /// Number of daemons currently registered.
1076 pub fn daemon_count(&self) -> usize {
1077 self.inner.registry.count()
1078 }
1079
1080 /// Current orchestrator-side migration phase for `origin_hash`,
1081 /// or `None` when no migration record exists (either never
1082 /// started here or already reached its terminal state and was
1083 /// removed). Useful for tests that assert the migration reached
1084 /// true completion (record gone via `ActivateAck`) rather than
1085 /// simply advancing to the `Complete` phase.
1086 pub fn migration_phase(&self, origin_hash: u64) -> Option<MigrationPhase> {
1087 self.inner.orchestrator.status(origin_hash)
1088 }
1089
1090 /// **Test-only.** Peek at the cached failure reason for
1091 /// `origin_hash` without consuming it.
1092 ///
1093 /// Normal `MigrationHandle::wait` code path pops the entry when
1094 /// it hits status=None; this accessor exists so regression tests
1095 /// can observe the cache's lifecycle directly (e.g. assert the
1096 /// cache is cleared by `start_migration_with`).
1097 ///
1098 /// Exposed publicly because SDK integration tests live in a
1099 /// separate crate; not part of the stable surface.
1100 #[doc(hidden)]
1101 pub fn peek_migration_failure(&self, origin_hash: u64) -> Option<MigrationFailureReason> {
1102 self.inner.recent_failures.lock().get(&origin_hash).cloned()
1103 }
1104
1105 /// **Test-only.** Inject a failure reason into the cache for
1106 /// `origin_hash`. Lets tests stage a "stale entry from a prior
1107 /// attempt" scenario deterministically, without having to run
1108 /// a whole losing migration to populate it.
1109 ///
1110 /// Same caveat as [`Self::peek_migration_failure`] — visible
1111 /// because SDK integration tests live out-of-crate; not part of
1112 /// the stable surface.
1113 #[doc(hidden)]
1114 pub fn inject_migration_failure(&self, origin_hash: u64, reason: MigrationFailureReason) {
1115 self.inner
1116 .recent_failures
1117 .lock()
1118 .insert(origin_hash, reason);
1119 }
1120
1121 /// Snapshot the daemon's subscription ledger — a cloned view of
1122 /// every `(publisher, channel)` pair the daemon has subscribed
1123 /// to via [`Self::subscribe_channel`]. Used by the migration
1124 /// target path to drive replay and by tests / operators to
1125 /// observe what a daemon is subscribed to.
1126 pub fn subscriptions(&self, origin_hash: u64) -> Result<Vec<SubscriptionBinding>, DaemonError> {
1127 self.inner
1128 .registry
1129 .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1130 .map_err(DaemonError::Core)
1131 }
1132
1133 /// Subscribe a specific daemon to a channel on a remote
1134 /// publisher. Routes through the mesh's membership subprotocol
1135 /// and **records the subscription in the daemon's ledger** so
1136 /// a migration target can replay it after cutover. Users should
1137 /// use this method (rather than reaching through
1138 /// `rt.mesh().inner().subscribe_channel_*`) for daemon-owned
1139 /// subscriptions; otherwise the subscription travels with the
1140 /// node, not the daemon, and silently drops on migration.
1141 ///
1142 /// Flow:
1143 /// 1. Hit the publisher's membership endpoint via
1144 /// `Mesh::subscribe_channel_with_token` (or the
1145 /// no-token variant).
1146 /// 2. On success, record
1147 /// `(publisher, channel) → SubscriptionBinding` in the
1148 /// host's ledger.
1149 /// 3. On wire failure, no ledger mutation.
1150 ///
1151 /// `token` is the caller-owned [`PermissionToken`] for
1152 /// token-gated channels; `None` for open channels.
1153 pub async fn subscribe_channel(
1154 &self,
1155 origin_hash: u64,
1156 publisher: u64,
1157 channel: ChannelName,
1158 token: Option<PermissionToken>,
1159 ) -> Result<(), DaemonError> {
1160 self.require_ready()?;
1161 if !self.inner.registry.contains(origin_hash) {
1162 return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1163 }
1164 // Capture serialized token bytes for the ledger BEFORE
1165 // handing ownership to the mesh. The mesh call consumes the
1166 // token by value on the token-path.
1167 let token_bytes = token.as_ref().map(|t| t.to_bytes().to_vec());
1168 let result = match token {
1169 Some(tok) => {
1170 self.inner
1171 .mesh
1172 .inner()
1173 .subscribe_channel_with_token(publisher, channel.clone(), tok)
1174 .await
1175 }
1176 None => {
1177 self.inner
1178 .mesh
1179 .inner()
1180 .subscribe_channel(publisher, channel.clone())
1181 .await
1182 }
1183 };
1184 result.map_err(|e| {
1185 DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1186 "subscribe_channel failed: {e}"
1187 )))
1188 })?;
1189
1190 // Mesh accepted the subscribe — record in the ledger.
1191 // `with_host` reaches into the registry's per-daemon mutex;
1192 // since this is SDK-level code we do the minimum work
1193 // under the lock (a single DashMap insert).
1194 if let Err(e) = self.inner.registry.with_host(origin_hash, |host| {
1195 host.record_subscription(publisher, channel, token_bytes);
1196 }) {
1197 return Err(DaemonError::Core(e));
1198 }
1199 Ok(())
1200 }
1201
1202 /// Unsubscribe a specific daemon from a channel. Symmetric to
1203 /// [`Self::subscribe_channel`]: mesh wire call first, then
1204 /// ledger update.
1205 pub async fn unsubscribe_channel(
1206 &self,
1207 origin_hash: u64,
1208 publisher: u64,
1209 channel: ChannelName,
1210 ) -> Result<(), DaemonError> {
1211 self.require_ready()?;
1212 if !self.inner.registry.contains(origin_hash) {
1213 return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1214 }
1215 self.inner
1216 .mesh
1217 .inner()
1218 .unsubscribe_channel(publisher, channel.clone())
1219 .await
1220 .map_err(|e| {
1221 DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1222 "unsubscribe_channel failed: {e}"
1223 )))
1224 })?;
1225 let _ = self.inner.registry.with_host(origin_hash, |host| {
1226 host.forget_subscription(publisher, &channel);
1227 });
1228 Ok(())
1229 }
1230
1231 /// Pre-register a factory on the target node keyed by the
1232 /// daemon's `origin_hash`, using the caller-supplied `Identity`
1233 /// as the **fallback** keypair.
1234 ///
1235 /// Use this when:
1236 /// - The caller genuinely has the daemon's keypair on hand
1237 /// (typical: test harnesses that share the same
1238 /// `Identity` between source and target runtimes).
1239 /// - Migration runs with `transport_identity = false`, so the
1240 /// snapshot carries no envelope and the target needs a
1241 /// matching keypair pre-provisioned.
1242 ///
1243 /// For the common envelope-transport case where the target
1244 /// doesn't know the daemon's private key ahead of time,
1245 /// prefer [`Self::expect_migration`] — it registers a
1246 /// placeholder factory keyed only on `origin_hash`, and the
1247 /// envelope in the migration snapshot supplies the real
1248 /// keypair at restore time.
1249 pub fn register_migration_target_identity(
1250 &self,
1251 kind: &str,
1252 identity: Identity,
1253 config: DaemonHostConfig,
1254 ) -> Result<(), DaemonError> {
1255 if self.state() == State::ShuttingDown {
1256 return Err(DaemonError::ShuttingDown);
1257 }
1258 let factory = self.factory_for_kind(kind)?;
1259 let keypair = identity.keypair().as_ref().clone();
1260 let origin_hash = keypair.origin_hash();
1261 let factory_clone = factory.clone();
1262 self.inner
1263 .factory_registry
1264 .register(keypair, config, move || (factory_clone)())
1265 .map_err(DaemonError::Core)?;
1266 // Post-insert fence — a concurrent `shutdown` may have
1267 // raced past the initial state check. Roll back so no
1268 // factory entry outlives the torn-down runtime.
1269 if self.state() == State::ShuttingDown {
1270 self.inner.factory_registry.remove(origin_hash);
1271 return Err(DaemonError::ShuttingDown);
1272 }
1273 Ok(())
1274 }
1275
1276 /// Declare on the target that this node expects a migration
1277 /// for `origin_hash` of the given `kind`. Registers a
1278 /// **placeholder** factory in the core registry — no matching
1279 /// keypair required, because the migration snapshot's
1280 /// [`IdentityEnvelope`](::net::adapter::net::identity::IdentityEnvelope)
1281 /// carries the real keypair and the dispatcher overrides the
1282 /// placeholder at restore time.
1283 ///
1284 /// Fails cleanly if the source migrates without an envelope
1285 /// (e.g., `MigrationOpts { transport_identity: false }`) —
1286 /// the target's factory has no keypair and the dispatcher
1287 /// emits `IdentityTransportFailed`. Use
1288 /// [`Self::register_migration_target_identity`] with a shared
1289 /// identity for the explicit public-identity-migration case.
1290 ///
1291 /// Landing this method closes the seam documented in the
1292 /// `envelope_overrides_target_placeholder_keypair` test of
1293 /// Stage 5b of the identity-migration plan — targets can now
1294 /// pre-register for a migration by `origin_hash` alone.
1295 pub fn expect_migration(
1296 &self,
1297 kind: &str,
1298 origin_hash: u64,
1299 config: DaemonHostConfig,
1300 ) -> Result<(), DaemonError> {
1301 if self.state() == State::ShuttingDown {
1302 return Err(DaemonError::ShuttingDown);
1303 }
1304 let factory = self.factory_for_kind(kind)?;
1305 let factory_clone = factory.clone();
1306 self.inner
1307 .factory_registry
1308 .register_placeholder(origin_hash, config, move || (factory_clone)())
1309 .map_err(DaemonError::Core)?;
1310 // Post-insert fence against a concurrent `shutdown` — see
1311 // the matching comment in `register_migration_target_identity`.
1312 if self.state() == State::ShuttingDown {
1313 self.inner.factory_registry.remove(origin_hash);
1314 return Err(DaemonError::ShuttingDown);
1315 }
1316 Ok(())
1317 }
1318
1319 /// Start migrating a daemon from `source_node` to `target_node`.
1320 /// The orchestrator runs on this node regardless of who owns
1321 /// the daemon — call this on whichever node wants to drive the
1322 /// migration state machine.
1323 ///
1324 /// Returns a [`MigrationHandle`] whose [`MigrationHandle::wait`]
1325 /// resolves when the migration reaches a terminal state
1326 /// (`Complete` on success, `MigrationError` on abort / failure).
1327 ///
1328 /// For the common local-source case (`source_node ==
1329 /// mesh.node_id()`), the snapshot is taken synchronously inside
1330 /// this call and `SnapshotReady` is shipped to the target. For
1331 /// a remote source, the orchestrator sends `TakeSnapshot` to
1332 /// the source and drives the rest of the state machine from
1333 /// inbound wire messages.
1334 pub async fn start_migration(
1335 &self,
1336 origin_hash: u64,
1337 source_node: u64,
1338 target_node: u64,
1339 ) -> Result<MigrationHandle, DaemonError> {
1340 self.start_migration_with(
1341 origin_hash,
1342 source_node,
1343 target_node,
1344 MigrationOpts::default(),
1345 )
1346 .await
1347 }
1348
1349 /// `start_migration` with caller-supplied options. Stage 6 of
1350 /// [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md):
1351 /// lets the caller opt out of identity transport when the daemon
1352 /// doesn't need to sign anything on the target.
1353 pub async fn start_migration_with(
1354 &self,
1355 origin_hash: u64,
1356 source_node: u64,
1357 target_node: u64,
1358 opts: MigrationOpts,
1359 ) -> Result<MigrationHandle, DaemonError> {
1360 self.require_ready()?;
1361 // Clear any stale failure reason from a prior migration
1362 // attempt for this same `origin_hash`. Without this, if the
1363 // previous attempt's `MigrationHandle` was dropped before
1364 // `wait()` ran (or never `wait()`ed at all), the dispatcher's
1365 // failure callback left an entry in `recent_failures` that
1366 // would leak into THIS attempt's `wait()` — a successful
1367 // new migration would incorrectly surface the old reason
1368 // when `wait_one_attempt` hits its None-status branch and
1369 // pops `recent_failures`.
1370 self.inner.recent_failures.lock().remove(&origin_hash);
1371 let msgs = self
1372 .inner
1373 .orchestrator
1374 .start_migration(origin_hash, source_node, target_node)
1375 .map_err(DaemonError::Migration)?;
1376
1377 // Local-source path: `start_migration` builds chunked
1378 // `SnapshotReady` messages synchronously from the local
1379 // registry. If the caller asked for identity transport, seal
1380 // the envelope HERE — the dispatcher's source-side seal only
1381 // fires on the TakeSnapshot path (remote source). Sealing
1382 // operates on the whole snapshot, so reassemble the chunks,
1383 // seal, and rechunk.
1384 let msgs = if opts.transport_identity {
1385 self.maybe_seal_chunked_snapshot(origin_hash, target_node, msgs)
1386 .await?
1387 } else {
1388 msgs
1389 };
1390
1391 // Determine dest_node from the first message variant and
1392 // send all messages in order. `start_migration` returns
1393 // `TakeSnapshot` (single message) when source is remote, or
1394 // a non-empty run of `SnapshotReady` chunks when source is
1395 // local.
1396 let dest_node = match msgs.first() {
1397 Some(MigrationMessage::TakeSnapshot { .. }) => source_node,
1398 Some(MigrationMessage::SnapshotReady { .. }) => target_node,
1399 Some(other) => {
1400 let _ = self
1401 .inner
1402 .orchestrator
1403 .abort_migration(origin_hash, "unexpected initial message".into());
1404 return Err(DaemonError::Migration(MigrationError::StateFailed(
1405 format!(
1406 "orchestrator returned unexpected initial migration message: {:?}",
1407 other
1408 ),
1409 )));
1410 }
1411 None => {
1412 let _ = self
1413 .inner
1414 .orchestrator
1415 .abort_migration(origin_hash, "orchestrator returned no messages".into());
1416 return Err(DaemonError::Migration(MigrationError::StateFailed(
1417 "orchestrator returned no migration messages".into(),
1418 )));
1419 }
1420 };
1421
1422 for msg in &msgs {
1423 if let Err(e) = self.send_migration_message(dest_node, msg).await {
1424 let _ = self
1425 .inner
1426 .orchestrator
1427 .abort_migration(origin_hash, format!("initial send failed: {e}"));
1428 return Err(e);
1429 }
1430 }
1431
1432 Ok(MigrationHandle {
1433 origin_hash,
1434 source_node,
1435 target_node,
1436 runtime: self.clone(),
1437 opts,
1438 })
1439 }
1440
1441 /// Decode `snapshot_bytes`, seal an identity envelope using
1442 /// the local daemon's keypair + target's X25519 static pubkey,
1443 /// and re-encode.
1444 ///
1445 /// Called only when the caller opted into envelope transport
1446 /// via `MigrationOpts { transport_identity: true }`. The
1447 /// caller committed to the stronger guarantee at that opt-in
1448 /// point; this helper must never silently downgrade.
1449 ///
1450 /// Resolution:
1451 /// - `Ok(None)`: the snapshot already carries an envelope
1452 /// (e.g. pre-sealed upstream). Caller proceeds with the
1453 /// existing bytes — this is not a downgrade, the envelope
1454 /// is already there.
1455 /// - `Ok(Some(new_bytes))`: sealed successfully; caller should
1456 /// replace the snapshot payload.
1457 /// - `Err(_)`: any missing prerequisite (peer X25519 static
1458 /// unknown, daemon keypair absent) or seal-crypto failure.
1459 /// The caller **must** abort — silently falling back to
1460 /// unsealed transport would break the caller's opt-in
1461 /// guarantee, and the target would restore under whatever
1462 /// pre-provisioned keypair the factory registry carries
1463 /// (possibly stale, possibly absent).
1464 ///
1465 /// The NKpsk0-responder case (target was the handshake
1466 /// responder, its peer static is not surfaced by `snow`) is a
1467 /// concrete prerequisite-missing scenario that now fails
1468 /// here. Callers in that topology should use `transport_identity:
1469 /// false` explicitly, which signals "I know identity transport
1470 /// isn't reachable; proceed unsealed."
1471 fn maybe_seal_local_snapshot(
1472 &self,
1473 daemon_origin: u64,
1474 target_node: u64,
1475 snapshot_bytes: &[u8],
1476 ) -> Result<Option<Vec<u8>>, DaemonError> {
1477 let snapshot = StateSnapshot::from_bytes(snapshot_bytes).ok_or_else(|| {
1478 DaemonError::Migration(MigrationError::StateFailed(
1479 "failed to decode local snapshot for envelope sealing".into(),
1480 ))
1481 })?;
1482 if snapshot.identity_envelope.is_some() {
1483 // Upstream already sealed — not a downgrade, the
1484 // envelope is present.
1485 return Ok(None);
1486 }
1487 let Some(target_pub) = self.inner.mesh.inner().peer_static_x25519(target_node) else {
1488 return Err(DaemonError::Migration(MigrationError::StateFailed(
1489 format!(
1490 "identity transport requested but peer X25519 static for \
1491 {target_node:#x} is unknown (e.g. NKpsk0-responder \
1492 side) — cannot seal envelope; use \
1493 `transport_identity: false` to proceed unsealed"
1494 ),
1495 )));
1496 };
1497 let Some(kp) = self.inner.registry.daemon_keypair(daemon_origin) else {
1498 return Err(DaemonError::Migration(MigrationError::StateFailed(
1499 format!(
1500 "identity transport requested but daemon {daemon_origin:#x} has \
1501 no local keypair to seal with"
1502 ),
1503 )));
1504 };
1505 snapshot
1506 .with_identity_envelope(&kp, target_pub)
1507 .map(|sealed| Some(sealed.to_bytes()))
1508 .map_err(|e| {
1509 DaemonError::Migration(MigrationError::StateFailed(format!(
1510 "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
1511 )))
1512 })
1513 }
1514
1515 /// Reassemble chunked `SnapshotReady` messages into the full
1516 /// snapshot, seal the identity envelope, and rechunk.
1517 ///
1518 /// `start_migration` returns chunked `SnapshotReady` messages on
1519 /// the local-source path; sealing operates on the whole snapshot
1520 /// so chunks must be reassembled first. If `msgs` does not start
1521 /// with `SnapshotReady` (i.e. remote-source `TakeSnapshot` path)
1522 /// the messages are returned unchanged — the dispatcher seals
1523 /// on that path.
1524 ///
1525 /// On any seal failure the orchestrator record is aborted so a
1526 /// retry starts from phase 0.
1527 async fn maybe_seal_chunked_snapshot(
1528 &self,
1529 origin_hash: u64,
1530 target_node: u64,
1531 mut msgs: Vec<MigrationMessage>,
1532 ) -> Result<Vec<MigrationMessage>, DaemonError> {
1533 if !matches!(msgs.first(), Some(MigrationMessage::SnapshotReady { .. })) {
1534 return Ok(msgs);
1535 }
1536
1537 let seq_through = match msgs.first() {
1538 Some(MigrationMessage::SnapshotReady { seq_through, .. }) => *seq_through,
1539 _ => unreachable!("checked by matches! above"),
1540 };
1541
1542 // `chunk_snapshot` emits chunks in `chunk_index` order, but
1543 // sort defensively in case a future caller hands us a
1544 // pre-mixed Vec.
1545 msgs.sort_by_key(|m| match m {
1546 MigrationMessage::SnapshotReady { chunk_index, .. } => *chunk_index,
1547 _ => 0,
1548 });
1549 let mut reassembled: Vec<u8> = Vec::new();
1550 for m in &msgs {
1551 if let MigrationMessage::SnapshotReady { snapshot_bytes, .. } = m {
1552 reassembled.extend_from_slice(snapshot_bytes);
1553 }
1554 }
1555
1556 // `transport_identity: true` is a strict opt-in:
1557 // prerequisites-missing (e.g. NKpsk0-responder) surfaces as
1558 // `Err` and aborts the migration, not a silent downgrade to
1559 // unsealed. `Ok(None)` is reserved for "snapshot already
1560 // carries an envelope" — return the original chunks.
1561 match self.maybe_seal_local_snapshot(origin_hash, target_node, &reassembled) {
1562 Ok(Some(sealed)) => chunk_snapshot(origin_hash, sealed, seq_through).map_err(|e| {
1563 let _ = self
1564 .inner
1565 .orchestrator
1566 .abort_migration(origin_hash, format!("rechunk after seal failed: {e}"));
1567 DaemonError::Migration(e)
1568 }),
1569 Ok(None) => Ok(msgs),
1570 Err(e) => {
1571 let _ = self
1572 .inner
1573 .orchestrator
1574 .abort_migration(origin_hash, format!("envelope seal failed: {e}"));
1575 Err(e)
1576 }
1577 }
1578 }
1579
1580 async fn send_migration_message(
1581 &self,
1582 dest_node: u64,
1583 msg: &MigrationMessage,
1584 ) -> Result<(), DaemonError> {
1585 let addr = self
1586 .inner
1587 .mesh
1588 .inner()
1589 .peer_addr(dest_node)
1590 .ok_or(DaemonError::Migration(MigrationError::TargetUnavailable(
1591 dest_node,
1592 )))?;
1593 let bytes = migration_wire::encode(msg).map_err(DaemonError::Migration)?;
1594 self.inner
1595 .mesh
1596 .inner()
1597 .send_subprotocol(addr, SUBPROTOCOL_MIGRATION, &bytes)
1598 .await
1599 .map_err(|e| {
1600 DaemonError::Migration(MigrationError::StateFailed(format!(
1601 "send_subprotocol failed: {e}"
1602 )))
1603 })
1604 }
1605
1606 /// Underlying mesh. Exposed read-only so the caller can still
1607 /// reach the channel / subscribe / publish surface without
1608 /// reaching around the runtime.
1609 pub fn mesh(&self) -> &Arc<Mesh> {
1610 &self.inner.mesh
1611 }
1612
1613 // ------------ internal helpers ------------
1614
1615 fn state(&self) -> State {
1616 State::from_u8(self.inner.state.load(Ordering::Acquire))
1617 }
1618
1619 fn require_ready(&self) -> Result<(), DaemonError> {
1620 match self.state() {
1621 State::Ready => Ok(()),
1622 State::Registering => Err(DaemonError::NotReady),
1623 State::ShuttingDown => Err(DaemonError::ShuttingDown),
1624 }
1625 }
1626
1627 fn factory_for_kind(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1628 self.inner
1629 .factories
1630 .read()
1631 .get(kind)
1632 .cloned()
1633 .ok_or_else(|| DaemonError::FactoryNotFound(kind.to_string()))
1634 }
1635
    // ─── `groups` feature accessors ─────────────────────────────────
    //
    // These surface the internal `Scheduler` / `DaemonRegistry` /
    // factory closures that the `groups` module (ReplicaGroup /
    // ForkGroup / StandbyGroup) needs to wire into core group
    // constructors. Kept `pub(crate)` so they don't leak into the
    // SDK's public API — group types wrap them in their own
    // ergonomic surfaces and callers stay on the clean side of the
    // boundary. (`migration_orchestrator_arc` is the one exception:
    // it is gated on the `testing` feature rather than `groups`.)
1645
1646 /// Shared scheduler for capability-based placement.
1647 #[cfg(feature = "groups")]
1648 pub(crate) fn scheduler_arc(&self) -> Arc<Scheduler> {
1649 self.inner.scheduler.clone()
1650 }
1651
1652 /// Shared migration orchestrator. The
1653 /// `OrchestratorMigrationSnapshotSource` adapter wraps this
1654 /// `Arc` to bridge the compute-layer orchestrator's
1655 /// in-flight state into the MeshOS snapshot's
1656 /// `in_flight_migrations` field. Exposed `pub(crate)` so
1657 /// the in-crate `testing` harness builds the source
1658 /// out-of-band; the adapter type itself lives in
1659 /// `net::adapter::net::behavior::meshos::migration_snapshot_source`.
1660 #[cfg(feature = "testing")]
1661 pub(crate) fn migration_orchestrator_arc(&self) -> Arc<MigrationOrchestrator> {
1662 self.inner.orchestrator.clone()
1663 }
1664
1665 /// Shared daemon registry (group members register/unregister
1666 /// here alongside direct-spawn daemons).
1667 #[cfg(feature = "groups")]
1668 pub(crate) fn registry_arc(&self) -> Arc<DaemonRegistry> {
1669 self.inner.registry.clone()
1670 }
1671
1672 /// Look up a factory closure by kind. Used by group constructors
1673 /// to build members via the same factory the caller registered
1674 /// with `register_factory`.
1675 #[cfg(feature = "groups")]
1676 pub(crate) fn factory_for_kind_pub(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1677 self.factory_for_kind(kind)
1678 }
1679
1680 /// `true` iff the runtime is in the `Ready` state. Groups use
1681 /// this to gate `spawn` (rejecting early-spawn calls with a
1682 /// typed `GroupError::NotReady`).
1683 #[cfg(feature = "groups")]
1684 pub(crate) fn is_ready_pub(&self) -> bool {
1685 self.state() == State::Ready
1686 }
1687}
1688
1689/// Handle to a running daemon. Clone-safe; dropping does not stop
1690/// the daemon — call [`DaemonRuntime::stop`] explicitly.
1691#[derive(Clone)]
1692pub struct DaemonHandle {
1693 /// Daemon's 64-bit origin hash. Stable for the daemon's lifetime
1694 /// and across migrations.
1695 pub origin_hash: u64,
1696 /// Daemon's full 32-byte entity id.
1697 pub entity_id: EntityId,
1698 inner: Arc<Inner>,
1699}
1700
1701impl DaemonHandle {
1702 /// Read the daemon's current stats.
1703 pub fn stats(&self) -> Result<DaemonStats, DaemonError> {
1704 self.inner
1705 .registry
1706 .stats(self.origin_hash)
1707 .map_err(DaemonError::Core)
1708 }
1709
1710 /// Take a snapshot of the daemon's current state. `Ok(None)` for
1711 /// stateless daemons.
1712 pub async fn snapshot(&self) -> Result<Option<StateSnapshot>, DaemonError> {
1713 self.inner
1714 .registry
1715 .snapshot(self.origin_hash)
1716 .map_err(DaemonError::Core)
1717 }
1718}
1719
1720impl std::fmt::Debug for DaemonRuntime {
1721 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1722 let factory_count = self.inner.factories.read().len();
1723 f.debug_struct("DaemonRuntime")
1724 .field("state", &self.state())
1725 .field("factories", &factory_count)
1726 .field("daemons", &self.inner.registry.count())
1727 .finish()
1728 }
1729}
1730
1731impl std::fmt::Debug for DaemonHandle {
1732 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1733 f.debug_struct("DaemonHandle")
1734 .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
1735 .field("entity_id", &self.entity_id)
1736 .finish()
1737 }
1738}
1739
1740/// Stage 3 of `DAEMON_CHANNEL_REBIND_PLAN.md` — after a migration
1741/// target restores a daemon, walk its subscription ledger and
1742/// re-send each `subscribe_channel` to the matching publisher so
1743/// messages flow to the target without waiting for the source's
1744/// entry to age out of the publisher's roster. Errors are
1745/// per-subscription; one publisher being offline doesn't fail the
1746/// rest.
1747///
1748/// Runs in a tokio task spawned by the post-restore callback
1749/// installed on `MigrationSubprotocolHandler`.
1750async fn replay_subscriptions(inner: Arc<Inner>, origin_hash: u64) {
1751 let bindings = match inner
1752 .registry
1753 .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1754 {
1755 Ok(list) => list,
1756 Err(_) => return,
1757 };
1758 for sub in bindings {
1759 let token = sub
1760 .token_bytes
1761 .as_deref()
1762 .and_then(|bytes| PermissionToken::from_bytes(bytes).ok());
1763 let result = match token {
1764 Some(tok) => {
1765 inner
1766 .mesh
1767 .inner()
1768 .subscribe_channel_with_token(sub.publisher, sub.channel.clone(), tok)
1769 .await
1770 }
1771 None => {
1772 inner
1773 .mesh
1774 .inner()
1775 .subscribe_channel(sub.publisher, sub.channel.clone())
1776 .await
1777 }
1778 };
1779 if let Err(e) = result {
1780 // Non-fatal: one subscription failing (publisher
1781 // offline, token expired, etc.) must not take down the
1782 // rest of the ledger's replay. The SDK doesn't depend
1783 // on `tracing`; drop the failure to stderr via
1784 // `eprintln!` so operators running `RUST_LOG=warn`
1785 // still see it. Future work can add a `ReplayPartial`
1786 // event on the migration phase stream (plan §
1787 // *Error surface*) to surface failures programmatically.
1788 eprintln!(
1789 "channel re-bind replay failed: daemon={:#x} channel={} publisher={:#x} error={}",
1790 origin_hash, sub.channel, sub.publisher, e,
1791 );
1792 }
1793 }
1794}
1795
1796/// Stage 4 of `DAEMON_CHANNEL_REBIND_PLAN.md` — fires at `Cutover`
1797/// on the source node (just before daemon cleanup). Walks the
1798/// daemon's ledger and sends `unsubscribe_channel` to each
1799/// publisher so rosters drop the source without waiting for
1800/// session-timeout (~30 s). Fire-and-forget: we don't block the
1801/// cutover dispatch on acks.
1802async fn teardown_subscriptions(inner: Arc<Inner>, bindings: Vec<SubscriptionBinding>) {
1803 for sub in bindings {
1804 if let Err(e) = inner
1805 .mesh
1806 .inner()
1807 .unsubscribe_channel(sub.publisher, sub.channel.clone())
1808 .await
1809 {
1810 // Non-fatal; the publisher's session timeout will
1811 // eventually clean up our stale roster entry even
1812 // without an explicit Unsubscribe.
1813 eprintln!(
1814 "channel re-bind teardown failed: channel={} publisher={:#x} error={}",
1815 sub.channel, sub.publisher, e,
1816 );
1817 }
1818 }
1819}
1820
/// Tuning knobs for [`DaemonRuntime::start_migration_with`].
///
/// - `transport_identity` — Stage 6 of
///   [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md).
///   Defaults to `true`.
/// - `retry_not_ready` — Stages 3 + 4 of
///   [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md).
///   When the target answers `NotReady` because its runtime is still
///   `Registering`, the source backs off and re-initiates for at most
///   this much total elapsed time. `None` turns retry off, so the first
///   `NotReady` is reported straight away.
#[derive(Debug, Clone)]
pub struct MigrationOpts {
    /// Whether to carry the daemon's signing identity to the target.
    ///
    /// `true` (the default): the source seals the daemon's ed25519 seed
    /// into the outbound snapshot for the target's X25519 static
    /// pubkey. The target unseals it on arrival, so the migrated daemon
    /// can still sign.
    ///
    /// `false`: no envelope is sent. The target rebuilds the daemon
    /// with a `public_only` keypair. `entity_id` / `origin_hash` still
    /// resolve, but `sign` fails with `EntityError::ReadOnly`. Suitable
    /// for pure compute daemons that consume events and emit payloads
    /// but never mint capability announcements or permission tokens on
    /// the target.
    pub transport_identity: bool,

    /// Total time budget for retrying after
    /// [`MigrationFailureReason::NotReady`].
    ///
    /// `Some(d)` (default: 30 s): each `NotReady` triggers a back-off
    /// (500 ms, 1 s, 2 s, 4 s, 8 s, then 16 s per retry) followed by a
    /// fresh attempt. Once `d` has elapsed the caller receives
    /// [`MigrationFailureReason::NotReadyTimeout`].
    ///
    /// `None`: no retries. The first `NotReady` is terminal.
    pub retry_not_ready: Option<std::time::Duration>,
}

impl Default for MigrationOpts {
    /// Identity transport on, with a 30 s `NotReady` retry budget.
    fn default() -> Self {
        const DEFAULT_NOT_READY_BUDGET: std::time::Duration = std::time::Duration::from_secs(30);
        MigrationOpts {
            retry_not_ready: Some(DEFAULT_NOT_READY_BUDGET),
            transport_identity: true,
        }
    }
}
1870
/// Back-off delay before the `attempt`-th `NotReady` retry.
///
/// Attempts 0 and 1 wait 500 ms. Each later attempt doubles the delay
/// until it reaches 16 s, where it stays. The overall retry budget is
/// governed separately by [`MigrationOpts::retry_not_ready`]. This
/// mirrors `DAEMON_RUNTIME_READINESS_PLAN.md` § *Source-side retry*.
fn not_ready_backoff(attempt: u8) -> std::time::Duration {
    const BASE_MS: u64 = 500;
    const CAP_MS: u64 = 16_000;
    // Five doublings take 500 ms to 16 s; more would only be clamped.
    let doublings = u32::from(attempt.saturating_sub(1)).min(5);
    let delay_ms = (BASE_MS << doublings).min(CAP_MS);
    std::time::Duration::from_millis(delay_ms)
}
1881
1882/// Handle to an in-flight migration. Drop the handle and the
1883/// orchestrator continues driving the migration to completion in the
1884/// background; keep it to observe phase transitions or request abort.
1885///
1886/// Cheap to clone — the backing state is shared with the
1887/// [`DaemonRuntime`] that produced it.
1888#[derive(Clone)]
1889pub struct MigrationHandle {
1890 /// Daemon being migrated.
1891 pub origin_hash: u64,
1892 /// Source node that currently hosts the daemon.
1893 pub source_node: u64,
1894 /// Target node that will host the daemon after cutover.
1895 pub target_node: u64,
1896 /// Runtime the orchestrator lives on. Used by [`Self::phase`]
1897 /// and [`Self::wait`] to poll migration state.
1898 runtime: DaemonRuntime,
1899 /// Options the migration was initiated with. Drives retry
1900 /// policy on `NotReady` + the identity-transport flag for
1901 /// re-initiated attempts.
1902 opts: MigrationOpts,
1903}
1904
1905impl MigrationHandle {
1906 /// Current migration phase, or `None` once the migration has
1907 /// left the orchestrator's records (either via `Complete` → auto
1908 /// cleanup, or via explicit abort). Callers distinguish the two
1909 /// by remembering the last non-None phase.
1910 pub fn phase(&self) -> Option<MigrationPhase> {
1911 self.runtime.inner.orchestrator.status(self.origin_hash)
1912 }
1913
1914 /// Block until the migration reaches a terminal state.
1915 ///
1916 /// Returns `Ok(())` on normal completion (saw `Complete`, then
1917 /// the orchestrator cleaned up). Returns `Err(MigrationError)`
1918 /// if the orchestrator's record disappeared without the caller
1919 /// ever having seen `Complete` — either an explicit abort or a
1920 /// failure at some upstream stage.
1921 ///
1922 /// This method does **not** enforce any wall-clock timeout. A
1923 /// migration that stalls waiting on an unresponsive peer will
1924 /// block indefinitely; callers that want a bound should use
1925 /// [`Self::wait_with_timeout`] instead.
1926 ///
1927 /// Polls every 50 ms. The implementation is deliberately
1928 /// simple — Stage 2 of `DAEMON_IDENTITY_MIGRATION_PLAN.md` and
1929 /// the V2 iteration of this plan will swap this for a
1930 /// broadcast-channel push, but 50 ms polling is plenty for the
1931 /// use cases a migration API sees today.
1932 pub async fn wait(self) -> Result<(), DaemonError> {
1933 self.wait_until(None).await
1934 }
1935
1936 /// Like [`Self::wait`] with a caller-controlled timeout. A
1937 /// timeout aborts the orchestrator-side record and returns
1938 /// `Err(MigrationError::StateFailed)`; a graceful `Complete`
1939 /// returns `Ok`.
1940 pub async fn wait_with_timeout(self, timeout: std::time::Duration) -> Result<(), DaemonError> {
1941 let deadline = tokio::time::Instant::now() + timeout;
1942 self.wait_until(Some(deadline)).await
1943 }
1944
1945 /// Inner wait loop with an optional deadline. `None` = block
1946 /// forever until the migration reaches a terminal state;
1947 /// `Some(d)` = give up and abort at `d`.
1948 ///
1949 /// Centralised here so `wait` and `wait_with_timeout` can't
1950 /// drift on the retry/backoff semantics, and so the
1951 /// "hidden 60 s ceiling" cannot reappear under `wait`.
1952 async fn wait_until(
1953 self,
1954 overall_deadline: Option<tokio::time::Instant>,
1955 ) -> Result<(), DaemonError> {
1956 let start = tokio::time::Instant::now();
1957 let retry_deadline = self.opts.retry_not_ready.map(|b| start + b);
1958 let mut attempts: u8 = 1; // first attempt initiated by start_migration_with
1959 loop {
1960 match self.wait_one_attempt(overall_deadline).await {
1961 Ok(()) => return Ok(()),
1962 Err(DaemonError::MigrationFailed(reason)) if reason.is_retriable() => {
1963 // Retry decision.
1964 let Some(retry_d) = retry_deadline else {
1965 // Opts explicitly disable retry — surface
1966 // the NotReady verbatim.
1967 return Err(DaemonError::MigrationFailed(reason));
1968 };
1969 let now = tokio::time::Instant::now();
1970 let overall_exhausted = overall_deadline.map(|d| now >= d).unwrap_or(false);
1971 if now >= retry_d || overall_exhausted {
1972 // Budget exhausted. `NotReadyTimeout` carries
1973 // the attempt count for operator diagnosis.
1974 return Err(DaemonError::MigrationFailed(
1975 MigrationFailureReason::NotReadyTimeout { attempts },
1976 ));
1977 }
1978 // Back off and re-initiate.
1979 let backoff = not_ready_backoff(attempts);
1980 tokio::time::sleep(backoff).await;
1981 attempts = attempts.saturating_add(1);
1982 // Re-initiate the migration by calling the
1983 // orchestrator fresh. The previous record has
1984 // been cleaned up by the dispatcher's
1985 // MigrationFailed handler, so this starts a new
1986 // attempt from phase 0.
1987 self.reinitiate_attempt().await?;
1988 // Loop; poll this new attempt's outcome.
1989 }
1990 Err(e) => return Err(e),
1991 }
1992 }
1993 }
1994
1995 /// Poll a single migration attempt's outcome. Returns:
1996 /// - `Ok(())` on Complete.
1997 /// - `Err(MigrationFailed(reason))` when the dispatcher
1998 /// observed a structured failure.
1999 /// - `Err(Migration(_))` on overall-timeout or unknown abort.
2000 ///
2001 /// `overall_deadline = None` disables the deadline check — the
2002 /// poll loop only returns via a terminal status transition.
2003 async fn wait_one_attempt(
2004 &self,
2005 overall_deadline: Option<tokio::time::Instant>,
2006 ) -> Result<(), DaemonError> {
2007 loop {
2008 let current_phase = self.runtime.inner.orchestrator.status(self.origin_hash);
2009 match current_phase {
2010 Some(phase) => {
2011 if phase == MigrationPhase::Complete {
2012 // Give the dispatcher a beat to finish
2013 // cleanup, then surface success.
2014 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2015 return Ok(());
2016 }
2017 }
2018 None => {
2019 // A recorded failure is authoritative — the
2020 // dispatcher populates `recent_failures` before
2021 // the orchestrator removes the record, so
2022 // status=None + recorded-reason unambiguously
2023 // means abort.
2024 if let Some(reason) = self.take_recent_failure() {
2025 return Err(DaemonError::MigrationFailed(reason));
2026 }
2027 // No recorded failure. The orchestrator removes
2028 // records via two paths:
2029 // 1. `on_activate_ack` — success.
2030 // 2. `abort_migration_with_reason` — failure,
2031 // which rides through the dispatcher's
2032 // `MigrationFailed` handler *before* the
2033 // record is dropped, so `recent_failures`
2034 // is populated first.
2035 // Therefore status=None + no-recorded-failure is
2036 // unambiguously success, regardless of what
2037 // phase we last observed. This matters when the
2038 // dispatcher runs the tail of the migration
2039 // (Cutover → Complete → ActivateAck) entirely
2040 // between two 50 ms polls — we may never observe
2041 // `Complete` explicitly.
2042 //
2043 // Synchronous abort paths that bypass the
2044 // dispatcher (`wait_one_attempt`'s own timeout,
2045 // `start_migration_with`'s send-failure path)
2046 // return `Err` to the caller *before* the wait
2047 // loop can observe the None status, so they
2048 // don't trip this branch.
2049 return Ok(());
2050 }
2051 }
2052 if let Some(d) = overall_deadline {
2053 if tokio::time::Instant::now() >= d {
2054 let _ = self
2055 .runtime
2056 .inner
2057 .orchestrator
2058 .abort_migration(self.origin_hash, "timeout".into());
2059 return Err(DaemonError::Migration(MigrationError::StateFailed(
2060 format!("migration timed out in phase {:?}", current_phase),
2061 )));
2062 }
2063 }
2064 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2065 }
2066 }
2067
2068 /// Re-initiate a migration attempt for the same daemon after a
2069 /// retriable failure. Calls the orchestrator fresh + sends the
2070 /// first wire message; mirrors the tail of
2071 /// [`DaemonRuntime::start_migration_with`] but without building
2072 /// a new handle (we keep the existing one).
2073 async fn reinitiate_attempt(&self) -> Result<(), DaemonError> {
2074 let msgs = self
2075 .runtime
2076 .inner
2077 .orchestrator
2078 .start_migration(self.origin_hash, self.source_node, self.target_node)
2079 .map_err(DaemonError::Migration)?;
2080
2081 let msgs = if self.opts.transport_identity {
2082 self.runtime
2083 .maybe_seal_chunked_snapshot(self.origin_hash, self.target_node, msgs)
2084 .await?
2085 } else {
2086 msgs
2087 };
2088
2089 let dest_node = match msgs.first() {
2090 Some(MigrationMessage::TakeSnapshot { .. }) => self.source_node,
2091 Some(MigrationMessage::SnapshotReady { .. }) => self.target_node,
2092 Some(other) => {
2093 let _ = self
2094 .runtime
2095 .inner
2096 .orchestrator
2097 .abort_migration(self.origin_hash, "unexpected retry message".into());
2098 return Err(DaemonError::Migration(MigrationError::StateFailed(
2099 format!("unexpected retry initial message: {other:?}"),
2100 )));
2101 }
2102 None => {
2103 let _ = self.runtime.inner.orchestrator.abort_migration(
2104 self.origin_hash,
2105 "orchestrator returned no retry messages".into(),
2106 );
2107 return Err(DaemonError::Migration(MigrationError::StateFailed(
2108 "orchestrator returned no migration messages on retry".into(),
2109 )));
2110 }
2111 };
2112
2113 for msg in &msgs {
2114 self.runtime.send_migration_message(dest_node, msg).await?;
2115 }
2116 Ok(())
2117 }
2118
2119 /// Pop the most recent `MigrationFailureReason` the dispatcher
2120 /// observed for this migration (if any). Consumed: subsequent
2121 /// calls see `None` until the next failure arrives.
2122 fn take_recent_failure(&self) -> Option<MigrationFailureReason> {
2123 self.runtime
2124 .inner
2125 .recent_failures
2126 .lock()
2127 .remove(&self.origin_hash)
2128 }
2129
2130 /// Request abort. The orchestrator emits a `MigrationFailed`
2131 /// message to involved nodes and clears its record; the target
2132 /// rolls back via its own handler. Best-effort — a migration
2133 /// past `Cutover` cannot be undone cleanly because routing has
2134 /// already flipped.
2135 pub async fn cancel(&self) -> Result<(), DaemonError> {
2136 let msg = self
2137 .runtime
2138 .inner
2139 .orchestrator
2140 .abort_migration(self.origin_hash, "cancel requested".into())
2141 .map_err(DaemonError::Migration)?;
2142 // Best-effort notify — ignore send errors, the orchestrator
2143 // record is already gone on our side.
2144 let _ = self
2145 .runtime
2146 .send_migration_message(self.source_node, &msg)
2147 .await;
2148 let _ = self
2149 .runtime
2150 .send_migration_message(self.target_node, &msg)
2151 .await;
2152 Ok(())
2153 }
2154}
2155
2156impl std::fmt::Debug for MigrationHandle {
2157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2158 f.debug_struct("MigrationHandle")
2159 .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
2160 .field("source_node", &format_args!("{:#x}", self.source_node))
2161 .field("target_node", &format_args!("{:#x}", self.target_node))
2162 .field("phase", &self.phase())
2163 .finish()
2164 }
2165}