net_sdk/compute.rs
1//! Compute surface — `MeshDaemon` + `DaemonRuntime`.
2//!
3//! Users implement [`MeshDaemon`] and hand it to a [`DaemonRuntime`]
4//! tied to a [`Mesh`] node. The runtime holds the
5//! kind-keyed factory table, the per-daemon host registry, and the
6//! lifecycle gate that decides when inbound migrations may land.
7//!
8//! This file is Stage 1 of
9//! [`SDK_COMPUTE_SURFACE_PLAN.md`](../../../docs/SDK_COMPUTE_SURFACE_PLAN.md)
10//! plus the lifecycle half of
11//! [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md):
12//! local spawn / snapshot / stop, with an explicit
13//! `Registering → Ready → ShuttingDown` fence. Migration is Stage 2;
14//! the wire-level half of the readiness plan
15//! (`MigrationFailureReason`) ships alongside it.
16//!
17//! # Example
18//!
19//! ```no_run
20//! use std::sync::Arc;
21//! use bytes::Bytes;
22//! use net_sdk::{Identity, Mesh};
23//! use net_sdk::compute::{
24//! CausalEvent, DaemonHostConfig, DaemonRuntime, MeshDaemon,
25//! };
26//! use net_sdk::capabilities::CapabilityFilter;
27//! use net::adapter::net::compute::DaemonError as CoreDaemonError;
28//!
29//! struct EchoDaemon;
30//! impl MeshDaemon for EchoDaemon {
31//! fn name(&self) -> &str { "echo" }
32//! fn requirements(&self) -> CapabilityFilter { CapabilityFilter::default() }
33//! fn process(&mut self, event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
34//! Ok(vec![event.payload.clone()])
35//! }
36//! }
37//!
38//! # async fn example(mesh: Arc<Mesh>) -> Result<(), Box<dyn std::error::Error>> {
39//! let rt = DaemonRuntime::new(mesh);
40//! rt.register_factory("echo", || Box::new(EchoDaemon))?;
41//! rt.start().await?;
42//! let handle = rt.spawn("echo", Identity::generate(), DaemonHostConfig::default()).await?;
43//! # Ok(())
44//! # }
45//! ```
46
47use std::collections::HashMap;
48use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
49use std::sync::{Arc, Mutex, RwLock};
50
51use thiserror::Error;
52
53pub use ::net::adapter::net::compute::{
54 DaemonBindings, DaemonError as CoreDaemonError, DaemonHostConfig, DaemonStats, MeshDaemon,
55 MigrationError, MigrationFailureReason, MigrationPhase, PlacementDecision, SchedulerError,
56 SubscriptionBinding, SUBPROTOCOL_MIGRATION,
57};
58pub use ::net::adapter::net::state::causal::{CausalEvent, CausalLink};
59pub use ::net::adapter::net::state::snapshot::StateSnapshot;
60
61use ::net::adapter::net::channel::ChannelName;
62use ::net::adapter::net::identity::PermissionToken;
63
64use ::net::adapter::net::behavior::capability::CapabilitySet;
65use ::net::adapter::net::compute::{
66 chunk_snapshot, orchestrator::wire as migration_wire, DaemonFactoryRegistry, DaemonHost,
67 DaemonRegistry, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
68 MigrationTargetHandler, Scheduler,
69};
70use ::net::adapter::net::identity::EntityId;
71use ::net::adapter::net::subprotocol::{
72 FailureCallback, MigrationHandlerHooks, MigrationSubprotocolHandler, PostRestoreCallback,
73 PreCleanupCallback, ReadinessCallback,
74};
75
76use crate::identity::Identity;
77use crate::mesh::Mesh;
78
79/// Arc-wrapped factory closure. Kind-keyed at the SDK layer; cloned
80/// into the core `DaemonFactoryRegistry` at `spawn` time so a future
81/// migration target can reconstruct the daemon by `origin_hash`.
82type FactoryFn = Arc<dyn Fn() -> Box<dyn MeshDaemon> + Send + Sync>;
83
84/// Errors from the SDK daemon runtime.
85#[derive(Debug, Error)]
86pub enum DaemonError {
87 /// `start()` has not been called yet; the runtime is still in
88 /// `Registering` and will not accept spawns or migrations.
89 #[error("daemon runtime is not ready — call DaemonRuntime::start() first")]
90 NotReady,
91 /// `shutdown()` has been called; the runtime is permanently
92 /// non-functional.
93 #[error("daemon runtime has been shut down")]
94 ShuttingDown,
95 /// Two `register_factory` calls used the same `kind` string.
96 #[error("factory for kind '{0}' is already registered")]
97 FactoryAlreadyRegistered(String),
98 /// `spawn` / `spawn_from_snapshot` referenced an unregistered kind.
99 #[error("no factory registered for kind '{0}'")]
100 FactoryNotFound(String),
101 /// The snapshot's `entity_id.origin_hash` does not match the
102 /// identity handed to `spawn_from_snapshot`.
103 #[error(
104 "snapshot/identity mismatch: snapshot origin {snapshot:#x} != identity origin {identity:#x}"
105 )]
106 SnapshotIdentityMismatch { snapshot: u64, identity: u64 },
107 /// Pass-through for errors surfaced by the core compute layer.
108 #[error(transparent)]
109 Core(#[from] CoreDaemonError),
110 /// Pass-through for migration-layer errors.
111 #[error("migration failed: {0}")]
112 Migration(#[from] MigrationError),
113 /// Structured failure reason surfaced by the migration
114 /// dispatcher on the source side. Use
115 /// [`MigrationFailureReason::is_retriable`] to decide whether
116 /// the caller should back off and retry rather than propagating.
117 #[error("migration failed: {0}")]
118 MigrationFailed(MigrationFailureReason),
119}
120
121// Runtime state machine. Encoded as `u8` so it rides in an
122// `AtomicU8` without an extra layer of indirection. Values are
123// stable across the lifetime of a `DaemonRuntime` and must not be
124// reordered (release / acquire cmpxchg compares by value).
125#[repr(u8)]
126#[derive(Copy, Clone, Debug, Eq, PartialEq)]
127enum State {
128 Registering = 0,
129 Ready = 1,
130 ShuttingDown = 2,
131}
132
133impl State {
134 fn from_u8(v: u8) -> Self {
135 match v {
136 0 => State::Registering,
137 1 => State::Ready,
138 2 => State::ShuttingDown,
139 // `AtomicU8` is only written through `State::*` variants,
140 // so any other value means memory corruption. Panic
141 // loudly rather than silently misinterpret.
142 other => panic!("daemon runtime: corrupt state byte {other}"),
143 }
144 }
145}
146
147/// Per-mesh compute runtime.
148///
149/// Holds the kind-keyed factory table, the per-daemon host registry,
150/// and the `Registering → Ready → ShuttingDown` lifecycle gate. One
151/// `DaemonRuntime` per [`Mesh`]; clone the handle freely — the inner
152/// state is `Arc`-shared.
153#[derive(Clone)]
154pub struct DaemonRuntime {
155 inner: Arc<Inner>,
156}
157
158struct Inner {
159 mesh: Arc<Mesh>,
160 state: AtomicU8,
161 /// SDK-side kind → factory map. The migration target path reaches
162 /// into the core `factory_registry` below by `origin_hash`;
163 /// `kind` is SDK sugar so the *caller* can spawn without knowing
164 /// the underlying keypair.
165 factories: RwLock<HashMap<String, FactoryFn>>,
166 /// Core registry — shared with the migration target handler so
167 /// daemons restored from an inbound snapshot land in the same
168 /// map a local `spawn` uses.
169 registry: Arc<DaemonRegistry>,
170 /// Core scheduler — built once at `new()` from the mesh's
171 /// shared `CapabilityIndex`. Used by the `groups` feature's
172 /// `ReplicaGroup` / `ForkGroup` / `StandbyGroup` for
173 /// capability-based placement.
174 ///
175 /// `local_caps` is `CapabilitySet::default()` because the mesh
176 /// doesn't currently expose the locally-announced caps as a
177 /// readable snapshot. The only behavioral impact is that
178 /// capability-filtered daemons which could run locally won't
179 /// get `LocalPreferred` short-circuit placement — they fall
180 /// through to the `CapabilityIndex::query` path, which still
181 /// returns the local node (with the right announcements) and
182 /// places correctly. Revisit when a `MeshNode::local_caps()`
183 /// getter lands.
184 #[cfg_attr(not(feature = "groups"), allow(dead_code))]
185 scheduler: Arc<Scheduler>,
186 /// Core factory registry, keyed by `origin_hash`. `spawn` mirrors
187 /// each SDK-side kind registration into this map with the
188 /// concrete keypair attached so the migration target restores
189 /// through the existing `DaemonFactoryRegistry::construct` path.
190 factory_registry: Arc<DaemonFactoryRegistry>,
191 /// Migration orchestrator, owned by this node. Orchestrates the
192 /// 6-phase state machine when this node initiates a migration.
193 orchestrator: Arc<MigrationOrchestrator>,
194 /// Migration source handler — drives the source side when THIS
195 /// node is the source of a migration.
196 source_handler: Arc<MigrationSourceHandler>,
197 /// Migration target handler — drives the target side when THIS
198 /// node is the target of a migration.
199 target_handler: Arc<MigrationTargetHandler>,
200 /// Most recent `MigrationFailureReason` observed on the source
201 /// side for each migration, keyed by `daemon_origin`. Populated
202 /// by the dispatcher's failure callback; consumed by
203 /// `MigrationHandle::wait` to surface the reason to the caller.
204 /// Mutex because the SDK's dependency set doesn't include
205 /// `dashmap` directly, and this map sees low write frequency
206 /// (one entry per failed migration).
207 recent_failures: Mutex<HashMap<u64, MigrationFailureReason>>,
208 /// Test-only knob: when set to `true`, the readiness callback
209 /// reports "not ready" even when the runtime is in `Ready`.
210 /// Lets integration tests drive the `NotReady` retry path
211 /// without racing against runtime startup. Defaults to `false`;
212 /// production code should not touch it.
213 simulate_not_ready: AtomicBool,
214 /// Test-only stall injected into `spawn` between the
215 /// `require_ready` check and the registry inserts. Measured
216 /// in milliseconds; `0` = no stall (production default). See
217 /// [`DaemonRuntime::set_spawn_stall_ms`].
218 spawn_stall_ms: std::sync::atomic::AtomicU32,
219 /// Test-only stall injected into `start` between installing
220 /// the migration handler and the Registering→Ready CAS.
221 /// Measured in milliseconds; `0` = no stall.
222 start_stall_ms: std::sync::atomic::AtomicU32,
223 /// Per-origin post-delivery observers. Fired on every successful
224 /// `deliver(origin_hash, event)`, inside the registry call and
225 /// after the daemon's `process` returns Ok.
226 ///
227 /// **Why this exists.** `StandbyGroup` needs every event routed
228 /// to its active member captured in a replay buffer so a
229 /// promoted standby can reapply the window between the last
230 /// `sync_standbys` and the failure. A manual `on_event_delivered`
231 /// hook relied on the caller pairing every `deliver` with the
232 /// buffering call, which silently lost events on omission.
233 /// Observers close that gap: a group installs a weak-ref
234 /// closure that pushes the event into its buffer automatically,
235 /// with no contract on the caller.
236 ///
237 /// `pub(crate)` so `sdk/src/groups/*` can register; not a
238 /// stable public API. The observer list is a `Vec` so unrelated
239 /// subsystems (future audit / metrics hooks) could coexist on
240 /// the same origin without one overwriting another.
241 observers: Mutex<HashMap<u64, Vec<(u64, DeliverObserver)>>>,
242 /// Monotonic id minted by `register_deliver_observer`, used by
243 /// the returned `ObserverHandle` to identify its entry on
244 /// `Drop`/`unregister`.
245 #[cfg_attr(not(feature = "groups"), allow(dead_code))]
246 observer_id_counter: std::sync::atomic::AtomicU64,
247}
248
249/// A post-delivery observer closure registered against an
250/// `origin_hash`. Fires on every successful `DaemonRuntime::deliver`
251/// for that origin.
252///
253/// Observers MUST be cheap — they run on the delivery thread, after
254/// the daemon's `process` but before `deliver` returns. A slow or
255/// panicking observer stalls delivery. The in-tree use (standby
256/// replay buffer) is a single `VecDeque::push_back` under a
257/// short-lived lock; new observers should aim for similar.
258pub(crate) type DeliverObserver = Arc<dyn Fn(&CausalEvent) + Send + Sync>;
259
260/// Handle returned by
261/// [`DaemonRuntime::register_deliver_observer`]. Dropping the
262/// handle removes the observer from the runtime's per-origin
263/// observer list. Safe to drop after the runtime itself has been
264/// dropped: the `Weak` upgrade in `Drop` silently no-ops.
265#[cfg_attr(not(feature = "groups"), allow(dead_code))]
266pub(crate) struct ObserverHandle {
267 runtime: std::sync::Weak<Inner>,
268 origin: u64,
269 id: u64,
270}
271
272impl Drop for ObserverHandle {
273 fn drop(&mut self) {
274 if let Some(inner) = self.runtime.upgrade() {
275 if let Ok(mut map) = inner.observers.lock() {
276 if let Some(v) = map.get_mut(&self.origin) {
277 v.retain(|(id, _)| *id != self.id);
278 if v.is_empty() {
279 map.remove(&self.origin);
280 }
281 }
282 }
283 }
284 }
285}
286
287impl DaemonRuntime {
288 /// Attach a runtime to an existing [`Mesh`]. Stage 1 does not
289 /// consume the `Mesh` — users keep their `Arc<Mesh>` for channel
290 /// registration, subscription, and the rest of the non-compute
291 /// surface. Stage 2 will install the migration subprotocol
292 /// handler when [`Self::start`] runs; until then inbound
293 /// migration messages (if any) are silently dropped by the core,
294 /// same as today.
295 pub fn new(mesh: Arc<Mesh>) -> Self {
296 let local_node_id = mesh.inner().node_id();
297 let registry = Arc::new(DaemonRegistry::new());
298 let factory_registry = Arc::new(DaemonFactoryRegistry::new());
299 let source_handler = Arc::new(MigrationSourceHandler::new(registry.clone()));
300 // Wire `source_handler` into the orchestrator so
301 // local-source migrations register the migration in the
302 // source-side handler — without this, post-snapshot events
303 // are silently mutated into the source daemon's state and
304 // lost at cutover. See
305 // `MigrationOrchestrator::with_source_handler`.
306 let orchestrator = Arc::new(
307 MigrationOrchestrator::new(registry.clone(), local_node_id)
308 .with_source_handler(source_handler.clone()),
309 );
310 let target_handler = Arc::new(MigrationTargetHandler::new_with_factories(
311 registry.clone(),
312 factory_registry.clone(),
313 ));
314 // Scheduler shares the mesh's `CapabilityIndex`, so
315 // announcements the mesh publishes become visible to
316 // placement queries immediately. `CapabilitySet::default()`
317 // for `local_caps` is a known gap — see the `scheduler`
318 // field's docstring on [`Inner`].
319 let scheduler = Arc::new(Scheduler::new(
320 mesh.inner().capability_index().clone(),
321 local_node_id,
322 CapabilitySet::default(),
323 ));
324 Self {
325 inner: Arc::new(Inner {
326 mesh,
327 state: AtomicU8::new(State::Registering as u8),
328 factories: RwLock::new(HashMap::new()),
329 registry,
330 scheduler,
331 factory_registry,
332 orchestrator,
333 source_handler,
334 target_handler,
335 recent_failures: Mutex::new(HashMap::new()),
336 simulate_not_ready: AtomicBool::new(false),
337 spawn_stall_ms: std::sync::atomic::AtomicU32::new(0),
338 start_stall_ms: std::sync::atomic::AtomicU32::new(0),
339 observers: Mutex::new(HashMap::new()),
340 observer_id_counter: std::sync::atomic::AtomicU64::new(1),
341 }),
342 }
343 }
344
345 /// Register a post-delivery observer for `origin_hash`. Every
346 /// successful `deliver(origin_hash, event)` fires the closure
347 /// after the daemon's `process` returns Ok. Returns an
348 /// [`ObserverHandle`] whose `Drop` unregisters the observer.
349 ///
350 /// `pub(crate)` — only the SDK's own group wrappers use this.
351 /// See the `observers` field on [`Inner`] for the design rationale.
352 #[cfg_attr(not(feature = "groups"), allow(dead_code))]
353 pub(crate) fn register_deliver_observer(
354 &self,
355 origin: u64,
356 cb: DeliverObserver,
357 ) -> ObserverHandle {
358 let id = self
359 .inner
360 .observer_id_counter
361 .fetch_add(1, Ordering::Relaxed);
362 {
363 let mut map = self
364 .inner
365 .observers
366 .lock()
367 .expect("DaemonRuntime observers mutex poisoned");
368 map.entry(origin).or_default().push((id, cb));
369 }
370 ObserverHandle {
371 runtime: Arc::downgrade(&self.inner),
372 origin,
373 id,
374 }
375 }
376
377 /// Register a factory for a daemon type. `kind` is a user-chosen
378 /// string shared across every node that may host this daemon.
379 /// Second registrations of the same `kind` return
380 /// [`DaemonError::FactoryAlreadyRegistered`].
381 ///
382 /// Valid in both `Registering` and `Ready` states; the runtime
383 /// permits new kinds to appear at runtime. Only `ShuttingDown`
384 /// rejects.
385 ///
386 /// # Migration targeting
387 ///
388 /// `register_factory` alone is **not sufficient** to accept
389 /// inbound migrations — it registers the kind-to-closure mapping
390 /// only on the SDK side. The core migration dispatcher looks up
391 /// factories by `origin_hash` (the daemon's identity), not by
392 /// `kind`, because the migration wire protocol doesn't carry a
393 /// kind string; the target couldn't pick the right factory from
394 /// an inbound snapshot without an explicit binding.
395 ///
396 /// To accept migrations for a specific daemon, the target must
397 /// ALSO call one of:
398 ///
399 /// - [`Self::expect_migration`] `(kind, origin_hash, config)` —
400 /// placeholder factory keyed by `origin_hash`; the envelope
401 /// on the snapshot supplies the keypair at restore time.
402 /// This is the common case.
403 /// - [`Self::register_migration_target_identity`] `(kind,
404 /// identity, config)` — pre-provisions the keypair as a
405 /// fallback when the source migrates with
406 /// `transport_identity: false`.
407 ///
408 /// Or spawn the daemon locally first (via [`Self::spawn`]);
409 /// spawn seeds both the SDK map and the core registry, so a
410 /// daemon that migrated out and migrates back in on the same
411 /// node is covered without extra calls.
412 pub fn register_factory<F>(&self, kind: &str, factory: F) -> Result<(), DaemonError>
413 where
414 F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
415 {
416 if self.state() == State::ShuttingDown {
417 return Err(DaemonError::ShuttingDown);
418 }
419 // `contains_key` + `insert` would be atomic under the
420 // single `write` guard, but the `entry` API makes
421 // atomicity self-evident in one expression — no opening
422 // for a future reviewer to wonder whether the check and
423 // the insert can drift across two separately-acquired
424 // guards.
425 let mut map = self.inner.factories.write().expect("factory map poisoned");
426 match map.entry(kind.to_string()) {
427 std::collections::hash_map::Entry::Occupied(_) => {
428 Err(DaemonError::FactoryAlreadyRegistered(kind.to_string()))
429 }
430 std::collections::hash_map::Entry::Vacant(slot) => {
431 slot.insert(Arc::new(factory));
432 Ok(())
433 }
434 }
435 }
436
437 /// Promote to `Ready`. Idempotent — a second call on an already-
438 /// `Ready` runtime is a no-op; a call on a `ShuttingDown` runtime
439 /// returns [`DaemonError::ShuttingDown`].
440 ///
441 /// Wires the migration subprotocol (`0x0500`) handler into the
442 /// mesh so inbound `TakeSnapshot` / `SnapshotReady` / etc.
443 /// messages reach the orchestrator / source / target handlers
444 /// owned by this runtime. Installing is idempotent w.r.t.
445 /// multiple `start` calls — the `ArcSwapOption` on the mesh
446 /// swaps the same handler in on each call.
447 pub async fn start(&self) -> Result<(), DaemonError> {
448 loop {
449 match self.state() {
450 State::Registering => {
451 // Install the migration subprotocol handler
452 // **before** publishing `Ready`. Other threads
453 // that observe `Ready` must be able to rely on
454 // the handler being live: the previous ordering
455 // (CAS → install) left a window where a
456 // concurrent caller read `Ready`, began a
457 // migration, and sent `SnapshotReady` onto a
458 // mesh whose handler slot was still empty —
459 // the dispatcher's no-handler fallback would
460 // synthesise `ComputeNotSupported`, aborting
461 // the migration nondeterministically during
462 // startup.
463 //
464 // Double-install is safe: `set_migration_handler`
465 // is an `ArcSwap` store, so if two concurrent
466 // `start()`s both reach this point the later
467 // store just wins and the CAS picks one caller
468 // to return first. Both built handlers are
469 // functionally equivalent (same registry,
470 // orchestrator, hooks).
471 let handler = Arc::new(self.build_migration_handler());
472 self.inner.mesh.inner().set_migration_handler(handler);
473
474 // Test-only stall between install and CAS so
475 // integration tests can race `shutdown` in. In
476 // production the atomic load is 0 and this is
477 // a no-op.
478 let stall_ms = self.inner.start_stall_ms.load(Ordering::Acquire);
479 if stall_ms > 0 {
480 tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
481 }
482
483 let swap = self.inner.state.compare_exchange(
484 State::Registering as u8,
485 State::Ready as u8,
486 Ordering::AcqRel,
487 Ordering::Acquire,
488 );
489 if swap.is_ok() {
490 return Ok(());
491 }
492 // Lost the CAS. State is now either `Ready`
493 // (another `start` caller beat us) or
494 // `ShuttingDown` (a concurrent `shutdown`
495 // raced past our install). Handle the second
496 // case explicitly — otherwise a torn-down
497 // runtime would leave a live handler on the
498 // mesh, accepting migration traffic and firing
499 // callbacks against stale registry state.
500 //
501 // Under the `Ready` case we leave the handler
502 // installed: the winning `start` caller also
503 // installed an equivalent handler (same
504 // registry, orchestrator, hooks), so whichever
505 // `ArcSwap` store lands last is indistinguishable
506 // from the winner's.
507 if self.state() == State::ShuttingDown {
508 self.inner.mesh.inner().clear_migration_handler();
509 return Err(DaemonError::ShuttingDown);
510 }
511 // CAS lost to another `start` that won the
512 // race — loop to re-classify and return Ok on
513 // the `Ready` arm.
514 }
515 State::Ready => return Ok(()),
516 State::ShuttingDown => return Err(DaemonError::ShuttingDown),
517 }
518 }
519 }
520
521 /// Construct the migration subprotocol handler with every
522 /// hook wired — identity context, channel-rebind replay,
523 /// unsubscribe teardown, readiness predicate, failure
524 /// observer. Extracted so [`Self::start`] can install it
525 /// before the atomic state flip (race fix) without burying
526 /// the construction inline.
527 fn build_migration_handler(&self) -> MigrationSubprotocolHandler {
528 let local_node_id = self.inner.mesh.inner().node_id();
529 // Ask the core crate to build the context. The Noise static
530 // private key is captured inside closures the core owns and
531 // never crosses this boundary as raw bytes — see
532 // `MeshNode::migration_identity_context`.
533 let ctx = self.inner.mesh.inner().migration_identity_context();
534 let inner_for_rebind = self.inner.clone();
535 let post_restore: PostRestoreCallback = Arc::new(move |origin_hash: u64| {
536 let inner = inner_for_rebind.clone();
537 tokio::spawn(async move {
538 replay_subscriptions(inner, origin_hash).await;
539 });
540 });
541 let inner_for_teardown = self.inner.clone();
542 let pre_cleanup: PreCleanupCallback = Arc::new(move |origin_hash: u64| {
543 // Snapshot the ledger BEFORE cleanup drops the host —
544 // after that, the ledger is gone. Spawn async
545 // unsubscribes so the dispatcher thread returns
546 // immediately.
547 let bindings = inner_for_teardown
548 .registry
549 .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
550 .unwrap_or_default();
551 if bindings.is_empty() {
552 return;
553 }
554 let inner = inner_for_teardown.clone();
555 tokio::spawn(async move {
556 teardown_subscriptions(inner, bindings).await;
557 });
558 });
559 let inner_for_readiness = self.inner.clone();
560 let readiness: ReadinessCallback = Arc::new(move || {
561 // Test-only: `simulate_not_ready` flips the predicate
562 // to false regardless of the underlying lifecycle
563 // state. Honour it first so integration tests can
564 // drive the NotReady retry path.
565 if inner_for_readiness
566 .simulate_not_ready
567 .load(Ordering::Acquire)
568 {
569 return false;
570 }
571 inner_for_readiness.state.load(Ordering::Acquire) == State::Ready as u8
572 });
573 let inner_for_failure = self.inner.clone();
574 let failure: FailureCallback =
575 Arc::new(move |origin_hash: u64, reason: MigrationFailureReason| {
576 if let Ok(mut map) = inner_for_failure.recent_failures.lock() {
577 map.insert(origin_hash, reason);
578 }
579 });
580 MigrationSubprotocolHandler::with_hooks(
581 self.inner.orchestrator.clone(),
582 self.inner.source_handler.clone(),
583 self.inner.target_handler.clone(),
584 local_node_id,
585 MigrationHandlerHooks {
586 identity: Some(ctx),
587 post_restore: Some(post_restore),
588 pre_cleanup: Some(pre_cleanup),
589 readiness: Some(readiness),
590 failure: Some(failure),
591 },
592 )
593 }
594
595 /// Tear down the runtime. Unregisters every local daemon host,
596 /// clears the factory registry, and transitions state to
597 /// `ShuttingDown`. Subsequent calls on this runtime fail with
598 /// [`DaemonError::ShuttingDown`]. A second `shutdown` is a no-op.
599 pub async fn shutdown(&self) -> Result<(), DaemonError> {
600 // Mark ShuttingDown first so new spawns / registrations
601 // immediately short-circuit. The store isn't a CAS — a
602 // `ShuttingDown → ShuttingDown` re-store is cheap and
603 // benign.
604 self.inner
605 .state
606 .store(State::ShuttingDown as u8, Ordering::Release);
607
608 // Drain the registry. `list()` snapshots origin_hashes under
609 // its internal read guard; iterating is safe because we own
610 // the unregister path.
611 let origins: Vec<u64> = self
612 .inner
613 .registry
614 .list()
615 .into_iter()
616 .map(|(origin, _)| origin)
617 .collect();
618 for origin in origins {
619 let _ = self.inner.registry.unregister(origin);
620 self.inner.factory_registry.remove(origin);
621 }
622 // Drop any leftover migration-failure entries so they
623 // don't count against the process's memory footprint after
624 // shutdown. The runtime is permanently non-functional at
625 // this point, so no one will consume them.
626 if let Ok(mut map) = self.inner.recent_failures.lock() {
627 map.clear();
628 }
629 // Uninstall the migration subprotocol handler. The
630 // handler carries `Arc` clones into our `Inner` — leaving
631 // it installed keeps the runtime's internals alive via
632 // the mesh even after we've drained every registry, and
633 // would accept inbound migration traffic that now
634 // unconditionally fails (empty registry). The happy-path
635 // teardown should leave the mesh in the same shape it
636 // had before `start()`.
637 self.inner.mesh.inner().clear_migration_handler();
638 Ok(())
639 }
640
641 /// **Test-only.** Force the readiness predicate seen by the
642 /// migration dispatcher to return `false` regardless of
643 /// lifecycle state — simulates a target that's still in
644 /// `Registering` even after `start()` has run. Lets
645 /// integration tests exercise the `NotReady` retry path
646 /// without racing against runtime startup.
647 ///
648 /// No effect on `is_ready()` or `spawn` / `stop` — those use
649 /// the underlying `state` directly. Only the dispatcher's
650 /// readiness predicate is affected.
651 pub fn simulate_not_ready(&self, flag: bool) {
652 self.inner.simulate_not_ready.store(flag, Ordering::Release);
653 }
654
655 /// **Test-only.** Inject a sleep inside `spawn` between the
656 /// initial `require_ready` check and the registry inserts,
657 /// giving integration tests a deterministic window to race
658 /// `shutdown` against. Duration is stored as millis; `0`
659 /// disables.
660 #[doc(hidden)]
661 pub fn set_spawn_stall_ms(&self, millis: u32) {
662 self.inner.spawn_stall_ms.store(millis, Ordering::Release);
663 }
664
665 /// **Test-only.** Inject a sleep inside `start` between the
666 /// handler install and the Registering→Ready CAS. Used to
667 /// deterministically race `shutdown` against a mid-flight
668 /// `start` so tests can verify the handler-cleanup path.
669 #[doc(hidden)]
670 pub fn set_start_stall_ms(&self, millis: u32) {
671 self.inner.start_stall_ms.store(millis, Ordering::Release);
672 }
673
674 /// Readiness accessor for tests + operators. `true` iff the
675 /// runtime has transitioned to `Ready` and has not yet begun
676 /// shutting down.
677 pub fn is_ready(&self) -> bool {
678 self.state() == State::Ready
679 }
680
681 /// Spawn a daemon of `kind` under the caller-provided
682 /// [`Identity`]. The identity's keypair seeds the daemon's
683 /// `origin_hash` + `entity_id`; the runtime registers both the
684 /// live host and a kind-keyed factory in the core registry so a
685 /// future migration target can reconstruct the daemon through
686 /// the existing `DaemonFactoryRegistry::construct` path.
687 ///
688 /// The returned [`DaemonHandle`] is clone-safe; dropping it does
689 /// not stop the daemon. Call [`Self::stop`] explicitly.
690 pub async fn spawn(
691 &self,
692 kind: &str,
693 identity: Identity,
694 config: DaemonHostConfig,
695 ) -> Result<DaemonHandle, DaemonError> {
696 self.require_ready()?;
697 // Test-only stall between the readiness check and the
698 // registry inserts. In production `spawn_stall_ms` is
699 // always 0 and this branch is a no-op.
700 let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
701 if stall_ms > 0 {
702 tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
703 }
704 let factory = self.factory_for_kind(kind)?;
705 let daemon = (factory)();
706 let keypair = identity.keypair().as_ref().clone();
707 let origin_hash = keypair.origin_hash();
708 let entity_id = keypair.entity_id().clone();
709
710 // Mirror the factory into the core registry BEFORE registering
711 // the host, so a migration-target handler that catches up on
712 // this origin mid-spawn always sees a consistent view. Atomic
713 // on collision: if another daemon already claims this
714 // `origin_hash`, bail without mutating either registry —
715 // otherwise the rollback below would strip the factory entry
716 // for the *existing* daemon and silently break its future
717 // migratability.
718 let factory_for_core = factory.clone();
719 self.inner
720 .factory_registry
721 .register(keypair.clone(), config.clone(), move || {
722 (factory_for_core)()
723 })
724 .map_err(DaemonError::Core)?;
725
726 let host = DaemonHost::new(daemon, keypair, config);
727 // `DaemonRegistry::register` errors on origin_hash collisions
728 // — two daemons can't share the same identity.
729 //
730 // Rolling back our `factory_registry` insert is safe
731 // because `factory_registry::register` is atomic on
732 // collision: since our call above *succeeded*, the slot
733 // was empty and we exclusively own it. No other code path
734 // replaces an occupied slot (`register` /
735 // `register_placeholder` both error on collision;
736 // `remove` / `take` only remove their caller's own
737 // entries), so the entry we're about to remove is still
738 // ours. The rollback cannot affect a pre-existing
739 // placeholder or another daemon's factory entry.
740 if let Err(e) = self.inner.registry.register(host) {
741 self.inner.factory_registry.remove(origin_hash);
742 return Err(DaemonError::Core(e));
743 }
744
745 // Post-insert fence: `shutdown` may have raced past our
746 // initial `require_ready()` and already swept the
747 // registries. In that case our entries are either already
748 // gone (shutdown saw them in its sweep) or will outlive
749 // shutdown's sweep (we inserted after `list()`). Roll back
750 // unconditionally under the ShuttingDown branch so no
751 // zombie daemon survives the torn-down runtime; the
752 // rollback's `unregister` / `remove` calls are idempotent
753 // no-ops if shutdown already drained our slot.
754 if self.state() == State::ShuttingDown {
755 let _ = self.inner.registry.unregister(origin_hash);
756 self.inner.factory_registry.remove(origin_hash);
757 return Err(DaemonError::ShuttingDown);
758 }
759
760 Ok(DaemonHandle {
761 origin_hash,
762 entity_id,
763 inner: self.inner.clone(),
764 })
765 }
766
767 /// Spawn a daemon with a caller-supplied `MeshDaemon` instance,
768 /// bypassing the SDK-side kind-factory lookup.
769 ///
770 /// Used by language-binding layers (currently: the NAPI `compute`
771 /// module) that build daemon instances via cross-FFI dispatch —
772 /// the factory closure for such a daemon can't be a plain
773 /// `Fn() -> Box<dyn MeshDaemon>` because constructing the
774 /// daemon requires an awaitable call into the host language.
775 /// The binding does the await itself, hands in the resulting
776 /// `Box<dyn MeshDaemon>`, and this method does the rest of
777 /// what [`Self::spawn`] does: register the `(origin_hash →
778 /// kind-factory)` mirror in the core registry so future
779 /// migrations can reconstruct the daemon, insert the host,
780 /// and run the same shutdown-race fence.
781 ///
782 /// `kind_factory` is the closure the core registry stores for
783 /// migration-target reconstruction; it must be re-callable
784 /// (migration targets call it when they restore the daemon
785 /// on another node). Bindings typically build this by cloning
786 /// the same TSFN used for the initial spawn.
787 pub async fn spawn_with_daemon<F>(
788 &self,
789 identity: Identity,
790 config: DaemonHostConfig,
791 daemon: Box<dyn MeshDaemon>,
792 kind_factory: F,
793 ) -> Result<DaemonHandle, DaemonError>
794 where
795 F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
796 {
797 self.require_ready()?;
798 // Same test-only stall as `spawn` — tests race shutdown
799 // against this path too.
800 let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
801 if stall_ms > 0 {
802 tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
803 }
804 let keypair = identity.keypair().as_ref().clone();
805 let origin_hash = keypair.origin_hash();
806 let entity_id = keypair.entity_id().clone();
807
808 // Mirror the caller-supplied kind_factory into the core
809 // registry. Same atomic-register semantics as `spawn` —
810 // if another daemon already claims this origin_hash, bail
811 // without touching state.
812 self.inner
813 .factory_registry
814 .register(keypair.clone(), config.clone(), kind_factory)
815 .map_err(DaemonError::Core)?;
816
817 let host = DaemonHost::new(daemon, keypair, config);
818 if let Err(e) = self.inner.registry.register(host) {
819 self.inner.factory_registry.remove(origin_hash);
820 return Err(DaemonError::Core(e));
821 }
822
823 // Post-insert shutdown fence, matching `spawn`. Without
824 // this, a concurrent `shutdown()` that raced past our
825 // `require_ready` check and already swept the registries
826 // would leave a zombie daemon live in the torn-down
827 // runtime.
828 if self.state() == State::ShuttingDown {
829 let _ = self.inner.registry.unregister(origin_hash);
830 self.inner.factory_registry.remove(origin_hash);
831 return Err(DaemonError::ShuttingDown);
832 }
833
834 Ok(DaemonHandle {
835 origin_hash,
836 entity_id,
837 inner: self.inner.clone(),
838 })
839 }
840
841 /// Spawn a daemon from a caller-supplied instance and restore its
842 /// state from `snapshot`, bypassing the SDK-side kind-factory
843 /// lookup. Parallels [`Self::spawn_with_daemon`] for restore.
844 ///
845 /// Used by language-binding layers whose daemons are built via
846 /// cross-FFI dispatch — construction goes through the host
847 /// language, then the binding hands the built `Box<dyn MeshDaemon>`
848 /// (already wired to its TSFN bridge) plus the `kind_factory`
849 /// closure used by the core registry for migration-target
850 /// reconstruction.
851 pub async fn spawn_from_snapshot_with_daemon<F>(
852 &self,
853 identity: Identity,
854 snapshot: StateSnapshot,
855 config: DaemonHostConfig,
856 daemon: Box<dyn MeshDaemon>,
857 kind_factory: F,
858 ) -> Result<DaemonHandle, DaemonError>
859 where
860 F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
861 {
862 self.require_ready()?;
863 let keypair = identity.keypair().as_ref().clone();
864 let origin_hash = keypair.origin_hash();
865 let entity_id = keypair.entity_id().clone();
866
867 // Full `entity_id` comparison — see `spawn_from_snapshot` for
868 // the birthday-collision rationale.
869 if snapshot.entity_id != entity_id {
870 return Err(DaemonError::SnapshotIdentityMismatch {
871 snapshot: snapshot.entity_id.origin_hash(),
872 identity: origin_hash,
873 });
874 }
875
876 self.inner
877 .factory_registry
878 .register(keypair.clone(), config.clone(), kind_factory)
879 .map_err(DaemonError::Core)?;
880
881 let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
882 Ok(h) => h,
883 Err(e) => {
884 self.inner.factory_registry.remove(origin_hash);
885 return Err(DaemonError::Core(e));
886 }
887 };
888
889 if let Err(e) = self.inner.registry.register(host) {
890 self.inner.factory_registry.remove(origin_hash);
891 return Err(DaemonError::Core(e));
892 }
893
894 if self.state() == State::ShuttingDown {
895 let _ = self.inner.registry.unregister(origin_hash);
896 self.inner.factory_registry.remove(origin_hash);
897 return Err(DaemonError::ShuttingDown);
898 }
899
900 Ok(DaemonHandle {
901 origin_hash,
902 entity_id,
903 inner: self.inner.clone(),
904 })
905 }
906
907 /// Spawn a daemon of `kind` and restore its state from `snapshot`.
908 /// The snapshot's `entity_id` must match the caller's
909 /// [`Identity`]; mismatch returns
910 /// [`DaemonError::SnapshotIdentityMismatch`] before any side
911 /// effects land.
912 pub async fn spawn_from_snapshot(
913 &self,
914 kind: &str,
915 identity: Identity,
916 snapshot: StateSnapshot,
917 config: DaemonHostConfig,
918 ) -> Result<DaemonHandle, DaemonError> {
919 self.require_ready()?;
920 let factory = self.factory_for_kind(kind)?;
921 let keypair = identity.keypair().as_ref().clone();
922 let origin_hash = keypair.origin_hash();
923 let entity_id = keypair.entity_id().clone();
924
925 // Compare the **full** 32-byte `entity_id`, not just the
926 // 64-bit `origin_hash` projection. `origin_hash` is a
927 // birthday-bounded 64-bit hash of the ed25519 public key;
928 // two legitimately-different identities can collide on
929 // `origin_hash` with probability ~2^-32 after ~4B daemons.
930 // A collision would let the *wrong* identity restore the
931 // snapshot, producing a daemon signed under one pubkey
932 // but claiming to be another — downstream signatures then
933 // verify against the wrong key and the daemon silently
934 // produces outputs no peer accepts.
935 if snapshot.entity_id != entity_id {
936 return Err(DaemonError::SnapshotIdentityMismatch {
937 snapshot: snapshot.entity_id.origin_hash(),
938 identity: origin_hash,
939 });
940 }
941
942 let daemon = (factory)();
943 // Atomic register: collision here means some other daemon
944 // (live or a previous spawn_from_snapshot in-flight) already
945 // owns this `origin_hash`. Bail without touching the other
946 // daemon's state — the later rollback would otherwise remove
947 // the victim's factory entry and silently break its future
948 // migratability.
949 let factory_for_core = factory.clone();
950 self.inner
951 .factory_registry
952 .register(keypair.clone(), config.clone(), move || {
953 (factory_for_core)()
954 })
955 .map_err(DaemonError::Core)?;
956
957 let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
958 Ok(h) => h,
959 Err(e) => {
960 // Same rollback-safety argument as in `spawn`:
961 // atomic `factory_registry::register` above means
962 // we exclusively own the slot, and nothing else
963 // can replace an occupied slot. Removing here
964 // cleans up strictly our insert — an adjacent
965 // pre-existing placeholder (e.g. from
966 // `expect_migration`) would have made our
967 // register fail atomically upstream, and we'd
968 // never have reached this branch.
969 self.inner.factory_registry.remove(origin_hash);
970 return Err(DaemonError::Core(e));
971 }
972 };
973
974 if let Err(e) = self.inner.registry.register(host) {
975 // Same ownership argument — the atomic register
976 // above means we own this slot exclusively.
977 self.inner.factory_registry.remove(origin_hash);
978 return Err(DaemonError::Core(e));
979 }
980
981 // Post-insert fence against a concurrent `shutdown` — see
982 // the matching comment in `spawn`.
983 if self.state() == State::ShuttingDown {
984 let _ = self.inner.registry.unregister(origin_hash);
985 self.inner.factory_registry.remove(origin_hash);
986 return Err(DaemonError::ShuttingDown);
987 }
988
989 Ok(DaemonHandle {
990 origin_hash,
991 entity_id,
992 inner: self.inner.clone(),
993 })
994 }
995
996 /// Stop a daemon, removing it from the runtime's registry. Valid
997 /// while `Ready` and (idempotently) during `ShuttingDown` —
998 /// `ShuttingDown` paths through here are a no-op because the
999 /// shutdown sweep has already drained the registry.
1000 pub async fn stop(&self, origin_hash: u64) -> Result<(), DaemonError> {
1001 if self.state() == State::Registering {
1002 return Err(DaemonError::NotReady);
1003 }
1004 // Treat a missing daemon as success during ShuttingDown —
1005 // the shutdown sweep drained it.
1006 match self.inner.registry.unregister(origin_hash) {
1007 Ok(_) => {
1008 self.inner.factory_registry.remove(origin_hash);
1009 Ok(())
1010 }
1011 Err(CoreDaemonError::NotFound(_)) if self.state() == State::ShuttingDown => Ok(()),
1012 Err(e) => Err(DaemonError::Core(e)),
1013 }
1014 }
1015
1016 /// Take a snapshot of a running daemon by `origin_hash`. Returns
1017 /// `Ok(None)` when the daemon is stateless.
1018 pub async fn snapshot(&self, origin_hash: u64) -> Result<Option<StateSnapshot>, DaemonError> {
1019 self.require_ready()?;
1020 self.inner
1021 .registry
1022 .snapshot(origin_hash)
1023 .map_err(DaemonError::Core)
1024 }
1025
1026 /// Deliver one causal event to the daemon identified by
1027 /// `origin_hash`, returning the daemon's outputs wrapped in the
1028 /// host's causal chain.
1029 ///
1030 /// Stage 1 convenience — Stage 2 adds mesh-dispatched delivery
1031 /// via the causal subprotocol, and this direct path becomes
1032 /// testing sugar rather than the primary ingress.
1033 pub fn deliver(
1034 &self,
1035 origin_hash: u64,
1036 event: &CausalEvent,
1037 ) -> Result<Vec<CausalEvent>, DaemonError> {
1038 self.require_ready()?;
1039 let outputs = self
1040 .inner
1041 .registry
1042 .deliver(origin_hash, event)
1043 .map_err(DaemonError::Core)?;
1044
1045 // Fire post-delivery observers (e.g. `StandbyGroup`'s replay
1046 // buffer). Observers run AFTER a successful `process` so
1047 // replay on promotion doesn't re-apply events the active
1048 // rejected. Snapshot the observer list into a local `Vec`
1049 // so each callback runs without the map's mutex held —
1050 // prevents a misbehaving observer from blocking unrelated
1051 // deliveries and rules out re-entrant-lock deadlocks if an
1052 // observer ever calls back into `deliver`.
1053 let to_fire: Vec<DeliverObserver> = {
1054 let map = self
1055 .inner
1056 .observers
1057 .lock()
1058 .expect("DaemonRuntime observers mutex poisoned");
1059 map.get(&origin_hash)
1060 .map(|v| v.iter().map(|(_, cb)| cb.clone()).collect())
1061 .unwrap_or_default()
1062 };
1063 for cb in to_fire {
1064 cb(event);
1065 }
1066 Ok(outputs)
1067 }
1068
1069 /// Number of daemons currently registered.
1070 pub fn daemon_count(&self) -> usize {
1071 self.inner.registry.count()
1072 }
1073
1074 /// Current orchestrator-side migration phase for `origin_hash`,
1075 /// or `None` when no migration record exists (either never
1076 /// started here or already reached its terminal state and was
1077 /// removed). Useful for tests that assert the migration reached
1078 /// true completion (record gone via `ActivateAck`) rather than
1079 /// simply advancing to the `Complete` phase.
1080 pub fn migration_phase(&self, origin_hash: u64) -> Option<MigrationPhase> {
1081 self.inner.orchestrator.status(origin_hash)
1082 }
1083
1084 /// **Test-only.** Peek at the cached failure reason for
1085 /// `origin_hash` without consuming it.
1086 ///
1087 /// Normal `MigrationHandle::wait` code path pops the entry when
1088 /// it hits status=None; this accessor exists so regression tests
1089 /// can observe the cache's lifecycle directly (e.g. assert the
1090 /// cache is cleared by `start_migration_with`).
1091 ///
1092 /// Exposed publicly because SDK integration tests live in a
1093 /// separate crate; not part of the stable surface.
1094 #[doc(hidden)]
1095 pub fn peek_migration_failure(&self, origin_hash: u64) -> Option<MigrationFailureReason> {
1096 self.inner
1097 .recent_failures
1098 .lock()
1099 .ok()
1100 .and_then(|m| m.get(&origin_hash).cloned())
1101 }
1102
1103 /// **Test-only.** Inject a failure reason into the cache for
1104 /// `origin_hash`. Lets tests stage a "stale entry from a prior
1105 /// attempt" scenario deterministically, without having to run
1106 /// a whole losing migration to populate it.
1107 ///
1108 /// Same caveat as [`Self::peek_migration_failure`] — visible
1109 /// because SDK integration tests live out-of-crate; not part of
1110 /// the stable surface.
1111 #[doc(hidden)]
1112 pub fn inject_migration_failure(&self, origin_hash: u64, reason: MigrationFailureReason) {
1113 if let Ok(mut m) = self.inner.recent_failures.lock() {
1114 m.insert(origin_hash, reason);
1115 }
1116 }
1117
1118 /// Snapshot the daemon's subscription ledger — a cloned view of
1119 /// every `(publisher, channel)` pair the daemon has subscribed
1120 /// to via [`Self::subscribe_channel`]. Used by the migration
1121 /// target path to drive replay and by tests / operators to
1122 /// observe what a daemon is subscribed to.
1123 pub fn subscriptions(&self, origin_hash: u64) -> Result<Vec<SubscriptionBinding>, DaemonError> {
1124 self.inner
1125 .registry
1126 .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1127 .map_err(DaemonError::Core)
1128 }
1129
1130 /// Subscribe a specific daemon to a channel on a remote
1131 /// publisher. Routes through the mesh's membership subprotocol
1132 /// and **records the subscription in the daemon's ledger** so
1133 /// a migration target can replay it after cutover. Users should
1134 /// use this method (rather than reaching through
1135 /// `rt.mesh().inner().subscribe_channel_*`) for daemon-owned
1136 /// subscriptions; otherwise the subscription travels with the
1137 /// node, not the daemon, and silently drops on migration.
1138 ///
1139 /// Flow:
1140 /// 1. Hit the publisher's membership endpoint via
1141 /// `Mesh::subscribe_channel_with_token` (or the
1142 /// no-token variant).
1143 /// 2. On success, record
1144 /// `(publisher, channel) → SubscriptionBinding` in the
1145 /// host's ledger.
1146 /// 3. On wire failure, no ledger mutation.
1147 ///
1148 /// `token` is the caller-owned [`PermissionToken`] for
1149 /// token-gated channels; `None` for open channels.
1150 pub async fn subscribe_channel(
1151 &self,
1152 origin_hash: u64,
1153 publisher: u64,
1154 channel: ChannelName,
1155 token: Option<PermissionToken>,
1156 ) -> Result<(), DaemonError> {
1157 self.require_ready()?;
1158 if !self.inner.registry.contains(origin_hash) {
1159 return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1160 }
1161 // Capture serialized token bytes for the ledger BEFORE
1162 // handing ownership to the mesh. The mesh call consumes the
1163 // token by value on the token-path.
1164 let token_bytes = token.as_ref().map(|t| t.to_bytes().to_vec());
1165 let result = match token {
1166 Some(tok) => {
1167 self.inner
1168 .mesh
1169 .inner()
1170 .subscribe_channel_with_token(publisher, channel.clone(), tok)
1171 .await
1172 }
1173 None => {
1174 self.inner
1175 .mesh
1176 .inner()
1177 .subscribe_channel(publisher, channel.clone())
1178 .await
1179 }
1180 };
1181 result.map_err(|e| {
1182 DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1183 "subscribe_channel failed: {e}"
1184 )))
1185 })?;
1186
1187 // Mesh accepted the subscribe — record in the ledger.
1188 // `with_host` reaches into the registry's per-daemon mutex;
1189 // since this is SDK-level code we do the minimum work
1190 // under the lock (a single DashMap insert).
1191 if let Err(e) = self.inner.registry.with_host(origin_hash, |host| {
1192 host.record_subscription(publisher, channel, token_bytes);
1193 }) {
1194 return Err(DaemonError::Core(e));
1195 }
1196 Ok(())
1197 }
1198
1199 /// Unsubscribe a specific daemon from a channel. Symmetric to
1200 /// [`Self::subscribe_channel`]: mesh wire call first, then
1201 /// ledger update.
1202 pub async fn unsubscribe_channel(
1203 &self,
1204 origin_hash: u64,
1205 publisher: u64,
1206 channel: ChannelName,
1207 ) -> Result<(), DaemonError> {
1208 self.require_ready()?;
1209 if !self.inner.registry.contains(origin_hash) {
1210 return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
1211 }
1212 self.inner
1213 .mesh
1214 .inner()
1215 .unsubscribe_channel(publisher, channel.clone())
1216 .await
1217 .map_err(|e| {
1218 DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
1219 "unsubscribe_channel failed: {e}"
1220 )))
1221 })?;
1222 let _ = self.inner.registry.with_host(origin_hash, |host| {
1223 host.forget_subscription(publisher, &channel);
1224 });
1225 Ok(())
1226 }
1227
1228 /// Pre-register a factory on the target node keyed by the
1229 /// daemon's `origin_hash`, using the caller-supplied `Identity`
1230 /// as the **fallback** keypair.
1231 ///
1232 /// Use this when:
1233 /// - The caller genuinely has the daemon's keypair on hand
1234 /// (typical: test harnesses that share the same
1235 /// `Identity` between source and target runtimes).
1236 /// - Migration runs with `transport_identity = false`, so the
1237 /// snapshot carries no envelope and the target needs a
1238 /// matching keypair pre-provisioned.
1239 ///
1240 /// For the common envelope-transport case where the target
1241 /// doesn't know the daemon's private key ahead of time,
1242 /// prefer [`Self::expect_migration`] — it registers a
1243 /// placeholder factory keyed only on `origin_hash`, and the
1244 /// envelope in the migration snapshot supplies the real
1245 /// keypair at restore time.
1246 pub fn register_migration_target_identity(
1247 &self,
1248 kind: &str,
1249 identity: Identity,
1250 config: DaemonHostConfig,
1251 ) -> Result<(), DaemonError> {
1252 if self.state() == State::ShuttingDown {
1253 return Err(DaemonError::ShuttingDown);
1254 }
1255 let factory = self.factory_for_kind(kind)?;
1256 let keypair = identity.keypair().as_ref().clone();
1257 let origin_hash = keypair.origin_hash();
1258 let factory_clone = factory.clone();
1259 self.inner
1260 .factory_registry
1261 .register(keypair, config, move || (factory_clone)())
1262 .map_err(DaemonError::Core)?;
1263 // Post-insert fence — a concurrent `shutdown` may have
1264 // raced past the initial state check. Roll back so no
1265 // factory entry outlives the torn-down runtime.
1266 if self.state() == State::ShuttingDown {
1267 self.inner.factory_registry.remove(origin_hash);
1268 return Err(DaemonError::ShuttingDown);
1269 }
1270 Ok(())
1271 }
1272
1273 /// Declare on the target that this node expects a migration
1274 /// for `origin_hash` of the given `kind`. Registers a
1275 /// **placeholder** factory in the core registry — no matching
1276 /// keypair required, because the migration snapshot's
1277 /// [`IdentityEnvelope`](::net::adapter::net::identity::IdentityEnvelope)
1278 /// carries the real keypair and the dispatcher overrides the
1279 /// placeholder at restore time.
1280 ///
1281 /// Fails cleanly if the source migrates without an envelope
1282 /// (e.g., `MigrationOpts { transport_identity: false }`) —
1283 /// the target's factory has no keypair and the dispatcher
1284 /// emits `IdentityTransportFailed`. Use
1285 /// [`Self::register_migration_target_identity`] with a shared
1286 /// identity for the explicit public-identity-migration case.
1287 ///
1288 /// Landing this method closes the seam documented in the
1289 /// `envelope_overrides_target_placeholder_keypair` test of
1290 /// Stage 5b of the identity-migration plan — targets can now
1291 /// pre-register for a migration by `origin_hash` alone.
1292 pub fn expect_migration(
1293 &self,
1294 kind: &str,
1295 origin_hash: u64,
1296 config: DaemonHostConfig,
1297 ) -> Result<(), DaemonError> {
1298 if self.state() == State::ShuttingDown {
1299 return Err(DaemonError::ShuttingDown);
1300 }
1301 let factory = self.factory_for_kind(kind)?;
1302 let factory_clone = factory.clone();
1303 self.inner
1304 .factory_registry
1305 .register_placeholder(origin_hash, config, move || (factory_clone)())
1306 .map_err(DaemonError::Core)?;
1307 // Post-insert fence against a concurrent `shutdown` — see
1308 // the matching comment in `register_migration_target_identity`.
1309 if self.state() == State::ShuttingDown {
1310 self.inner.factory_registry.remove(origin_hash);
1311 return Err(DaemonError::ShuttingDown);
1312 }
1313 Ok(())
1314 }
1315
1316 /// Start migrating a daemon from `source_node` to `target_node`.
1317 /// The orchestrator runs on this node regardless of who owns
1318 /// the daemon — call this on whichever node wants to drive the
1319 /// migration state machine.
1320 ///
1321 /// Returns a [`MigrationHandle`] whose [`MigrationHandle::wait`]
1322 /// resolves when the migration reaches a terminal state
1323 /// (`Complete` on success, `MigrationError` on abort / failure).
1324 ///
1325 /// For the common local-source case (`source_node ==
1326 /// mesh.node_id()`), the snapshot is taken synchronously inside
1327 /// this call and `SnapshotReady` is shipped to the target. For
1328 /// a remote source, the orchestrator sends `TakeSnapshot` to
1329 /// the source and drives the rest of the state machine from
1330 /// inbound wire messages.
1331 pub async fn start_migration(
1332 &self,
1333 origin_hash: u64,
1334 source_node: u64,
1335 target_node: u64,
1336 ) -> Result<MigrationHandle, DaemonError> {
1337 self.start_migration_with(
1338 origin_hash,
1339 source_node,
1340 target_node,
1341 MigrationOpts::default(),
1342 )
1343 .await
1344 }
1345
1346 /// `start_migration` with caller-supplied options. Stage 6 of
1347 /// [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md):
1348 /// lets the caller opt out of identity transport when the daemon
1349 /// doesn't need to sign anything on the target.
1350 pub async fn start_migration_with(
1351 &self,
1352 origin_hash: u64,
1353 source_node: u64,
1354 target_node: u64,
1355 opts: MigrationOpts,
1356 ) -> Result<MigrationHandle, DaemonError> {
1357 self.require_ready()?;
1358 // Clear any stale failure reason from a prior migration
1359 // attempt for this same `origin_hash`. Without this, if the
1360 // previous attempt's `MigrationHandle` was dropped before
1361 // `wait()` ran (or never `wait()`ed at all), the dispatcher's
1362 // failure callback left an entry in `recent_failures` that
1363 // would leak into THIS attempt's `wait()` — a successful
1364 // new migration would incorrectly surface the old reason
1365 // when `wait_one_attempt` hits its None-status branch and
1366 // pops `recent_failures`.
1367 if let Ok(mut map) = self.inner.recent_failures.lock() {
1368 map.remove(&origin_hash);
1369 }
1370 let msgs = self
1371 .inner
1372 .orchestrator
1373 .start_migration(origin_hash, source_node, target_node)
1374 .map_err(DaemonError::Migration)?;
1375
1376 // Local-source path: `start_migration` builds chunked
1377 // `SnapshotReady` messages synchronously from the local
1378 // registry. If the caller asked for identity transport, seal
1379 // the envelope HERE — the dispatcher's source-side seal only
1380 // fires on the TakeSnapshot path (remote source). Sealing
1381 // operates on the whole snapshot, so reassemble the chunks,
1382 // seal, and rechunk.
1383 let msgs = if opts.transport_identity {
1384 self.maybe_seal_chunked_snapshot(origin_hash, target_node, msgs)
1385 .await?
1386 } else {
1387 msgs
1388 };
1389
1390 // Determine dest_node from the first message variant and
1391 // send all messages in order. `start_migration` returns
1392 // `TakeSnapshot` (single message) when source is remote, or
1393 // a non-empty run of `SnapshotReady` chunks when source is
1394 // local.
1395 let dest_node = match msgs.first() {
1396 Some(MigrationMessage::TakeSnapshot { .. }) => source_node,
1397 Some(MigrationMessage::SnapshotReady { .. }) => target_node,
1398 Some(other) => {
1399 let _ = self
1400 .inner
1401 .orchestrator
1402 .abort_migration(origin_hash, "unexpected initial message".into());
1403 return Err(DaemonError::Migration(MigrationError::StateFailed(
1404 format!(
1405 "orchestrator returned unexpected initial migration message: {:?}",
1406 other
1407 ),
1408 )));
1409 }
1410 None => {
1411 let _ = self
1412 .inner
1413 .orchestrator
1414 .abort_migration(origin_hash, "orchestrator returned no messages".into());
1415 return Err(DaemonError::Migration(MigrationError::StateFailed(
1416 "orchestrator returned no migration messages".into(),
1417 )));
1418 }
1419 };
1420
1421 for msg in &msgs {
1422 if let Err(e) = self.send_migration_message(dest_node, msg).await {
1423 let _ = self
1424 .inner
1425 .orchestrator
1426 .abort_migration(origin_hash, format!("initial send failed: {e}"));
1427 return Err(e);
1428 }
1429 }
1430
1431 Ok(MigrationHandle {
1432 origin_hash,
1433 source_node,
1434 target_node,
1435 runtime: self.clone(),
1436 opts,
1437 })
1438 }
1439
1440 /// Decode `snapshot_bytes`, seal an identity envelope using
1441 /// the local daemon's keypair + target's X25519 static pubkey,
1442 /// and re-encode.
1443 ///
1444 /// Called only when the caller opted into envelope transport
1445 /// via `MigrationOpts { transport_identity: true }`. The
1446 /// caller committed to the stronger guarantee at that opt-in
1447 /// point; this helper must never silently downgrade.
1448 ///
1449 /// Resolution:
1450 /// - `Ok(None)`: the snapshot already carries an envelope
1451 /// (e.g. pre-sealed upstream). Caller proceeds with the
1452 /// existing bytes — this is not a downgrade, the envelope
1453 /// is already there.
1454 /// - `Ok(Some(new_bytes))`: sealed successfully; caller should
1455 /// replace the snapshot payload.
1456 /// - `Err(_)`: any missing prerequisite (peer X25519 static
1457 /// unknown, daemon keypair absent) or seal-crypto failure.
1458 /// The caller **must** abort — silently falling back to
1459 /// unsealed transport would break the caller's opt-in
1460 /// guarantee, and the target would restore under whatever
1461 /// pre-provisioned keypair the factory registry carries
1462 /// (possibly stale, possibly absent).
1463 ///
1464 /// The NKpsk0-responder case (target was the handshake
1465 /// responder, its peer static is not surfaced by `snow`) is a
1466 /// concrete prerequisite-missing scenario that now fails
1467 /// here. Callers in that topology should use `transport_identity:
1468 /// false` explicitly, which signals "I know identity transport
1469 /// isn't reachable; proceed unsealed."
1470 fn maybe_seal_local_snapshot(
1471 &self,
1472 daemon_origin: u64,
1473 target_node: u64,
1474 snapshot_bytes: &[u8],
1475 ) -> Result<Option<Vec<u8>>, DaemonError> {
1476 let snapshot = StateSnapshot::from_bytes(snapshot_bytes).ok_or_else(|| {
1477 DaemonError::Migration(MigrationError::StateFailed(
1478 "failed to decode local snapshot for envelope sealing".into(),
1479 ))
1480 })?;
1481 if snapshot.identity_envelope.is_some() {
1482 // Upstream already sealed — not a downgrade, the
1483 // envelope is present.
1484 return Ok(None);
1485 }
1486 let Some(target_pub) = self.inner.mesh.inner().peer_static_x25519(target_node) else {
1487 return Err(DaemonError::Migration(MigrationError::StateFailed(
1488 format!(
1489 "identity transport requested but peer X25519 static for \
1490 {target_node:#x} is unknown (e.g. NKpsk0-responder \
1491 side) — cannot seal envelope; use \
1492 `transport_identity: false` to proceed unsealed"
1493 ),
1494 )));
1495 };
1496 let Some(kp) = self.inner.registry.daemon_keypair(daemon_origin) else {
1497 return Err(DaemonError::Migration(MigrationError::StateFailed(
1498 format!(
1499 "identity transport requested but daemon {daemon_origin:#x} has \
1500 no local keypair to seal with"
1501 ),
1502 )));
1503 };
1504 snapshot
1505 .with_identity_envelope(&kp, target_pub)
1506 .map(|sealed| Some(sealed.to_bytes()))
1507 .map_err(|e| {
1508 DaemonError::Migration(MigrationError::StateFailed(format!(
1509 "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
1510 )))
1511 })
1512 }
1513
1514 /// Reassemble chunked `SnapshotReady` messages into the full
1515 /// snapshot, seal the identity envelope, and rechunk.
1516 ///
1517 /// `start_migration` returns chunked `SnapshotReady` messages on
1518 /// the local-source path; sealing operates on the whole snapshot
1519 /// so chunks must be reassembled first. If `msgs` does not start
1520 /// with `SnapshotReady` (i.e. remote-source `TakeSnapshot` path)
1521 /// the messages are returned unchanged — the dispatcher seals
1522 /// on that path.
1523 ///
1524 /// On any seal failure the orchestrator record is aborted so a
1525 /// retry starts from phase 0.
1526 async fn maybe_seal_chunked_snapshot(
1527 &self,
1528 origin_hash: u64,
1529 target_node: u64,
1530 mut msgs: Vec<MigrationMessage>,
1531 ) -> Result<Vec<MigrationMessage>, DaemonError> {
1532 if !matches!(msgs.first(), Some(MigrationMessage::SnapshotReady { .. })) {
1533 return Ok(msgs);
1534 }
1535
1536 let seq_through = match msgs.first() {
1537 Some(MigrationMessage::SnapshotReady { seq_through, .. }) => *seq_through,
1538 _ => unreachable!("checked by matches! above"),
1539 };
1540
1541 // `chunk_snapshot` emits chunks in `chunk_index` order, but
1542 // sort defensively in case a future caller hands us a
1543 // pre-mixed Vec.
1544 msgs.sort_by_key(|m| match m {
1545 MigrationMessage::SnapshotReady { chunk_index, .. } => *chunk_index,
1546 _ => 0,
1547 });
1548 let mut reassembled: Vec<u8> = Vec::new();
1549 for m in &msgs {
1550 if let MigrationMessage::SnapshotReady { snapshot_bytes, .. } = m {
1551 reassembled.extend_from_slice(snapshot_bytes);
1552 }
1553 }
1554
1555 // `transport_identity: true` is a strict opt-in:
1556 // prerequisites-missing (e.g. NKpsk0-responder) surfaces as
1557 // `Err` and aborts the migration, not a silent downgrade to
1558 // unsealed. `Ok(None)` is reserved for "snapshot already
1559 // carries an envelope" — return the original chunks.
1560 match self.maybe_seal_local_snapshot(origin_hash, target_node, &reassembled) {
1561 Ok(Some(sealed)) => chunk_snapshot(origin_hash, sealed, seq_through).map_err(|e| {
1562 let _ = self
1563 .inner
1564 .orchestrator
1565 .abort_migration(origin_hash, format!("rechunk after seal failed: {e}"));
1566 DaemonError::Migration(e)
1567 }),
1568 Ok(None) => Ok(msgs),
1569 Err(e) => {
1570 let _ = self
1571 .inner
1572 .orchestrator
1573 .abort_migration(origin_hash, format!("envelope seal failed: {e}"));
1574 Err(e)
1575 }
1576 }
1577 }
1578
1579 async fn send_migration_message(
1580 &self,
1581 dest_node: u64,
1582 msg: &MigrationMessage,
1583 ) -> Result<(), DaemonError> {
1584 let addr = self
1585 .inner
1586 .mesh
1587 .inner()
1588 .peer_addr(dest_node)
1589 .ok_or(DaemonError::Migration(MigrationError::TargetUnavailable(
1590 dest_node,
1591 )))?;
1592 let bytes = migration_wire::encode(msg).map_err(DaemonError::Migration)?;
1593 self.inner
1594 .mesh
1595 .inner()
1596 .send_subprotocol(addr, SUBPROTOCOL_MIGRATION, &bytes)
1597 .await
1598 .map_err(|e| {
1599 DaemonError::Migration(MigrationError::StateFailed(format!(
1600 "send_subprotocol failed: {e}"
1601 )))
1602 })
1603 }
1604
1605 /// Underlying mesh. Exposed read-only so the caller can still
1606 /// reach the channel / subscribe / publish surface without
1607 /// reaching around the runtime.
1608 pub fn mesh(&self) -> &Arc<Mesh> {
1609 &self.inner.mesh
1610 }
1611
1612 // ------------ internal helpers ------------
1613
1614 fn state(&self) -> State {
1615 State::from_u8(self.inner.state.load(Ordering::Acquire))
1616 }
1617
1618 fn require_ready(&self) -> Result<(), DaemonError> {
1619 match self.state() {
1620 State::Ready => Ok(()),
1621 State::Registering => Err(DaemonError::NotReady),
1622 State::ShuttingDown => Err(DaemonError::ShuttingDown),
1623 }
1624 }
1625
1626 fn factory_for_kind(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1627 self.inner
1628 .factories
1629 .read()
1630 .expect("factory map poisoned")
1631 .get(kind)
1632 .cloned()
1633 .ok_or_else(|| DaemonError::FactoryNotFound(kind.to_string()))
1634 }
1635
1636 // ─── `groups` feature accessors ─────────────────────────────────
1637 //
1638 // These surface the internal `Scheduler` / `DaemonRegistry` /
1639 // factory closures that the `groups` module (ReplicaGroup /
1640 // ForkGroup / StandbyGroup) needs to wire into core group
1641 // constructors. Kept `pub(crate)` so they don't leak into the
1642 // SDK's public API — group types wrap them in their own
1643 // ergonomic surfaces and callers stay on the clean side of the
1644 // boundary.
1645
1646 /// Shared scheduler for capability-based placement.
1647 #[cfg(feature = "groups")]
1648 pub(crate) fn scheduler_arc(&self) -> Arc<Scheduler> {
1649 self.inner.scheduler.clone()
1650 }
1651
1652 /// Shared migration orchestrator. The
1653 /// `OrchestratorMigrationSnapshotSource` adapter wraps this
1654 /// `Arc` to bridge the compute-layer orchestrator's
1655 /// in-flight state into the MeshOS snapshot's
1656 /// `in_flight_migrations` field. Exposed `pub(crate)` so
1657 /// the in-crate `testing` harness builds the source
1658 /// out-of-band; the adapter type itself lives in
1659 /// `net::adapter::net::behavior::meshos::migration_snapshot_source`.
1660 #[cfg(feature = "testing")]
1661 pub(crate) fn migration_orchestrator_arc(&self) -> Arc<MigrationOrchestrator> {
1662 self.inner.orchestrator.clone()
1663 }
1664
1665 /// Shared daemon registry (group members register/unregister
1666 /// here alongside direct-spawn daemons).
1667 #[cfg(feature = "groups")]
1668 pub(crate) fn registry_arc(&self) -> Arc<DaemonRegistry> {
1669 self.inner.registry.clone()
1670 }
1671
1672 /// Look up a factory closure by kind. Used by group constructors
1673 /// to build members via the same factory the caller registered
1674 /// with `register_factory`.
1675 #[cfg(feature = "groups")]
1676 pub(crate) fn factory_for_kind_pub(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
1677 self.factory_for_kind(kind)
1678 }
1679
1680 /// `true` iff the runtime is in the `Ready` state. Groups use
1681 /// this to gate `spawn` (rejecting early-spawn calls with a
1682 /// typed `GroupError::NotReady`).
1683 #[cfg(feature = "groups")]
1684 pub(crate) fn is_ready_pub(&self) -> bool {
1685 self.state() == State::Ready
1686 }
1687}
1688
/// Handle to a running daemon. Clone-safe; dropping does not stop
/// the daemon — call [`DaemonRuntime::stop`] explicitly.
///
/// Cloning copies the identifiers and the `Arc` to the shared runtime
/// state, so every clone addresses the same daemon.
#[derive(Clone)]
pub struct DaemonHandle {
    /// Daemon's 64-bit origin hash. Stable for the daemon's lifetime
    /// and across migrations.
    pub origin_hash: u64,
    /// Daemon's full 32-byte entity id.
    pub entity_id: EntityId,
    /// Shared runtime state. `stats` and `snapshot` reach the daemon
    /// registry through it, keyed by `origin_hash`.
    inner: Arc<Inner>,
}
1700
1701impl DaemonHandle {
1702 /// Read the daemon's current stats.
1703 pub fn stats(&self) -> Result<DaemonStats, DaemonError> {
1704 self.inner
1705 .registry
1706 .stats(self.origin_hash)
1707 .map_err(DaemonError::Core)
1708 }
1709
1710 /// Take a snapshot of the daemon's current state. `Ok(None)` for
1711 /// stateless daemons.
1712 pub async fn snapshot(&self) -> Result<Option<StateSnapshot>, DaemonError> {
1713 self.inner
1714 .registry
1715 .snapshot(self.origin_hash)
1716 .map_err(DaemonError::Core)
1717 }
1718}
1719
1720impl std::fmt::Debug for DaemonRuntime {
1721 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1722 let factory_count = self
1723 .inner
1724 .factories
1725 .read()
1726 .map(|m| m.len())
1727 .unwrap_or_default();
1728 f.debug_struct("DaemonRuntime")
1729 .field("state", &self.state())
1730 .field("factories", &factory_count)
1731 .field("daemons", &self.inner.registry.count())
1732 .finish()
1733 }
1734}
1735
1736impl std::fmt::Debug for DaemonHandle {
1737 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1738 f.debug_struct("DaemonHandle")
1739 .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
1740 .field("entity_id", &self.entity_id)
1741 .finish()
1742 }
1743}
1744
1745/// Stage 3 of `DAEMON_CHANNEL_REBIND_PLAN.md` — after a migration
1746/// target restores a daemon, walk its subscription ledger and
1747/// re-send each `subscribe_channel` to the matching publisher so
1748/// messages flow to the target without waiting for the source's
1749/// entry to age out of the publisher's roster. Errors are
1750/// per-subscription; one publisher being offline doesn't fail the
1751/// rest.
1752///
1753/// Runs in a tokio task spawned by the post-restore callback
1754/// installed on `MigrationSubprotocolHandler`.
1755async fn replay_subscriptions(inner: Arc<Inner>, origin_hash: u64) {
1756 let bindings = match inner
1757 .registry
1758 .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
1759 {
1760 Ok(list) => list,
1761 Err(_) => return,
1762 };
1763 for sub in bindings {
1764 let token = sub
1765 .token_bytes
1766 .as_deref()
1767 .and_then(|bytes| PermissionToken::from_bytes(bytes).ok());
1768 let result = match token {
1769 Some(tok) => {
1770 inner
1771 .mesh
1772 .inner()
1773 .subscribe_channel_with_token(sub.publisher, sub.channel.clone(), tok)
1774 .await
1775 }
1776 None => {
1777 inner
1778 .mesh
1779 .inner()
1780 .subscribe_channel(sub.publisher, sub.channel.clone())
1781 .await
1782 }
1783 };
1784 if let Err(e) = result {
1785 // Non-fatal: one subscription failing (publisher
1786 // offline, token expired, etc.) must not take down the
1787 // rest of the ledger's replay. The SDK doesn't depend
1788 // on `tracing`; drop the failure to stderr via
1789 // `eprintln!` so operators running `RUST_LOG=warn`
1790 // still see it. Future work can add a `ReplayPartial`
1791 // event on the migration phase stream (plan §
1792 // *Error surface*) to surface failures programmatically.
1793 eprintln!(
1794 "channel re-bind replay failed: daemon={:#x} channel={} publisher={:#x} error={}",
1795 origin_hash, sub.channel, sub.publisher, e,
1796 );
1797 }
1798 }
1799}
1800
1801/// Stage 4 of `DAEMON_CHANNEL_REBIND_PLAN.md` — fires at `Cutover`
1802/// on the source node (just before daemon cleanup). Walks the
1803/// daemon's ledger and sends `unsubscribe_channel` to each
1804/// publisher so rosters drop the source without waiting for
1805/// session-timeout (~30 s). Fire-and-forget: we don't block the
1806/// cutover dispatch on acks.
1807async fn teardown_subscriptions(inner: Arc<Inner>, bindings: Vec<SubscriptionBinding>) {
1808 for sub in bindings {
1809 if let Err(e) = inner
1810 .mesh
1811 .inner()
1812 .unsubscribe_channel(sub.publisher, sub.channel.clone())
1813 .await
1814 {
1815 // Non-fatal; the publisher's session timeout will
1816 // eventually clean up our stale roster entry even
1817 // without an explicit Unsubscribe.
1818 eprintln!(
1819 "channel re-bind teardown failed: channel={} publisher={:#x} error={}",
1820 sub.channel, sub.publisher, e,
1821 );
1822 }
1823 }
1824}
1825
/// Options for [`DaemonRuntime::start_migration_with`].
///
/// - Stage 6 of
///   [`DAEMON_IDENTITY_MIGRATION_PLAN.md`](../../../docs/DAEMON_IDENTITY_MIGRATION_PLAN.md):
///   the `transport_identity` flag. Default `true`.
/// - Stages 3 + 4 of
///   [`DAEMON_RUNTIME_READINESS_PLAN.md`](../../../docs/DAEMON_RUNTIME_READINESS_PLAN.md):
///   the `retry_not_ready` budget. When the migration target
///   responds `NotReady` (runtime still in `Registering`), the
///   source backs off and re-initiates until this total elapsed
///   time runs out. `None` disables retry; the first `NotReady`
///   surfaces immediately.
///
/// All fields are plain values, so the type derives `PartialEq` / `Eq`.
/// Callers can compare option sets, e.g. against
/// `MigrationOpts::default()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationOpts {
    /// If `true` (default), the source node seals its daemon's
    /// ed25519 seed into the outbound snapshot using the target's
    /// X25519 static pubkey. The target unseals on arrival and the
    /// migrated daemon keeps its full signing capability.
    ///
    /// If `false`, the envelope is omitted and the target
    /// reconstructs the daemon with a `public_only` keypair.
    /// Identity queries (`entity_id`, `origin_hash`) still work, but
    /// `sign` calls fail with `EntityError::ReadOnly`. This suits
    /// pure compute daemons that only consume events and emit
    /// payloads, and that do NOT need to mint capability
    /// announcements or issue permission tokens from the target.
    pub transport_identity: bool,

    /// Retry budget for [`MigrationFailureReason::NotReady`].
    ///
    /// `Some(d)` (default 30 s): on `NotReady`, the source backs
    /// off (500 ms → 1 s → 2 s → 4 s → 8 s, capped at 16 s) and
    /// re-initiates the migration. The total retry clock is
    /// capped at `d`; after that, the caller sees
    /// [`MigrationFailureReason::NotReadyTimeout`].
    ///
    /// `None`: no retry. The first `NotReady` surfaces as a
    /// terminal failure to the caller.
    pub retry_not_ready: Option<std::time::Duration>,
}
1866
1867impl Default for MigrationOpts {
1868 fn default() -> Self {
1869 Self {
1870 transport_identity: true,
1871 retry_not_ready: Some(std::time::Duration::from_secs(30)),
1872 }
1873 }
1874}
1875
/// Delay before the `attempt`-th `NotReady` retry.
///
/// Attempt 1 (and the degenerate attempt 0) waits 500 ms. Each later
/// attempt doubles the delay until it reaches the 16 s ceiling. The
/// overall budget is enforced separately via
/// [`MigrationOpts::retry_not_ready`]. The schedule follows
/// `DAEMON_RUNTIME_READINESS_PLAN.md` § *Source-side retry*.
fn not_ready_backoff(attempt: u8) -> std::time::Duration {
    const BASE_MS: u64 = 500;
    const CEILING_MS: u64 = 16_000;
    // Five doublings take 500 ms to 16 s. Capping the exponent keeps
    // the shift well-defined for any attempt number.
    let doublings = u32::from(attempt.saturating_sub(1)).min(5);
    let delay_ms = (BASE_MS << doublings).min(CEILING_MS);
    std::time::Duration::from_millis(delay_ms)
}
1886
/// Handle to an in-flight migration. Drop the handle and the
/// orchestrator continues driving the migration to completion in the
/// background; keep it to observe phase transitions or request abort.
///
/// Cheap to clone — the backing state is shared with the
/// [`DaemonRuntime`] that produced it. Every clone queries the same
/// orchestrator record (keyed by `origin_hash`), so `phase()` agrees
/// across clones.
#[derive(Clone)]
pub struct MigrationHandle {
    /// Daemon being migrated.
    pub origin_hash: u64,
    /// Source node that currently hosts the daemon.
    pub source_node: u64,
    /// Target node that will host the daemon after cutover.
    pub target_node: u64,
    /// Runtime the orchestrator lives on. Used by [`Self::phase`]
    /// and [`Self::wait`] to poll migration state, and to send the
    /// wire messages for retries and cancellation.
    runtime: DaemonRuntime,
    /// Options the migration was initiated with. Drives retry
    /// policy on `NotReady` + the identity-transport flag for
    /// re-initiated attempts.
    opts: MigrationOpts,
}
1909
1910impl MigrationHandle {
1911 /// Current migration phase, or `None` once the migration has
1912 /// left the orchestrator's records (either via `Complete` → auto
1913 /// cleanup, or via explicit abort). Callers distinguish the two
1914 /// by remembering the last non-None phase.
1915 pub fn phase(&self) -> Option<MigrationPhase> {
1916 self.runtime.inner.orchestrator.status(self.origin_hash)
1917 }
1918
1919 /// Block until the migration reaches a terminal state.
1920 ///
1921 /// Returns `Ok(())` on normal completion (saw `Complete`, then
1922 /// the orchestrator cleaned up). Returns `Err(MigrationError)`
1923 /// if the orchestrator's record disappeared without the caller
1924 /// ever having seen `Complete` — either an explicit abort or a
1925 /// failure at some upstream stage.
1926 ///
1927 /// This method does **not** enforce any wall-clock timeout. A
1928 /// migration that stalls waiting on an unresponsive peer will
1929 /// block indefinitely; callers that want a bound should use
1930 /// [`Self::wait_with_timeout`] instead.
1931 ///
1932 /// Polls every 50 ms. The implementation is deliberately
1933 /// simple — Stage 2 of `DAEMON_IDENTITY_MIGRATION_PLAN.md` and
1934 /// the V2 iteration of this plan will swap this for a
1935 /// broadcast-channel push, but 50 ms polling is plenty for the
1936 /// use cases a migration API sees today.
1937 pub async fn wait(self) -> Result<(), DaemonError> {
1938 self.wait_until(None).await
1939 }
1940
1941 /// Like [`Self::wait`] with a caller-controlled timeout. A
1942 /// timeout aborts the orchestrator-side record and returns
1943 /// `Err(MigrationError::StateFailed)`; a graceful `Complete`
1944 /// returns `Ok`.
1945 pub async fn wait_with_timeout(self, timeout: std::time::Duration) -> Result<(), DaemonError> {
1946 let deadline = tokio::time::Instant::now() + timeout;
1947 self.wait_until(Some(deadline)).await
1948 }
1949
1950 /// Inner wait loop with an optional deadline. `None` = block
1951 /// forever until the migration reaches a terminal state;
1952 /// `Some(d)` = give up and abort at `d`.
1953 ///
1954 /// Centralised here so `wait` and `wait_with_timeout` can't
1955 /// drift on the retry/backoff semantics, and so the
1956 /// "hidden 60 s ceiling" cannot reappear under `wait`.
1957 async fn wait_until(
1958 self,
1959 overall_deadline: Option<tokio::time::Instant>,
1960 ) -> Result<(), DaemonError> {
1961 let start = tokio::time::Instant::now();
1962 let retry_deadline = self.opts.retry_not_ready.map(|b| start + b);
1963 let mut attempts: u8 = 1; // first attempt initiated by start_migration_with
1964 loop {
1965 match self.wait_one_attempt(overall_deadline).await {
1966 Ok(()) => return Ok(()),
1967 Err(DaemonError::MigrationFailed(reason)) if reason.is_retriable() => {
1968 // Retry decision.
1969 let Some(retry_d) = retry_deadline else {
1970 // Opts explicitly disable retry — surface
1971 // the NotReady verbatim.
1972 return Err(DaemonError::MigrationFailed(reason));
1973 };
1974 let now = tokio::time::Instant::now();
1975 let overall_exhausted = overall_deadline.map(|d| now >= d).unwrap_or(false);
1976 if now >= retry_d || overall_exhausted {
1977 // Budget exhausted. `NotReadyTimeout` carries
1978 // the attempt count for operator diagnosis.
1979 return Err(DaemonError::MigrationFailed(
1980 MigrationFailureReason::NotReadyTimeout { attempts },
1981 ));
1982 }
1983 // Back off and re-initiate.
1984 let backoff = not_ready_backoff(attempts);
1985 tokio::time::sleep(backoff).await;
1986 attempts = attempts.saturating_add(1);
1987 // Re-initiate the migration by calling the
1988 // orchestrator fresh. The previous record has
1989 // been cleaned up by the dispatcher's
1990 // MigrationFailed handler, so this starts a new
1991 // attempt from phase 0.
1992 self.reinitiate_attempt().await?;
1993 // Loop; poll this new attempt's outcome.
1994 }
1995 Err(e) => return Err(e),
1996 }
1997 }
1998 }
1999
2000 /// Poll a single migration attempt's outcome. Returns:
2001 /// - `Ok(())` on Complete.
2002 /// - `Err(MigrationFailed(reason))` when the dispatcher
2003 /// observed a structured failure.
2004 /// - `Err(Migration(_))` on overall-timeout or unknown abort.
2005 ///
2006 /// `overall_deadline = None` disables the deadline check — the
2007 /// poll loop only returns via a terminal status transition.
2008 async fn wait_one_attempt(
2009 &self,
2010 overall_deadline: Option<tokio::time::Instant>,
2011 ) -> Result<(), DaemonError> {
2012 loop {
2013 let current_phase = self.runtime.inner.orchestrator.status(self.origin_hash);
2014 match current_phase {
2015 Some(phase) => {
2016 if phase == MigrationPhase::Complete {
2017 // Give the dispatcher a beat to finish
2018 // cleanup, then surface success.
2019 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2020 return Ok(());
2021 }
2022 }
2023 None => {
2024 // A recorded failure is authoritative — the
2025 // dispatcher populates `recent_failures` before
2026 // the orchestrator removes the record, so
2027 // status=None + recorded-reason unambiguously
2028 // means abort.
2029 if let Some(reason) = self.take_recent_failure() {
2030 return Err(DaemonError::MigrationFailed(reason));
2031 }
2032 // No recorded failure. The orchestrator removes
2033 // records via two paths:
2034 // 1. `on_activate_ack` — success.
2035 // 2. `abort_migration_with_reason` — failure,
2036 // which rides through the dispatcher's
2037 // `MigrationFailed` handler *before* the
2038 // record is dropped, so `recent_failures`
2039 // is populated first.
2040 // Therefore status=None + no-recorded-failure is
2041 // unambiguously success, regardless of what
2042 // phase we last observed. This matters when the
2043 // dispatcher runs the tail of the migration
2044 // (Cutover → Complete → ActivateAck) entirely
2045 // between two 50 ms polls — we may never observe
2046 // `Complete` explicitly.
2047 //
2048 // Synchronous abort paths that bypass the
2049 // dispatcher (`wait_one_attempt`'s own timeout,
2050 // `start_migration_with`'s send-failure path)
2051 // return `Err` to the caller *before* the wait
2052 // loop can observe the None status, so they
2053 // don't trip this branch.
2054 return Ok(());
2055 }
2056 }
2057 if let Some(d) = overall_deadline {
2058 if tokio::time::Instant::now() >= d {
2059 let _ = self
2060 .runtime
2061 .inner
2062 .orchestrator
2063 .abort_migration(self.origin_hash, "timeout".into());
2064 return Err(DaemonError::Migration(MigrationError::StateFailed(
2065 format!("migration timed out in phase {:?}", current_phase),
2066 )));
2067 }
2068 }
2069 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2070 }
2071 }
2072
2073 /// Re-initiate a migration attempt for the same daemon after a
2074 /// retriable failure. Calls the orchestrator fresh + sends the
2075 /// first wire message; mirrors the tail of
2076 /// [`DaemonRuntime::start_migration_with`] but without building
2077 /// a new handle (we keep the existing one).
2078 async fn reinitiate_attempt(&self) -> Result<(), DaemonError> {
2079 let msgs = self
2080 .runtime
2081 .inner
2082 .orchestrator
2083 .start_migration(self.origin_hash, self.source_node, self.target_node)
2084 .map_err(DaemonError::Migration)?;
2085
2086 let msgs = if self.opts.transport_identity {
2087 self.runtime
2088 .maybe_seal_chunked_snapshot(self.origin_hash, self.target_node, msgs)
2089 .await?
2090 } else {
2091 msgs
2092 };
2093
2094 let dest_node = match msgs.first() {
2095 Some(MigrationMessage::TakeSnapshot { .. }) => self.source_node,
2096 Some(MigrationMessage::SnapshotReady { .. }) => self.target_node,
2097 Some(other) => {
2098 let _ = self
2099 .runtime
2100 .inner
2101 .orchestrator
2102 .abort_migration(self.origin_hash, "unexpected retry message".into());
2103 return Err(DaemonError::Migration(MigrationError::StateFailed(
2104 format!("unexpected retry initial message: {other:?}"),
2105 )));
2106 }
2107 None => {
2108 let _ = self.runtime.inner.orchestrator.abort_migration(
2109 self.origin_hash,
2110 "orchestrator returned no retry messages".into(),
2111 );
2112 return Err(DaemonError::Migration(MigrationError::StateFailed(
2113 "orchestrator returned no migration messages on retry".into(),
2114 )));
2115 }
2116 };
2117
2118 for msg in &msgs {
2119 self.runtime.send_migration_message(dest_node, msg).await?;
2120 }
2121 Ok(())
2122 }
2123
2124 /// Pop the most recent `MigrationFailureReason` the dispatcher
2125 /// observed for this migration (if any). Consumed: subsequent
2126 /// calls see `None` until the next failure arrives.
2127 fn take_recent_failure(&self) -> Option<MigrationFailureReason> {
2128 self.runtime
2129 .inner
2130 .recent_failures
2131 .lock()
2132 .ok()?
2133 .remove(&self.origin_hash)
2134 }
2135
2136 /// Request abort. The orchestrator emits a `MigrationFailed`
2137 /// message to involved nodes and clears its record; the target
2138 /// rolls back via its own handler. Best-effort — a migration
2139 /// past `Cutover` cannot be undone cleanly because routing has
2140 /// already flipped.
2141 pub async fn cancel(&self) -> Result<(), DaemonError> {
2142 let msg = self
2143 .runtime
2144 .inner
2145 .orchestrator
2146 .abort_migration(self.origin_hash, "cancel requested".into())
2147 .map_err(DaemonError::Migration)?;
2148 // Best-effort notify — ignore send errors, the orchestrator
2149 // record is already gone on our side.
2150 let _ = self
2151 .runtime
2152 .send_migration_message(self.source_node, &msg)
2153 .await;
2154 let _ = self
2155 .runtime
2156 .send_migration_message(self.target_node, &msg)
2157 .await;
2158 Ok(())
2159 }
2160}
2161
2162impl std::fmt::Debug for MigrationHandle {
2163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2164 f.debug_struct("MigrationHandle")
2165 .field("origin_hash", &format_args!("{:#x}", self.origin_hash))
2166 .field("source_node", &format_args!("{:#x}", self.source_node))
2167 .field("target_node", &format_args!("{:#x}", self.target_node))
2168 .field("phase", &self.phase())
2169 .finish()
2170 }
2171}