Skip to main content

net/adapter/net/behavior/meshos/
sdk.rs

1//! MeshOS SDK — Rust surface. The canonical daemon-author API
2//! per [`MESHOS_SDK_PLAN.md`](../../../../../../docs/plans/MESHOS_SDK_PLAN.md).
3//!
4//! [`MeshOsDaemonHandle`] wraps a registered daemon's lifecycle:
5//! a per-daemon control-event receiver, a read-only metadata
6//! view, a capability-publish path, and a graceful-shutdown
7//! sequence. Built directly on top of the substrate primitives
8//! (`MeshOsRuntime`, `DaemonRegistry`, `DaemonHost`, `MeshDaemon`,
9//! `CapabilitySet`) — the SDK is composition + ergonomics, not
10//! new mechanism.
11//!
12//! Use pattern:
13//!
14//! ```ignore
15//! use net::adapter::net::behavior::meshos::{MeshOsRuntime, MeshOsConfig};
16//! use net::adapter::net::behavior::meshos::sdk::{MeshOsDaemonSdk, MeshOsDaemonHandle};
17//! use net::adapter::net::compute::DaemonControl;
18//!
19//! // Wrap a user dispatcher with the SDK's routing layer.
20//! let sdk = MeshOsDaemonSdk::start(MeshOsConfig::default(), my_dispatcher);
21//!
22//! // Register a daemon; receive control events via the handle.
23//! let mut handle = sdk.register_daemon(Box::new(my_daemon), keypair)?;
24//! while let Some(ev) = handle.next_control().await {
25//!     match ev {
26//!         DaemonControl::Shutdown { .. } => break,
27//!         _ => { /* react */ }
28//!     }
29//! }
30//! handle.graceful_shutdown(std::time::Duration::from_secs(5)).await?;
31//! sdk.shutdown().await?;
32//! ```
33//!
34//! Locked decisions from the plan:
35//!
36//! - **Daemon-side only.** No placement / admin / scheduler /
37//!   replica APIs in the SDK surface; consumers issue daemon
38//!   work + receive supervisor signals.
39//! - **`DaemonControl` is the wire form.** SDK consumers see
40//!   the WASM-friendly relative-ms form, not the loop-internal
41//!   `Instant`-anchored `MeshOsControl`.
42//! - **Snapshot / restore is opaque bytes.** SDK never inspects
43//!   the daemon's state shape.
//! - **At-most-once control delivery.** Each daemon gets a bounded
//!   control channel. When that channel is already full, the event
//!   being delivered is dropped (events already queued are kept) and
//!   the router's drop counter increments. The SDK doesn't queue
//!   control events indefinitely on the daemon's behalf.
49//! - **Error kinds use `<<meshos-sdk-kind:KIND>>MSG`.** Matches
50//!   the discriminator format every cross-language SDK uses.
51
52use std::collections::BTreeMap;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56
57use futures::future::BoxFuture;
58use parking_lot::RwLock;
59use tokio::sync::mpsc;
60
61use crate::adapter::net::behavior::capability::CapabilitySet;
62use crate::adapter::net::compute::{
63    DaemonControl, DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
64};
65use crate::adapter::net::identity::EntityKeypair;
66
67use super::action::MeshOsAction;
68use super::config::MeshOsConfig;
69use super::event::NodeId;
70use super::executor::{ActionDispatcher, DispatchError};
71use super::maintenance::MaintenanceState;
72use super::runtime::{MeshOsRuntime, RuntimeShutdownError, RuntimeStats};
73use super::snapshot::PeerSnapshot;
74
/// Default capacity of the per-daemon control-event channel.
///
/// Delivery is at-most-once: the router pushes with a non-blocking
/// `try_send`, so once this many events are queued and unconsumed,
/// each further event is discarded (the already-queued ones are
/// kept) and counted as dropped.
pub const DEFAULT_CONTROL_CHANNEL_CAPACITY: usize = 8;
79
/// Default grace window (5 seconds) for
/// [`MeshOsDaemonHandle::graceful_shutdown`]. That method always
/// takes an explicit duration; the `daemon_main!` macro passes this
/// constant.
pub const DEFAULT_GRACEFUL_SHUTDOWN: Duration = Duration::from_secs(5);
83
/// SDK error surface. Carries the operator-readable message + a
/// kind discriminator usable from cross-language consumers.
///
/// The `Display` output is `<<meshos-sdk-kind:KIND>>MESSAGE`, as
/// defined by the `#[error(...)]` attribute below. Kinds produced
/// in this file are `register_failed`, `loop_closed` and
/// `queue_full`.
#[derive(Clone, Debug, thiserror::Error)]
#[error("<<meshos-sdk-kind:{kind}>>{message}")]
pub struct SdkError {
    /// Stable kind discriminator. Lowercase + underscore-only;
    /// the cross-language SDKs parse the surrounding
    /// `<<meshos-sdk-kind:…>>` envelope to extract this verbatim.
    pub kind: &'static str,
    /// Operator-readable message. Rendered after the kind envelope
    /// in the `Display` output.
    pub message: String,
}
96
97impl SdkError {
98    fn new(kind: &'static str, message: impl Into<String>) -> Self {
99        Self {
100            kind,
101            message: message.into(),
102        }
103    }
104}
105
106impl From<DaemonError> for SdkError {
107    fn from(err: DaemonError) -> Self {
108        Self::new("register_failed", err.to_string())
109    }
110}
111
/// Read-only view of the cluster context the daemon can observe.
/// Constructed when the daemon is registered and rebuilt from the
/// runtime's latest snapshot on demand via
/// [`MeshOsDaemonHandle::refresh_metadata`]. The view is a cached
/// copy: it does not update on its own.
#[derive(Clone, Debug)]
pub struct MetadataView {
    /// This node's identifier (`MeshOsConfig::this_node`).
    pub node_id: NodeId,
    /// The registered daemon's substrate identifier
    /// (the keypair's `origin_hash`).
    pub daemon_id: u64,
    /// The daemon's `MeshDaemon::name()` at registration.
    pub daemon_name: String,
    /// This node's own maintenance state as recorded when this
    /// view was last built; call `refresh_metadata` for a current
    /// value.
    pub maintenance_state: MaintenanceStateView,
    /// Per-peer entries copied from the snapshot's `peers` map,
    /// keyed by peer node id.
    pub peers: BTreeMap<NodeId, PeerSnapshot>,
}
131
/// Bounded WASM-friendly projection of [`MaintenanceState`].
/// Carries the discriminator + relative-ms `since` so daemons
/// without `Instant` access can still reason about transitions.
///
/// All `*_ms` fields are relative to the moment the view was
/// built, not absolute timestamps. Marked `#[non_exhaustive]`, so
/// matches outside this crate need a wildcard arm.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum MaintenanceStateView {
    /// Normal participation.
    Active,
    /// Entering maintenance — replicas migrating, daemons draining.
    EnteringMaintenance {
        /// Milliseconds since the transition was entered.
        since_ms: u64,
        /// Milliseconds remaining until the deadline elapses,
        /// or `None` for no deadline.
        deadline_remaining_ms: Option<u64>,
    },
    /// Steady-state isolated.
    Maintenance {
        /// Milliseconds since the state was entered.
        since_ms: u64,
    },
    /// Exiting maintenance — health revalidation + capability refresh.
    ExitingMaintenance {
        /// Milliseconds since the state was entered.
        since_ms: u64,
    },
    /// Drain failed; operator warning state.
    DrainFailed {
        /// Milliseconds since the failure was recorded.
        since_ms: u64,
        /// Operator-readable reason.
        reason: String,
    },
    /// Recovery ramp-up window.
    Recovery {
        /// Milliseconds since the ramp started.
        since_ms: u64,
    },
}
171
172impl MaintenanceStateView {
173    /// Build a view from the substrate-side [`MaintenanceState`]
174    /// against a `now` reference for relative-ms conversion.
175    /// Useful when a consumer holds the substrate form directly
176    /// rather than reading through a snapshot.
177    pub fn from_state(state: &MaintenanceState, now: Instant) -> Self {
178        match state {
179            MaintenanceState::Active => Self::Active,
180            MaintenanceState::EnteringMaintenance { since, deadline } => {
181                Self::EnteringMaintenance {
182                    since_ms: now.saturating_duration_since(*since).as_millis() as u64,
183                    deadline_remaining_ms: deadline
184                        .map(|d| d.saturating_duration_since(now).as_millis() as u64),
185                }
186            }
187            MaintenanceState::Maintenance { since } => Self::Maintenance {
188                since_ms: now.saturating_duration_since(*since).as_millis() as u64,
189            },
190            MaintenanceState::ExitingMaintenance { since } => Self::ExitingMaintenance {
191                since_ms: now.saturating_duration_since(*since).as_millis() as u64,
192            },
193            MaintenanceState::DrainFailed { since, reason } => Self::DrainFailed {
194                since_ms: now.saturating_duration_since(*since).as_millis() as u64,
195                reason: reason.clone(),
196            },
197            MaintenanceState::Recovery { since } => Self::Recovery {
198                since_ms: now.saturating_duration_since(*since).as_millis() as u64,
199            },
200        }
201    }
202}
203
/// Per-daemon control-event channel. The router (held by the
/// SDK) keeps one of these per registered daemon and pushes
/// translated [`DaemonControl`] events when the executor
/// dispatches daemon-targeted actions.
#[derive(Debug)]
struct DaemonControlSlot {
    /// Sending half of the daemon's bounded control channel; the
    /// receiving half is handed out by `DaemonControlRouter::register`.
    tx: mpsc::Sender<DaemonControl>,
    /// Total events this slot failed to deliver. Incremented
    /// whenever `try_send` returns an error — the channel is full
    /// because the daemon isn't keeping up, or the receiver is gone.
    dropped: AtomicU64,
}
215
/// Shareable per-runtime cell mapping `daemon_id` → control
/// channel. The SDK's routing dispatcher reads this on every
/// daemon-targeted action to push the translated
/// [`DaemonControl`].
///
/// Cloning is shallow: both fields are `Arc`s, so every clone
/// observes the same registrations.
#[derive(Clone, Default)]
pub struct DaemonControlRouter {
    /// Targeted-delivery table: one slot per registered daemon,
    /// keyed by `daemon_id`.
    inner: Arc<RwLock<BTreeMap<u64, Arc<DaemonControlSlot>>>>,
    /// Fan-out list holding the same slots as `inner` — one entry
    /// pushed per `register` call. Read by
    /// [`Self::broadcast`] to deliver an event to every
    /// registered daemon.
    broadcast: Arc<RwLock<Vec<Arc<DaemonControlSlot>>>>,
}
229
230impl DaemonControlRouter {
231    /// Build an empty router.
232    pub fn new() -> Self {
233        Self::default()
234    }
235
236    /// Register a daemon's control channel. Returns the receiver
237    /// the SDK hands to the daemon handle.
238    fn register(&self, daemon_id: u64, capacity: usize) -> mpsc::Receiver<DaemonControl> {
239        let (tx, rx) = mpsc::channel(capacity);
240        let slot = Arc::new(DaemonControlSlot {
241            tx,
242            dropped: AtomicU64::new(0),
243        });
244        self.inner.write().insert(daemon_id, Arc::clone(&slot));
245        self.broadcast.write().push(slot);
246        rx
247    }
248
249    /// Unregister a daemon's control channel. Subsequent
250    /// dispatches against this `daemon_id` drop with no
251    /// destination.
252    fn unregister(&self, daemon_id: u64) {
253        let removed = self.inner.write().remove(&daemon_id);
254        if let Some(removed) = removed {
255            self.broadcast.write().retain(|s| !Arc::ptr_eq(s, &removed));
256        }
257    }
258
259    /// Push a control event to a specific daemon. At-most-once:
260    /// when the channel is full, the event drops + the slot's
261    /// counter increments.
262    fn route(&self, daemon_id: u64, event: DaemonControl) {
263        let slot = self.inner.read().get(&daemon_id).cloned();
264        if let Some(slot) = slot {
265            if slot.tx.try_send(event).is_err() {
266                slot.dropped.fetch_add(1, Ordering::Relaxed);
267            }
268        }
269    }
270
271    /// Broadcast a control event to every registered daemon.
272    /// Used for cluster-wide signals
273    /// (`BackpressureOn`/`BackpressureOff`).
274    fn broadcast(&self, event: DaemonControl) {
275        let slots = self.broadcast.read().clone();
276        for slot in slots {
277            if slot.tx.try_send(event.clone()).is_err() {
278                slot.dropped.fetch_add(1, Ordering::Relaxed);
279            }
280        }
281    }
282
283    /// Sample the total dropped-event count across all daemons.
284    /// Diagnostic; the SDK exposes per-daemon drop counts on
285    /// the handle.
286    pub fn total_dropped(&self) -> u64 {
287        let map = self.inner.read();
288        map.values()
289            .map(|slot| slot.dropped.load(Ordering::Relaxed))
290            .sum()
291    }
292}
293
/// Wraps the user's [`ActionDispatcher`] with daemon-control
/// routing. Intercepts daemon-targeted actions, translates each
/// to the WASM-friendly [`DaemonControl`] form, pushes to the
/// per-daemon channel via [`DaemonControlRouter`], then
/// delegates the original action to the user dispatcher.
///
/// Constructed by [`MeshOsDaemonSdk::start`]; consumers that
/// build their own runtime can construct one manually to opt
/// into the same routing layer.
pub struct SdkRoutingDispatcher<D: ActionDispatcher> {
    /// The wrapped user dispatcher; receives every action after
    /// the routing step.
    inner: Arc<D>,
    /// Router that delivers the translated control events.
    router: DaemonControlRouter,
}
307
308impl<D: ActionDispatcher> SdkRoutingDispatcher<D> {
309    /// Wrap `inner` with `router`-driven control routing.
310    pub fn new(inner: Arc<D>, router: DaemonControlRouter) -> Self {
311        Self { inner, router }
312    }
313}
314
315impl<D: ActionDispatcher> ActionDispatcher for SdkRoutingDispatcher<D> {
316    fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>> {
317        let router = self.router.clone();
318        let action_clone = action.clone();
319        let inner = Arc::clone(&self.inner);
320        Box::pin(async move {
321            translate_to_control(&router, &action_clone);
322            inner.dispatch(action).await
323        })
324    }
325}
326
327fn translate_to_control(router: &DaemonControlRouter, action: &MeshOsAction) {
328    let now = Instant::now();
329    if let MeshOsAction::StopDaemon {
330        daemon, deadline, ..
331    } = action
332    {
333        let grace_period_ms = deadline.saturating_duration_since(now).as_millis() as u64;
334        router.route(daemon.id, DaemonControl::Shutdown { grace_period_ms });
335    }
336    // Maintenance / drain fan-out arrives via the substrate's
337    // `ControlSink` (installed at runtime construction); the
338    // dispatcher only catches per-daemon, action-driven signals
339    // here.
340}
341
/// [`super::control::ControlSink`] implementation that broadcasts
/// each substrate-emitted [`super::control::MeshOsControl`] to
/// every registered daemon via the [`DaemonControlRouter`]. The
/// loop owns the emit cadence; this adapter just translates the
/// SDK-internal form to the wire form daemons see.
pub(super) struct RouterControlSink {
    /// Router whose broadcast list receives every emitted event.
    router: DaemonControlRouter,
}
350
351impl RouterControlSink {
352    pub(super) fn new(router: DaemonControlRouter) -> Self {
353        Self { router }
354    }
355}
356
357impl super::control::ControlSink for RouterControlSink {
358    fn emit(&self, event: super::control::MeshOsControl) {
359        let now = Instant::now();
360        self.router.broadcast(event.to_daemon_control(now));
361    }
362}
363
/// Per-daemon handle. Owns the control-event receiver +
/// publish-capabilities surface + graceful-shutdown sequence.
///
/// Dropping the handle unregisters the daemon from both the
/// control router and the daemon registry.
pub struct MeshOsDaemonHandle {
    /// Substrate identifier of the daemon (the keypair's
    /// `origin_hash`).
    daemon_id: u64,
    /// `MeshDaemon::name()` captured at registration.
    daemon_name: String,
    /// Receiving half of this daemon's control channel.
    control_rx: mpsc::Receiver<DaemonControl>,
    /// Registry the daemon was inserted into; used to detect
    /// removal during shutdown and to unregister.
    registry: Arc<DaemonRegistry>,
    /// Router holding this daemon's control slot.
    router: DaemonControlRouter,
    /// Cached metadata; replaced wholesale by `refresh_metadata`.
    metadata: MetadataView,
    /// Source of the snapshots `refresh_metadata` reads.
    runtime_snapshot_reader: super::event_loop::MeshOsSnapshotReader,
    /// Mesh-loop publish handle. Used for daemon-author
    /// surfaces that need to push events into the loop (log
    /// emission, future capability announcements).
    mesh_handle: super::event_loop::MeshOsHandle,
    /// This node's identifier, copied into every metadata view.
    this_node: NodeId,
    /// Becomes `true` after `unregister` runs — guards against
    /// double-unregister on `Drop` + `graceful_shutdown`.
    unregistered: bool,
}
383
384impl MeshOsDaemonHandle {
385    /// Receive the next control event. Async — parks until the
386    /// supervisor emits a signal, the handle is unregistered,
387    /// or the runtime shuts down (returns `None`).
388    pub async fn next_control(&mut self) -> Option<DaemonControl> {
389        self.control_rx.recv().await
390    }
391
392    /// Try to receive a control event without parking. Returns
393    /// `None` immediately when the channel is empty.
394    pub fn try_next_control(&mut self) -> Option<DaemonControl> {
395        self.control_rx.try_recv().ok()
396    }
397
398    /// The daemon's substrate identifier (the keypair's
399    /// `origin_hash`). Stable across the handle's lifetime.
400    pub fn daemon_id(&self) -> u64 {
401        self.daemon_id
402    }
403
404    /// The daemon's `MeshDaemon::name()` at registration.
405    pub fn daemon_name(&self) -> &str {
406        &self.daemon_name
407    }
408
409    /// Borrow the cached metadata view. Refresh via
410    /// [`Self::refresh_metadata`] when reading freshness matters.
411    pub fn metadata(&self) -> &MetadataView {
412        &self.metadata
413    }
414
415    /// Rebuild the metadata view from the runtime's latest
416    /// snapshot. Cheap — one `ArcSwap::load_full` plus a `BTreeMap`
417    /// clone of the peer entries.
418    pub fn refresh_metadata(&mut self) -> &MetadataView {
419        let snap = self.runtime_snapshot_reader.read();
420        let maint = match snap.local_maintenance {
421            super::snapshot::MaintenanceStateSnapshot::Active => MaintenanceStateView::Active,
422            super::snapshot::MaintenanceStateSnapshot::EnteringMaintenance {
423                since_ms,
424                deadline_remaining_ms,
425            } => MaintenanceStateView::EnteringMaintenance {
426                since_ms,
427                deadline_remaining_ms,
428            },
429            super::snapshot::MaintenanceStateSnapshot::Maintenance { since_ms } => {
430                MaintenanceStateView::Maintenance { since_ms }
431            }
432            super::snapshot::MaintenanceStateSnapshot::ExitingMaintenance { since_ms } => {
433                MaintenanceStateView::ExitingMaintenance { since_ms }
434            }
435            super::snapshot::MaintenanceStateSnapshot::DrainFailed { since_ms, reason } => {
436                MaintenanceStateView::DrainFailed { since_ms, reason }
437            }
438            super::snapshot::MaintenanceStateSnapshot::Recovery { since_ms } => {
439                MaintenanceStateView::Recovery { since_ms }
440            }
441        };
442        self.metadata = MetadataView {
443            node_id: self.this_node,
444            daemon_id: self.daemon_id,
445            daemon_name: self.daemon_name.clone(),
446            maintenance_state: maint,
447            peers: snap.peers,
448        };
449        &self.metadata
450    }
451
452    /// Publish (or update) the daemon's [`CapabilitySet`]. The
453    /// SDK doesn't itself commit to the capability chain — the
454    /// host's substrate-side path does — but it surfaces a stub
455    /// that returns `Ok(())` for now. A future slice plumbs this
456    /// to the real `CapabilityIndex::announce` path.
457    ///
458    /// **Plumbing status:** this method is a thin contract today;
459    /// the actual capability-chain commit lands when the
460    /// `CapabilitySet` → admin-chain integration ships. Calling
461    /// it now is a no-op + always returns `Ok(())`.
462    pub fn publish_capabilities(&self, _caps: CapabilitySet) -> Result<(), SdkError> {
463        // TODO: when the capability-chain commit path lands,
464        // wire through to `CapabilityIndex::announce_set(...)`.
465        Ok(())
466    }
467
468    /// Publish a log line tagged with this daemon's id. The
469    /// loop stamps the seq + wall-clock timestamp + this
470    /// node's id before pushing onto the per-node log ring;
471    /// operators reading through Deck SDK's
472    /// `subscribe_logs(LogFilter::new().with_daemon(id))` see
473    /// the line.
474    ///
475    /// Non-blocking — uses `MeshOsHandle::try_publish` so a
476    /// saturated event queue surfaces as `SdkError` with kind
477    /// `queue_full` rather than parking the caller. Daemons in
478    /// hot loops can drop log lines on backpressure without
479    /// stalling.
480    pub fn publish_log(
481        &self,
482        level: super::logs::LogLevel,
483        message: impl Into<String>,
484    ) -> Result<(), SdkError> {
485        let line = super::logs::LogLine {
486            level,
487            daemon_id: Some(self.daemon_id),
488            message: message.into(),
489        };
490        self.mesh_handle
491            .try_publish(super::event::MeshOsEvent::LogLine(line))
492            .map_err(|e| match e {
493                super::event_loop::MeshOsHandleError::LoopClosed => SdkError::new(
494                    "loop_closed",
495                    "MeshOS loop has exited; daemon log line dropped",
496                ),
497                super::event_loop::MeshOsHandleError::QueueFull => SdkError::new(
498                    "queue_full",
499                    "MeshOS event queue at capacity; daemon log line dropped",
500                ),
501            })
502    }
503
504    /// Drive a graceful shutdown. Sends
505    /// `DaemonControl::Shutdown { grace_period_ms }` to the
506    /// daemon's control channel, parks for `grace` (or until
507    /// the daemon unregisters itself — whichever sooner), then
508    /// runs the failsafe unregister.
509    pub async fn graceful_shutdown(mut self, grace: Duration) -> Result<(), SdkError> {
510        // Inject a shutdown event so the daemon's `next_control`
511        // loop wakes up.
512        let grace_ms = grace.as_millis() as u64;
513        self.router.route(
514            self.daemon_id,
515            DaemonControl::Shutdown {
516                grace_period_ms: grace_ms,
517            },
518        );
519        // Park for the grace window, but short-circuit as soon as
520        // the daemon clears itself from the registry — a clean
521        // exit shouldn't be indistinguishable from a hung daemon.
522        // Pre-fix this was a blind `sleep(grace)`, multiplying
523        // shutdown latency across a fleet during rolling restarts
524        // for every daemon that exits faster than its grace
525        // window. 50 ms poll cadence is a compromise between
526        // wake-up latency on clean exit and CPU overhead during
527        // the grace window.
528        let deadline = tokio::time::Instant::now() + grace;
529        let mut poll = tokio::time::interval(Duration::from_millis(50));
530        poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
531        loop {
532            if !self.registry.contains(self.daemon_id) {
533                break;
534            }
535            if tokio::time::Instant::now() >= deadline {
536                break;
537            }
538            tokio::select! {
539                _ = tokio::time::sleep_until(deadline) => break,
540                _ = poll.tick() => {}
541            }
542        }
543        self.unregister_inner();
544        Ok(())
545    }
546
547    fn unregister_inner(&mut self) {
548        if self.unregistered {
549            return;
550        }
551        self.unregistered = true;
552        // Drop the router slot first so further dispatches against
553        // this daemon are no-ops; then unregister from the registry
554        // (which fires the lifecycle observer; the SDK consumer
555        // sees `Unregistered`).
556        self.router.unregister(self.daemon_id);
557        let _ = self.registry.unregister(self.daemon_id);
558    }
559}
560
impl Drop for MeshOsDaemonHandle {
    /// Unregister the daemon when the handle goes away. A no-op if
    /// `graceful_shutdown` already unregistered it.
    fn drop(&mut self) {
        // Failsafe — if the consumer didn't call
        // `graceful_shutdown`, still clean up the registry +
        // router slot so the daemon doesn't leak.
        self.unregister_inner();
    }
}
569
/// SDK entry point. Wraps a [`MeshOsRuntime`] with the
/// [`SdkRoutingDispatcher`] + a [`DaemonControlRouter`] so
/// daemon-targeted actions translate to per-daemon control
/// events.
///
/// Construct via [`Self::start`] (one-call setup) or
/// [`Self::from_runtime`] (compose against a pre-built runtime
/// when the consumer needs to share state with other
/// subsystems).
pub struct MeshOsDaemonSdk {
    /// The wrapped runtime; consumed by [`Self::shutdown`].
    runtime: MeshOsRuntime,
    /// Router shared with the routing dispatcher and with every
    /// handle this SDK hands out.
    router: DaemonControlRouter,
    /// Capacity used for each newly registered daemon's control
    /// channel. Defaults to [`DEFAULT_CONTROL_CHANNEL_CAPACITY`].
    control_capacity: usize,
}
584
585impl MeshOsDaemonSdk {
586    /// One-call setup. Wraps the user's dispatcher in
587    /// [`SdkRoutingDispatcher`]; starts the runtime; retains
588    /// the router for per-daemon registration.
589    pub fn start<D: ActionDispatcher>(config: MeshOsConfig, user_dispatcher: Arc<D>) -> Self {
590        let router = DaemonControlRouter::new();
591        let routed = Arc::new(SdkRoutingDispatcher::new(user_dispatcher, router.clone()));
592        let sink: Arc<dyn super::control::ControlSink> =
593            Arc::new(RouterControlSink::new(router.clone()));
594        let runtime = MeshOsRuntime::start_with_options(
595            config,
596            routed,
597            super::event_loop::ProbeRegistry::new(),
598            super::scheduler::SchedulerRegistry::new(),
599            Arc::new(DaemonRegistry::new()),
600            Some(sink),
601        );
602        Self {
603            runtime,
604            router,
605            control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
606        }
607    }
608
609    /// Compose against a pre-built runtime + router. The
610    /// runtime's dispatcher must already be wrapped in
611    /// [`SdkRoutingDispatcher`] for the same `router`.
612    pub fn from_runtime(runtime: MeshOsRuntime, router: DaemonControlRouter) -> Self {
613        Self {
614            runtime,
615            router,
616            control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
617        }
618    }
619
620    /// Like [`Self::start`] but also installs an
621    /// [`super::ice::AdminVerifier`] on the runtime. Required
622    /// for any deployment where the operator's signed admin
623    /// commits should fold with `VerificationOutcome::Accepted`
624    /// instead of `Unverified` — the verifier's
625    /// [`super::ice::OperatorRegistry`] must contain the
626    /// operator key that signs incoming commits.
627    pub fn start_with_verifier<D: ActionDispatcher>(
628        config: MeshOsConfig,
629        user_dispatcher: Arc<D>,
630        verifier: Arc<super::ice::AdminVerifier>,
631    ) -> Self {
632        Self::start_with_verifier_and_migration_source(
633            config,
634            user_dispatcher,
635            Some(verifier),
636            None,
637        )
638    }
639
640    /// Install an `AdminVerifier` plus an optional migration
641    /// snapshot source in one call. This is **not** the full
642    /// extension surface — the underlying
643    /// [`MeshOsRuntime`] also accepts admin-audit / log /
644    /// failure chain appenders and a migration aborter; this
645    /// SDK-wrapper constructor exposes only the two extensions
646    /// the daemon-side SDK shipped first. Plumbing the other
647    /// extension slots through the SDK wrapper is tracked in
648    /// `MESHOS_SDK_PLAN.md` § Deferred work; for now,
649    /// deployments needing the full surface drop down to
650    /// [`MeshOsRuntime::start_with_full_extensions`] directly.
651    pub fn start_with_verifier_and_migration_source<D: ActionDispatcher>(
652        config: MeshOsConfig,
653        user_dispatcher: Arc<D>,
654        verifier: Option<Arc<super::ice::AdminVerifier>>,
655        migration_snapshot_source: Option<
656            Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
657        >,
658    ) -> Self {
659        let router = DaemonControlRouter::new();
660        let routed = Arc::new(SdkRoutingDispatcher::new(user_dispatcher, router.clone()));
661        let sink: Arc<dyn super::control::ControlSink> =
662            Arc::new(RouterControlSink::new(router.clone()));
663        let runtime = MeshOsRuntime::start_with_full_extensions(
664            config,
665            routed,
666            super::event_loop::ProbeRegistry::new(),
667            super::scheduler::SchedulerRegistry::new(),
668            Arc::new(DaemonRegistry::new()),
669            Some(sink),
670            verifier,
671            None, // admin audit appender
672            None, // log appender
673            None, // failure appender
674            None, // migration aborter
675            migration_snapshot_source,
676        );
677        Self {
678            runtime,
679            router,
680            control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
681        }
682    }
683
684    /// Override the per-daemon control-channel capacity. Default
685    /// is [`DEFAULT_CONTROL_CHANNEL_CAPACITY`]. Increase for
686    /// daemons that pause `process()` longer than the supervisor's
687    /// tick cadence.
688    pub fn with_control_capacity(mut self, capacity: usize) -> Self {
689        self.control_capacity = capacity.max(1);
690        self
691    }
692
693    /// Borrow the wrapped runtime.
694    pub fn runtime(&self) -> &MeshOsRuntime {
695        &self.runtime
696    }
697
698    /// Borrow the daemon-control router.
699    pub fn router(&self) -> &DaemonControlRouter {
700        &self.router
701    }
702
703    /// Register a daemon. Constructs a [`DaemonHost`], inserts
704    /// it into the runtime's registry, allocates the per-daemon
705    /// control channel, and returns the handle.
706    pub fn register_daemon(
707        &self,
708        daemon: Box<dyn MeshDaemon>,
709        keypair: EntityKeypair,
710    ) -> Result<MeshOsDaemonHandle, SdkError> {
711        let daemon_id = keypair.origin_hash();
712        let daemon_name = daemon.name().to_string();
713        let host = DaemonHost::new(daemon, keypair, DaemonHostConfig::default());
714        self.runtime
715            .daemon_registry()
716            .register(host)
717            .map_err(SdkError::from)?;
718        let control_rx = self.router.register(daemon_id, self.control_capacity);
719        let snap = self.runtime.snapshot();
720        let metadata = MetadataView {
721            node_id: self.runtime_this_node(),
722            daemon_id,
723            daemon_name: daemon_name.clone(),
724            maintenance_state: MaintenanceStateView::Active,
725            peers: snap.peers,
726        };
727        Ok(MeshOsDaemonHandle {
728            daemon_id,
729            daemon_name,
730            control_rx,
731            registry: Arc::clone(self.runtime.daemon_registry()),
732            router: self.router.clone(),
733            metadata,
734            runtime_snapshot_reader: self.runtime.snapshot_reader().clone(),
735            mesh_handle: self.runtime.handle_clone(),
736            this_node: self.runtime_this_node(),
737            unregistered: false,
738        })
739    }
740
741    /// Total events the router dropped across every registered
742    /// daemon. Diagnostic.
743    pub fn dropped_control_events(&self) -> u64 {
744        self.router.total_dropped()
745    }
746
747    /// Drive a clean shutdown of the wrapped runtime.
748    pub async fn shutdown(self) -> Result<RuntimeStats, RuntimeShutdownError> {
749        self.runtime.shutdown().await
750    }
751
752    /// Read `MeshOsConfig::this_node` off the runtime. Sourced
753    /// directly from the field the runtime captured at
754    /// construction; the SDK's `MetadataView::node_id` now
755    /// reflects the deployment's identity instead of always 0.
756    fn runtime_this_node(&self) -> NodeId {
757        self.runtime.this_node()
758    }
759}
760
/// One-call macro for the common "single daemon per process"
/// case. Expands to a block expression that uses `.await`, so it
/// must be invoked inside an `async` context (for example the body
/// of a `#[tokio::main]` function). The expansion:
///
/// 1. Starts a [`MeshOsDaemonSdk`] from the supplied config +
///    dispatcher via `MeshOsDaemonSdk::start`,
/// 2. Registers the supplied daemon + keypair, panicking with
///    "daemon registration failed" if registration errors,
/// 3. Drains control events; on `Shutdown` or `DrainFinish`
///    (or when the control channel closes), leaves the loop,
/// 4. Drives `graceful_shutdown` on the handle with
///    `DEFAULT_GRACEFUL_SHUTDOWN`, then `shutdown` on the SDK;
///    the results of both calls are ignored.
///
/// The four keys must appear in exactly this order; a trailing
/// comma is optional.
///
/// ```ignore
/// daemon_main! {
///     daemon: MyTelemetryDaemon::new(),
///     keypair: EntityKeypair::generate(),
///     config: MeshOsConfig::default(),
///     dispatcher: my_dispatcher,
/// }
/// ```
#[macro_export]
macro_rules! daemon_main {
    (
        daemon: $daemon:expr,
        keypair: $keypair:expr,
        config: $config:expr,
        dispatcher: $dispatcher:expr $(,)?
    ) => {{
        let sdk = $crate::adapter::net::behavior::meshos::sdk::MeshOsDaemonSdk::start(
            $config,
            $dispatcher,
        );
        let mut handle = sdk
            .register_daemon(Box::new($daemon), $keypair)
            .expect("daemon registration failed");
        while let Some(ev) = handle.next_control().await {
            use $crate::adapter::net::compute::DaemonControl;
            if matches!(
                ev,
                DaemonControl::Shutdown { .. } | DaemonControl::DrainFinish
            ) {
                break;
            }
        }
        let grace = $crate::adapter::net::behavior::meshos::sdk::DEFAULT_GRACEFUL_SHUTDOWN;
        let _ = handle.graceful_shutdown(grace).await;
        let _ = sdk.shutdown().await;
    }};
}
811
812#[cfg(test)]
813#[allow(clippy::field_reassign_with_default)]
814mod tests {
815    use std::sync::atomic::AtomicUsize;
816
817    use bytes::Bytes;
818
819    use super::*;
820    use crate::adapter::net::behavior::capability::CapabilityFilter;
821    use crate::adapter::net::behavior::meshos::action::ActionId;
822    use crate::adapter::net::behavior::meshos::executor::LoggingDispatcher;
823    use crate::adapter::net::behavior::meshos::PendingAction;
824    use crate::adapter::net::compute::{DaemonError, MeshDaemon};
825    use crate::adapter::net::state::causal::CausalEvent;
826
827    /// Minimal test daemon — no state, name + process only.
828    struct NoopDaemon {
829        name: String,
830        process_count: Arc<AtomicUsize>,
831    }
832    impl NoopDaemon {
833        fn new(name: &str) -> (Self, Arc<AtomicUsize>) {
834            let counter = Arc::new(AtomicUsize::new(0));
835            (
836                Self {
837                    name: name.into(),
838                    process_count: Arc::clone(&counter),
839                },
840                counter,
841            )
842        }
843    }
844    impl MeshDaemon for NoopDaemon {
845        fn name(&self) -> &str {
846            &self.name
847        }
848        fn requirements(&self) -> CapabilityFilter {
849            CapabilityFilter::default()
850        }
851        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
852            self.process_count.fetch_add(1, Ordering::Relaxed);
853            Ok(Vec::new())
854        }
855    }
856
857    fn fast_config() -> MeshOsConfig {
858        let mut cfg = MeshOsConfig::default();
859        cfg.tick_interval = Duration::from_millis(10);
860        cfg
861    }
862
863    #[tokio::test]
864    async fn register_daemon_returns_handle_with_correct_identity() {
865        let dispatcher = Arc::new(LoggingDispatcher::new());
866        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
867        let (daemon, _counter) = NoopDaemon::new("telemetry");
868        let kp = EntityKeypair::generate();
869        let expected_id = kp.origin_hash();
870        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
871        assert_eq!(handle.daemon_id(), expected_id);
872        assert_eq!(handle.daemon_name(), "telemetry");
873        let _ = sdk.shutdown().await;
874    }
875
876    #[tokio::test]
877    async fn control_router_routes_stop_daemon_to_per_daemon_channel() {
878        let router = DaemonControlRouter::new();
879        let mut rx = router.register(42, 4);
880        // Inject directly — the routing dispatcher does this via
881        // translate_to_control under the hood.
882        router.route(
883            42,
884            DaemonControl::Shutdown {
885                grace_period_ms: 5000,
886            },
887        );
888        let ev = rx.try_recv().expect("event present");
889        assert!(matches!(
890            ev,
891            DaemonControl::Shutdown {
892                grace_period_ms: 5000
893            }
894        ));
895    }
896
897    #[tokio::test]
898    async fn control_router_drops_when_channel_full() {
899        let router = DaemonControlRouter::new();
900        let _rx = router.register(99, 1);
901        router.route(99, DaemonControl::BackpressureOn { level: 0.5 });
902        // Second push exceeds capacity 1 → drop.
903        router.route(99, DaemonControl::BackpressureOn { level: 0.8 });
904        assert_eq!(router.total_dropped(), 1);
905    }
906
907    #[tokio::test]
908    async fn translate_to_control_emits_shutdown_for_stop_daemon() {
909        let router = DaemonControlRouter::new();
910        let mut rx = router.register(7, 4);
911        let action = MeshOsAction::StopDaemon {
912            daemon: super::super::event::DaemonRef {
913                id: 7,
914                name: "x".into(),
915            },
916            reason: "intent-stop".into(),
917            deadline: Instant::now() + Duration::from_millis(2500),
918        };
919        translate_to_control(&router, &action);
920        let ev = rx.try_recv().expect("translated to control event");
921        match ev {
922            DaemonControl::Shutdown { grace_period_ms } => {
923                // Allow small slop for the Instant arithmetic.
924                assert!((2400..=2500).contains(&grace_period_ms));
925            }
926            other => panic!("expected Shutdown, got {other:?}"),
927        }
928    }
929
930    #[tokio::test]
931    async fn router_control_sink_broadcasts_drain_start_to_every_registered_daemon() {
932        // The sink is the substrate-emit seam — when the loop
933        // observes a maintenance transition it calls `emit`; the
934        // adapter broadcasts the translated `DaemonControl` to
935        // every registered daemon channel.
936        use super::super::control::{ControlSink, MeshOsControl};
937        let router = DaemonControlRouter::new();
938        let mut rx_a = router.register(1, 4);
939        let mut rx_b = router.register(2, 4);
940        let sink = RouterControlSink::new(router.clone());
941        sink.emit(MeshOsControl::DrainStart {
942            deadline: std::time::Instant::now() + std::time::Duration::from_secs(30),
943        });
944        let ev_a = rx_a.try_recv().expect("daemon A received drain start");
945        let ev_b = rx_b.try_recv().expect("daemon B received drain start");
946        assert!(matches!(ev_a, DaemonControl::DrainStart { .. }));
947        assert!(matches!(ev_b, DaemonControl::DrainStart { .. }));
948    }
949
950    #[tokio::test]
951    async fn unregister_removes_router_slot() {
952        let router = DaemonControlRouter::new();
953        let _rx = router.register(7, 4);
954        router.unregister(7);
955        // Subsequent dispatch against 7 is a no-op — no panic,
956        // no drop counter increment (the slot is gone).
957        router.route(7, DaemonControl::Shutdown { grace_period_ms: 1 });
958        assert_eq!(router.total_dropped(), 0);
959    }
960
961    #[tokio::test]
962    async fn publish_log_lands_on_runtime_log_ring_tagged_with_daemon_id() {
963        // Daemon-author surface: a registered daemon emits a
964        // log line via the handle; the line shows up on the
965        // runtime's log ring tagged with the daemon's id, ready
966        // for Deck SDK's `subscribe_logs` to pick up.
967        let dispatcher = Arc::new(LoggingDispatcher::new());
968        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
969        let (daemon, _) = NoopDaemon::new("logger");
970        let kp = EntityKeypair::generate();
971        let daemon_id = kp.origin_hash();
972        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
973
974        handle
975            .publish_log(
976                super::super::logs::LogLevel::Warn,
977                "throttling: queue depth high",
978            )
979            .expect("publish_log");
980
981        // Give the loop a tick + reconcile + snapshot publish.
982        tokio::time::sleep(Duration::from_millis(80)).await;
983        let snap = sdk.runtime().snapshot();
984        let matching: Vec<_> = snap
985            .log_ring
986            .iter()
987            .filter(|r| r.daemon_id == Some(daemon_id))
988            .collect();
989        assert_eq!(matching.len(), 1, "expected one log line for this daemon");
990        let record = matching[0];
991        assert_eq!(record.level, super::super::logs::LogLevel::Warn);
992        assert_eq!(record.message, "throttling: queue depth high");
993        let _ = sdk.shutdown().await;
994    }
995
996    #[tokio::test]
997    async fn publish_log_after_runtime_shutdown_returns_loop_closed() {
998        let dispatcher = Arc::new(LoggingDispatcher::new());
999        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
1000        let (daemon, _) = NoopDaemon::new("logger");
1001        let kp = EntityKeypair::generate();
1002        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
1003        let _ = sdk.shutdown().await;
1004        let err = handle
1005            .publish_log(super::super::logs::LogLevel::Info, "after shutdown")
1006            .expect_err("publish after shutdown should fail");
1007        assert_eq!(err.kind, "loop_closed");
1008    }
1009
1010    #[tokio::test]
1011    async fn handle_drop_unregisters_from_registry_and_router() {
1012        let dispatcher = Arc::new(LoggingDispatcher::new());
1013        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
1014        let registry = Arc::clone(sdk.runtime.daemon_registry());
1015        let (daemon, _) = NoopDaemon::new("temp");
1016        let kp = EntityKeypair::generate();
1017        let daemon_id = kp.origin_hash();
1018        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
1019        drop(handle);
1020        // Registry should no longer have the daemon — try
1021        // unregister and expect NotFound.
1022        assert!(matches!(
1023            registry.unregister(daemon_id),
1024            Err(DaemonError::NotFound(_))
1025        ));
1026        let _ = sdk.shutdown().await;
1027    }
1028
1029    #[tokio::test]
1030    async fn graceful_shutdown_sends_shutdown_control_then_unregisters() {
1031        let dispatcher = Arc::new(LoggingDispatcher::new());
1032        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
1033        let (daemon, _) = NoopDaemon::new("graceful");
1034        let kp = EntityKeypair::generate();
1035        let mut handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
1036        // Spawn a task that consumes one control event, mimicking
1037        // a real daemon loop.
1038        let mut control_rx =
1039            std::mem::replace(&mut handle.control_rx, mpsc::channel::<DaemonControl>(1).1);
1040        let received = tokio::spawn(async move { control_rx.recv().await });
1041        // graceful_shutdown injects a Shutdown event + parks for
1042        // the grace window. Use a short grace to keep the test
1043        // fast.
1044        let _ = handle.graceful_shutdown(Duration::from_millis(50)).await;
1045        let ev = received.await.unwrap();
1046        assert!(matches!(ev, Some(DaemonControl::Shutdown { .. })));
1047        let _ = sdk.shutdown().await;
1048    }
1049
1050    #[tokio::test]
1051    async fn publish_capabilities_returns_ok_pending_capability_chain_wiring() {
1052        let dispatcher = Arc::new(LoggingDispatcher::new());
1053        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
1054        let (daemon, _) = NoopDaemon::new("noop");
1055        let kp = EntityKeypair::generate();
1056        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
1057        // Stub for now; the chain commit lands in a future slice.
1058        let result = handle.publish_capabilities(CapabilitySet::default());
1059        assert!(result.is_ok());
1060        let _ = sdk.shutdown().await;
1061    }
1062
1063    #[tokio::test]
1064    async fn refresh_metadata_pulls_from_runtime_snapshot() {
1065        let dispatcher = Arc::new(LoggingDispatcher::new());
1066        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
1067        let (daemon, _) = NoopDaemon::new("inspect");
1068        let kp = EntityKeypair::generate();
1069        let mut handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
1070        // Let the loop run + publish at least one snapshot.
1071        tokio::time::sleep(Duration::from_millis(100)).await;
1072        let view = handle.refresh_metadata();
1073        assert_eq!(view.daemon_name, "inspect");
1074        // peers might be empty under a single-node test fixture;
1075        // pin only that the metadata view materializes without
1076        // panic.
1077        assert!(matches!(
1078            view.maintenance_state,
1079            MaintenanceStateView::Active
1080        ));
1081        let _ = sdk.shutdown().await;
1082    }
1083
1084    #[test]
1085    fn sdk_error_display_carries_kind_discriminator() {
1086        let err = SdkError::new("register_failed", "host already registered");
1087        let formatted = format!("{err}");
1088        assert!(formatted.starts_with("<<meshos-sdk-kind:register_failed>>"));
1089        assert!(formatted.ends_with("host already registered"));
1090    }
1091
1092    #[test]
1093    fn maintenance_state_view_round_trips_active_default() {
1094        let now = Instant::now();
1095        let active = MaintenanceStateView::from_state(&MaintenanceState::Active, now);
1096        assert!(matches!(active, MaintenanceStateView::Active));
1097    }
1098
1099    #[test]
1100    fn maintenance_state_view_clamps_past_deadlines_to_zero() {
1101        let now = Instant::now();
1102        let state = MaintenanceState::EnteringMaintenance {
1103            since: now - Duration::from_secs(5),
1104            deadline: Some(now - Duration::from_secs(1)),
1105        };
1106        let view = MaintenanceStateView::from_state(&state, now);
1107        match view {
1108            MaintenanceStateView::EnteringMaintenance {
1109                deadline_remaining_ms,
1110                ..
1111            } => assert_eq!(deadline_remaining_ms, Some(0)),
1112            other => panic!("expected EnteringMaintenance, got {other:?}"),
1113        }
1114    }
1115
1116    // Pin the unused-import suppression — we touch these types
1117    // through macro expansion paths in production but tests don't
1118    // always exercise them.
1119    #[allow(dead_code)]
1120    fn _pin(_p: PendingAction, _a: ActionId, _f: super::super::event::DaemonRef) {}
1121}