1use std::collections::BTreeMap;
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56
57use futures::future::BoxFuture;
58use parking_lot::RwLock;
59use tokio::sync::mpsc;
60
61use crate::adapter::net::behavior::capability::CapabilitySet;
62use crate::adapter::net::compute::{
63 DaemonControl, DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
64};
65use crate::adapter::net::identity::EntityKeypair;
66
67use super::action::MeshOsAction;
68use super::config::MeshOsConfig;
69use super::event::NodeId;
70use super::executor::{ActionDispatcher, DispatchError};
71use super::maintenance::MaintenanceState;
72use super::runtime::{MeshOsRuntime, RuntimeShutdownError, RuntimeStats};
73use super::snapshot::PeerSnapshot;
74
/// Default capacity of every per-daemon control channel.
///
/// Once a daemon has this many undelivered control events queued, further
/// events routed or broadcast to it are dropped and counted. Override per SDK
/// with `MeshOsDaemonSdk::with_control_capacity`.
pub const DEFAULT_CONTROL_CHANNEL_CAPACITY: usize = 8;

/// Grace period `daemon_main!` hands to `MeshOsDaemonHandle::graceful_shutdown`.
pub const DEFAULT_GRACEFUL_SHUTDOWN: Duration = Duration::from_millis(5_000);
83
84#[derive(Clone, Debug, thiserror::Error)]
87#[error("<<meshos-sdk-kind:{kind}>>{message}")]
88pub struct SdkError {
89 pub kind: &'static str,
93 pub message: String,
95}
96
97impl SdkError {
98 fn new(kind: &'static str, message: impl Into<String>) -> Self {
99 Self {
100 kind,
101 message: message.into(),
102 }
103 }
104}
105
106impl From<DaemonError> for SdkError {
107 fn from(err: DaemonError) -> Self {
108 Self::new("register_failed", err.to_string())
109 }
110}
111
/// Point-in-time description of a daemon's hosting context.
///
/// Exposed through `MeshOsDaemonHandle::metadata` (cached copy) and
/// `MeshOsDaemonHandle::refresh_metadata` (re-read from the runtime snapshot).
#[derive(Clone, Debug)]
pub struct MetadataView {
    /// Id of the local node hosting the daemon.
    pub node_id: NodeId,
    /// The daemon's id (its keypair's origin hash, assigned at registration).
    pub daemon_id: u64,
    /// The daemon's name as reported by `MeshDaemon::name` at registration.
    pub daemon_name: String,
    /// Local maintenance state. At registration this is always `Active`; it is
    /// only taken from the runtime snapshot by `refresh_metadata`.
    pub maintenance_state: MaintenanceStateView,
    /// Peers known to the runtime snapshot, keyed by node id.
    pub peers: BTreeMap<NodeId, PeerSnapshot>,
}
131
/// Owned, clock-free rendering of the node's maintenance state.
///
/// Absolute instants are replaced by millisecond durations measured against
/// the moment the view was built, so the value can be cloned and compared
/// freely.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum MaintenanceStateView {
    /// Mirrors `MaintenanceState::Active`.
    Active,
    /// Mirrors `MaintenanceState::EnteringMaintenance`.
    EnteringMaintenance {
        /// Milliseconds since the state was entered.
        since_ms: u64,
        /// Milliseconds left until the deadline, clamped to 0 once it has
        /// passed; `None` when the state carries no deadline.
        deadline_remaining_ms: Option<u64>,
    },
    /// Mirrors `MaintenanceState::Maintenance`.
    Maintenance {
        /// Milliseconds since the state was entered.
        since_ms: u64,
    },
    /// Mirrors `MaintenanceState::ExitingMaintenance`.
    ExitingMaintenance {
        /// Milliseconds since the state was entered.
        since_ms: u64,
    },
    /// Mirrors `MaintenanceState::DrainFailed`.
    DrainFailed {
        /// Milliseconds since the state was entered.
        since_ms: u64,
        /// Failure reason copied from the underlying state.
        reason: String,
    },
    /// Mirrors `MaintenanceState::Recovery`.
    Recovery {
        /// Milliseconds since the state was entered.
        since_ms: u64,
    },
}
171
172impl MaintenanceStateView {
173 pub fn from_state(state: &MaintenanceState, now: Instant) -> Self {
178 match state {
179 MaintenanceState::Active => Self::Active,
180 MaintenanceState::EnteringMaintenance { since, deadline } => {
181 Self::EnteringMaintenance {
182 since_ms: now.saturating_duration_since(*since).as_millis() as u64,
183 deadline_remaining_ms: deadline
184 .map(|d| d.saturating_duration_since(now).as_millis() as u64),
185 }
186 }
187 MaintenanceState::Maintenance { since } => Self::Maintenance {
188 since_ms: now.saturating_duration_since(*since).as_millis() as u64,
189 },
190 MaintenanceState::ExitingMaintenance { since } => Self::ExitingMaintenance {
191 since_ms: now.saturating_duration_since(*since).as_millis() as u64,
192 },
193 MaintenanceState::DrainFailed { since, reason } => Self::DrainFailed {
194 since_ms: now.saturating_duration_since(*since).as_millis() as u64,
195 reason: reason.clone(),
196 },
197 MaintenanceState::Recovery { since } => Self::Recovery {
198 since_ms: now.saturating_duration_since(*since).as_millis() as u64,
199 },
200 }
201 }
202}
203
/// One registered daemon's control channel plus its delivery-failure counter.
#[derive(Debug)]
struct DaemonControlSlot {
    /// Sending half of the daemon's bounded control channel.
    tx: mpsc::Sender<DaemonControl>,
    /// Number of events whose `try_send` failed (channel full or receiver
    /// gone) and were therefore dropped.
    dropped: AtomicU64,
}
215
/// Delivers `DaemonControl` events to per-daemon bounded channels.
///
/// Cloning is cheap: every clone shares the same slot tables through `Arc`.
/// Delivery never blocks; events that cannot be queued are dropped and counted.
#[derive(Clone, Default)]
pub struct DaemonControlRouter {
    /// Slots keyed by daemon id, used for targeted `route` calls and for
    /// `total_dropped`.
    inner: Arc<RwLock<BTreeMap<u64, Arc<DaemonControlSlot>>>>,
    /// The same slots as a flat list, used by `broadcast`; intended to mirror
    /// the values of `inner`.
    broadcast: Arc<RwLock<Vec<Arc<DaemonControlSlot>>>>,
}
229
230impl DaemonControlRouter {
231 pub fn new() -> Self {
233 Self::default()
234 }
235
236 fn register(&self, daemon_id: u64, capacity: usize) -> mpsc::Receiver<DaemonControl> {
239 let (tx, rx) = mpsc::channel(capacity);
240 let slot = Arc::new(DaemonControlSlot {
241 tx,
242 dropped: AtomicU64::new(0),
243 });
244 self.inner.write().insert(daemon_id, Arc::clone(&slot));
245 self.broadcast.write().push(slot);
246 rx
247 }
248
249 fn unregister(&self, daemon_id: u64) {
253 let removed = self.inner.write().remove(&daemon_id);
254 if let Some(removed) = removed {
255 self.broadcast.write().retain(|s| !Arc::ptr_eq(s, &removed));
256 }
257 }
258
259 fn route(&self, daemon_id: u64, event: DaemonControl) {
263 let slot = self.inner.read().get(&daemon_id).cloned();
264 if let Some(slot) = slot {
265 if slot.tx.try_send(event).is_err() {
266 slot.dropped.fetch_add(1, Ordering::Relaxed);
267 }
268 }
269 }
270
271 fn broadcast(&self, event: DaemonControl) {
275 let slots = self.broadcast.read().clone();
276 for slot in slots {
277 if slot.tx.try_send(event.clone()).is_err() {
278 slot.dropped.fetch_add(1, Ordering::Relaxed);
279 }
280 }
281 }
282
283 pub fn total_dropped(&self) -> u64 {
287 let map = self.inner.read();
288 map.values()
289 .map(|slot| slot.dropped.load(Ordering::Relaxed))
290 .sum()
291 }
292}
293
294pub struct SdkRoutingDispatcher<D: ActionDispatcher> {
304 inner: Arc<D>,
305 router: DaemonControlRouter,
306}
307
308impl<D: ActionDispatcher> SdkRoutingDispatcher<D> {
309 pub fn new(inner: Arc<D>, router: DaemonControlRouter) -> Self {
311 Self { inner, router }
312 }
313}
314
315impl<D: ActionDispatcher> ActionDispatcher for SdkRoutingDispatcher<D> {
316 fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>> {
317 let router = self.router.clone();
318 let action_clone = action.clone();
319 let inner = Arc::clone(&self.inner);
320 Box::pin(async move {
321 translate_to_control(&router, &action_clone);
322 inner.dispatch(action).await
323 })
324 }
325}
326
327fn translate_to_control(router: &DaemonControlRouter, action: &MeshOsAction) {
328 let now = Instant::now();
329 if let MeshOsAction::StopDaemon {
330 daemon, deadline, ..
331 } = action
332 {
333 let grace_period_ms = deadline.saturating_duration_since(now).as_millis() as u64;
334 router.route(daemon.id, DaemonControl::Shutdown { grace_period_ms });
335 }
336 }
341
342pub(super) struct RouterControlSink {
348 router: DaemonControlRouter,
349}
350
351impl RouterControlSink {
352 pub(super) fn new(router: DaemonControlRouter) -> Self {
353 Self { router }
354 }
355}
356
357impl super::control::ControlSink for RouterControlSink {
358 fn emit(&self, event: super::control::MeshOsControl) {
359 let now = Instant::now();
360 self.router.broadcast(event.to_daemon_control(now));
361 }
362}
363
/// Handle a registered daemon uses to interact with its hosting MeshOS runtime.
///
/// Dropping the handle, or calling `graceful_shutdown`, removes the daemon from
/// both the control router and the daemon registry.
pub struct MeshOsDaemonHandle {
    /// The daemon's id (its keypair's origin hash).
    daemon_id: u64,
    /// The daemon's name captured at registration.
    daemon_name: String,
    /// Receiving half of this daemon's control channel on `router`.
    control_rx: mpsc::Receiver<DaemonControl>,
    /// Registry the daemon's host was registered into.
    registry: Arc<DaemonRegistry>,
    /// Router holding the sending half of `control_rx`.
    router: DaemonControlRouter,
    /// Cached metadata; replaced by `refresh_metadata`.
    metadata: MetadataView,
    /// Reader for the runtime snapshot consulted by `refresh_metadata`.
    runtime_snapshot_reader: super::event_loop::MeshOsSnapshotReader,
    /// Handle into the MeshOS event loop, used by `publish_log`.
    mesh_handle: super::event_loop::MeshOsHandle,
    /// Id of the local node, copied into every `MetadataView`.
    this_node: NodeId,
    /// Set once cleanup has run, so later calls (e.g. from `Drop`) are no-ops.
    unregistered: bool,
}
383
384impl MeshOsDaemonHandle {
385 pub async fn next_control(&mut self) -> Option<DaemonControl> {
389 self.control_rx.recv().await
390 }
391
392 pub fn try_next_control(&mut self) -> Option<DaemonControl> {
395 self.control_rx.try_recv().ok()
396 }
397
398 pub fn daemon_id(&self) -> u64 {
401 self.daemon_id
402 }
403
404 pub fn daemon_name(&self) -> &str {
406 &self.daemon_name
407 }
408
409 pub fn metadata(&self) -> &MetadataView {
412 &self.metadata
413 }
414
415 pub fn refresh_metadata(&mut self) -> &MetadataView {
419 let snap = self.runtime_snapshot_reader.read();
420 let maint = match snap.local_maintenance {
421 super::snapshot::MaintenanceStateSnapshot::Active => MaintenanceStateView::Active,
422 super::snapshot::MaintenanceStateSnapshot::EnteringMaintenance {
423 since_ms,
424 deadline_remaining_ms,
425 } => MaintenanceStateView::EnteringMaintenance {
426 since_ms,
427 deadline_remaining_ms,
428 },
429 super::snapshot::MaintenanceStateSnapshot::Maintenance { since_ms } => {
430 MaintenanceStateView::Maintenance { since_ms }
431 }
432 super::snapshot::MaintenanceStateSnapshot::ExitingMaintenance { since_ms } => {
433 MaintenanceStateView::ExitingMaintenance { since_ms }
434 }
435 super::snapshot::MaintenanceStateSnapshot::DrainFailed { since_ms, reason } => {
436 MaintenanceStateView::DrainFailed { since_ms, reason }
437 }
438 super::snapshot::MaintenanceStateSnapshot::Recovery { since_ms } => {
439 MaintenanceStateView::Recovery { since_ms }
440 }
441 };
442 self.metadata = MetadataView {
443 node_id: self.this_node,
444 daemon_id: self.daemon_id,
445 daemon_name: self.daemon_name.clone(),
446 maintenance_state: maint,
447 peers: snap.peers,
448 };
449 &self.metadata
450 }
451
452 pub fn publish_capabilities(&self, _caps: CapabilitySet) -> Result<(), SdkError> {
463 Ok(())
466 }
467
468 pub fn publish_log(
481 &self,
482 level: super::logs::LogLevel,
483 message: impl Into<String>,
484 ) -> Result<(), SdkError> {
485 let line = super::logs::LogLine {
486 level,
487 daemon_id: Some(self.daemon_id),
488 message: message.into(),
489 };
490 self.mesh_handle
491 .try_publish(super::event::MeshOsEvent::LogLine(line))
492 .map_err(|e| match e {
493 super::event_loop::MeshOsHandleError::LoopClosed => SdkError::new(
494 "loop_closed",
495 "MeshOS loop has exited; daemon log line dropped",
496 ),
497 super::event_loop::MeshOsHandleError::QueueFull => SdkError::new(
498 "queue_full",
499 "MeshOS event queue at capacity; daemon log line dropped",
500 ),
501 })
502 }
503
504 pub async fn graceful_shutdown(mut self, grace: Duration) -> Result<(), SdkError> {
510 let grace_ms = grace.as_millis() as u64;
513 self.router.route(
514 self.daemon_id,
515 DaemonControl::Shutdown {
516 grace_period_ms: grace_ms,
517 },
518 );
519 let deadline = tokio::time::Instant::now() + grace;
529 let mut poll = tokio::time::interval(Duration::from_millis(50));
530 poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
531 loop {
532 if !self.registry.contains(self.daemon_id) {
533 break;
534 }
535 if tokio::time::Instant::now() >= deadline {
536 break;
537 }
538 tokio::select! {
539 _ = tokio::time::sleep_until(deadline) => break,
540 _ = poll.tick() => {}
541 }
542 }
543 self.unregister_inner();
544 Ok(())
545 }
546
547 fn unregister_inner(&mut self) {
548 if self.unregistered {
549 return;
550 }
551 self.unregistered = true;
552 self.router.unregister(self.daemon_id);
557 let _ = self.registry.unregister(self.daemon_id);
558 }
559}
560
561impl Drop for MeshOsDaemonHandle {
562 fn drop(&mut self) {
563 self.unregister_inner();
567 }
568}
569
/// Entry point for hosting daemons on a MeshOS runtime.
///
/// Owns the runtime together with the `DaemonControlRouter` through which
/// control events reach registered daemons.
pub struct MeshOsDaemonSdk {
    /// The running MeshOS runtime.
    runtime: MeshOsRuntime,
    /// Control router. When built via the `start*` constructors, it is the same
    /// router the runtime's dispatcher and control sink publish into.
    router: DaemonControlRouter,
    /// Capacity used for each control channel created by `register_daemon`.
    control_capacity: usize,
}
584
585impl MeshOsDaemonSdk {
586 pub fn start<D: ActionDispatcher>(config: MeshOsConfig, user_dispatcher: Arc<D>) -> Self {
590 let router = DaemonControlRouter::new();
591 let routed = Arc::new(SdkRoutingDispatcher::new(user_dispatcher, router.clone()));
592 let sink: Arc<dyn super::control::ControlSink> =
593 Arc::new(RouterControlSink::new(router.clone()));
594 let runtime = MeshOsRuntime::start_with_options(
595 config,
596 routed,
597 super::event_loop::ProbeRegistry::new(),
598 super::scheduler::SchedulerRegistry::new(),
599 Arc::new(DaemonRegistry::new()),
600 Some(sink),
601 );
602 Self {
603 runtime,
604 router,
605 control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
606 }
607 }
608
609 pub fn from_runtime(runtime: MeshOsRuntime, router: DaemonControlRouter) -> Self {
613 Self {
614 runtime,
615 router,
616 control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
617 }
618 }
619
620 pub fn start_with_verifier<D: ActionDispatcher>(
628 config: MeshOsConfig,
629 user_dispatcher: Arc<D>,
630 verifier: Arc<super::ice::AdminVerifier>,
631 ) -> Self {
632 Self::start_with_verifier_and_migration_source(
633 config,
634 user_dispatcher,
635 Some(verifier),
636 None,
637 )
638 }
639
640 pub fn start_with_verifier_and_migration_source<D: ActionDispatcher>(
652 config: MeshOsConfig,
653 user_dispatcher: Arc<D>,
654 verifier: Option<Arc<super::ice::AdminVerifier>>,
655 migration_snapshot_source: Option<
656 Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
657 >,
658 ) -> Self {
659 let router = DaemonControlRouter::new();
660 let routed = Arc::new(SdkRoutingDispatcher::new(user_dispatcher, router.clone()));
661 let sink: Arc<dyn super::control::ControlSink> =
662 Arc::new(RouterControlSink::new(router.clone()));
663 let runtime = MeshOsRuntime::start_with_full_extensions(
664 config,
665 routed,
666 super::event_loop::ProbeRegistry::new(),
667 super::scheduler::SchedulerRegistry::new(),
668 Arc::new(DaemonRegistry::new()),
669 Some(sink),
670 verifier,
671 None, None, None, None, migration_snapshot_source,
676 );
677 Self {
678 runtime,
679 router,
680 control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
681 }
682 }
683
684 pub fn with_control_capacity(mut self, capacity: usize) -> Self {
689 self.control_capacity = capacity.max(1);
690 self
691 }
692
693 pub fn runtime(&self) -> &MeshOsRuntime {
695 &self.runtime
696 }
697
698 pub fn router(&self) -> &DaemonControlRouter {
700 &self.router
701 }
702
703 pub fn register_daemon(
707 &self,
708 daemon: Box<dyn MeshDaemon>,
709 keypair: EntityKeypair,
710 ) -> Result<MeshOsDaemonHandle, SdkError> {
711 let daemon_id = keypair.origin_hash();
712 let daemon_name = daemon.name().to_string();
713 let host = DaemonHost::new(daemon, keypair, DaemonHostConfig::default());
714 self.runtime
715 .daemon_registry()
716 .register(host)
717 .map_err(SdkError::from)?;
718 let control_rx = self.router.register(daemon_id, self.control_capacity);
719 let snap = self.runtime.snapshot();
720 let metadata = MetadataView {
721 node_id: self.runtime_this_node(),
722 daemon_id,
723 daemon_name: daemon_name.clone(),
724 maintenance_state: MaintenanceStateView::Active,
725 peers: snap.peers,
726 };
727 Ok(MeshOsDaemonHandle {
728 daemon_id,
729 daemon_name,
730 control_rx,
731 registry: Arc::clone(self.runtime.daemon_registry()),
732 router: self.router.clone(),
733 metadata,
734 runtime_snapshot_reader: self.runtime.snapshot_reader().clone(),
735 mesh_handle: self.runtime.handle_clone(),
736 this_node: self.runtime_this_node(),
737 unregistered: false,
738 })
739 }
740
741 pub fn dropped_control_events(&self) -> u64 {
744 self.router.total_dropped()
745 }
746
747 pub async fn shutdown(self) -> Result<RuntimeStats, RuntimeShutdownError> {
749 self.runtime.shutdown().await
750 }
751
752 fn runtime_this_node(&self) -> NodeId {
757 self.runtime.this_node()
758 }
759}
760
/// Runs one daemon under a fresh `MeshOsDaemonSdk` until it is told to stop.
///
/// Expands to a block that must be `.await`-capable, i.e. used inside an async
/// context. The block:
/// 1. Starts the SDK with `config` and `dispatcher`.
/// 2. Registers the boxed `daemon` under `keypair`, panicking if registration
///    fails.
/// 3. Consumes control events until a `Shutdown` or `DrainFinish` arrives or
///    the channel closes.
/// 4. Calls `graceful_shutdown` with `DEFAULT_GRACEFUL_SHUTDOWN`, then shuts
///    the SDK down. Both results are ignored.
#[macro_export]
macro_rules! daemon_main {
    (
        daemon: $daemon:expr,
        keypair: $keypair:expr,
        config: $config:expr,
        dispatcher: $dispatcher:expr $(,)?
    ) => {{
        let sdk = $crate::adapter::net::behavior::meshos::sdk::MeshOsDaemonSdk::start(
            $config,
            $dispatcher,
        );
        let mut handle = sdk
            .register_daemon(Box::new($daemon), $keypair)
            .expect("daemon registration failed");
        loop {
            // Imported inside the loop body so the caller's expressions above
            // never see this name.
            use $crate::adapter::net::compute::DaemonControl;
            match handle.next_control().await {
                Some(DaemonControl::Shutdown { .. } | DaemonControl::DrainFinish) | None => break,
                Some(_) => {}
            }
        }
        let _ = handle
            .graceful_shutdown(
                $crate::adapter::net::behavior::meshos::sdk::DEFAULT_GRACEFUL_SHUTDOWN,
            )
            .await;
        let _ = sdk.shutdown().await;
    }};
}
811
#[cfg(test)]
// `fast_config` tweaks a `MeshOsConfig::default()` field by field.
#[allow(clippy::field_reassign_with_default)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use bytes::Bytes;

    use super::*;
    use crate::adapter::net::behavior::capability::CapabilityFilter;
    use crate::adapter::net::behavior::meshos::action::ActionId;
    use crate::adapter::net::behavior::meshos::executor::LoggingDispatcher;
    use crate::adapter::net::behavior::meshos::PendingAction;
    use crate::adapter::net::compute::{DaemonError, MeshDaemon};
    use crate::adapter::net::state::causal::CausalEvent;

    /// Minimal daemon that only counts how many events it processed.
    struct NoopDaemon {
        name: String,
        process_count: Arc<AtomicUsize>,
    }
    impl NoopDaemon {
        /// Returns the daemon plus a shared handle to its process counter.
        fn new(name: &str) -> (Self, Arc<AtomicUsize>) {
            let counter = Arc::new(AtomicUsize::new(0));
            (
                Self {
                    name: name.into(),
                    process_count: Arc::clone(&counter),
                },
                counter,
            )
        }
    }
    impl MeshDaemon for NoopDaemon {
        fn name(&self) -> &str {
            &self.name
        }
        fn requirements(&self) -> CapabilityFilter {
            CapabilityFilter::default()
        }
        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
            self.process_count.fetch_add(1, Ordering::Relaxed);
            Ok(Vec::new())
        }
    }

    /// Default config with a 10 ms tick so runtime-driven tests settle quickly.
    fn fast_config() -> MeshOsConfig {
        let mut cfg = MeshOsConfig::default();
        cfg.tick_interval = Duration::from_millis(10);
        cfg
    }

    #[tokio::test]
    async fn register_daemon_returns_handle_with_correct_identity() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let (daemon, _counter) = NoopDaemon::new("telemetry");
        let kp = EntityKeypair::generate();
        let expected_id = kp.origin_hash();
        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
        assert_eq!(handle.daemon_id(), expected_id);
        assert_eq!(handle.daemon_name(), "telemetry");
        let _ = sdk.shutdown().await;
    }

    #[tokio::test]
    async fn control_router_routes_stop_daemon_to_per_daemon_channel() {
        let router = DaemonControlRouter::new();
        let mut rx = router.register(42, 4);
        router.route(
            42,
            DaemonControl::Shutdown {
                grace_period_ms: 5000,
            },
        );
        let ev = rx.try_recv().expect("event present");
        assert!(matches!(
            ev,
            DaemonControl::Shutdown {
                grace_period_ms: 5000
            }
        ));
    }

    #[tokio::test]
    async fn control_router_drops_when_channel_full() {
        let router = DaemonControlRouter::new();
        // Capacity 1 and nobody reading: the second event cannot be queued.
        let _rx = router.register(99, 1);
        router.route(99, DaemonControl::BackpressureOn { level: 0.5 });
        router.route(99, DaemonControl::BackpressureOn { level: 0.8 });
        assert_eq!(router.total_dropped(), 1);
    }

    #[tokio::test]
    async fn translate_to_control_emits_shutdown_for_stop_daemon() {
        let router = DaemonControlRouter::new();
        let mut rx = router.register(7, 4);
        let action = MeshOsAction::StopDaemon {
            daemon: super::super::event::DaemonRef {
                id: 7,
                name: "x".into(),
            },
            reason: "intent-stop".into(),
            deadline: Instant::now() + Duration::from_millis(2500),
        };
        translate_to_control(&router, &action);
        let ev = rx.try_recv().expect("translated to control event");
        match ev {
            DaemonControl::Shutdown { grace_period_ms } => {
                // A little time elapses between building the deadline and
                // translating it, so allow some slack below 2500.
                assert!((2400..=2500).contains(&grace_period_ms));
            }
            other => panic!("expected Shutdown, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn router_control_sink_broadcasts_drain_start_to_every_registered_daemon() {
        use super::super::control::{ControlSink, MeshOsControl};
        let router = DaemonControlRouter::new();
        let mut rx_a = router.register(1, 4);
        let mut rx_b = router.register(2, 4);
        let sink = RouterControlSink::new(router.clone());
        sink.emit(MeshOsControl::DrainStart {
            deadline: std::time::Instant::now() + std::time::Duration::from_secs(30),
        });
        let ev_a = rx_a.try_recv().expect("daemon A received drain start");
        let ev_b = rx_b.try_recv().expect("daemon B received drain start");
        assert!(matches!(ev_a, DaemonControl::DrainStart { .. }));
        assert!(matches!(ev_b, DaemonControl::DrainStart { .. }));
    }

    #[tokio::test]
    async fn unregister_removes_router_slot() {
        let router = DaemonControlRouter::new();
        let _rx = router.register(7, 4);
        router.unregister(7);
        // Routing to an unknown id is a silent no-op, not a counted drop.
        router.route(7, DaemonControl::Shutdown { grace_period_ms: 1 });
        assert_eq!(router.total_dropped(), 0);
    }

    #[tokio::test]
    async fn publish_log_lands_on_runtime_log_ring_tagged_with_daemon_id() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let (daemon, _) = NoopDaemon::new("logger");
        let kp = EntityKeypair::generate();
        let daemon_id = kp.origin_hash();
        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();

        handle
            .publish_log(
                super::super::logs::LogLevel::Warn,
                "throttling: queue depth high",
            )
            .expect("publish_log");

        // Give the event loop a few 10 ms ticks to fold the line into the
        // snapshot.
        tokio::time::sleep(Duration::from_millis(80)).await;
        let snap = sdk.runtime().snapshot();
        let matching: Vec<_> = snap
            .log_ring
            .iter()
            .filter(|r| r.daemon_id == Some(daemon_id))
            .collect();
        assert_eq!(matching.len(), 1, "expected one log line for this daemon");
        let record = matching[0];
        assert_eq!(record.level, super::super::logs::LogLevel::Warn);
        assert_eq!(record.message, "throttling: queue depth high");
        let _ = sdk.shutdown().await;
    }

    #[tokio::test]
    async fn publish_log_after_runtime_shutdown_returns_loop_closed() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let (daemon, _) = NoopDaemon::new("logger");
        let kp = EntityKeypair::generate();
        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
        let _ = sdk.shutdown().await;
        let err = handle
            .publish_log(super::super::logs::LogLevel::Info, "after shutdown")
            .expect_err("publish after shutdown should fail");
        assert_eq!(err.kind, "loop_closed");
    }

    #[tokio::test]
    async fn handle_drop_unregisters_from_registry_and_router() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let registry = Arc::clone(sdk.runtime.daemon_registry());
        let (daemon, _) = NoopDaemon::new("temp");
        let kp = EntityKeypair::generate();
        let daemon_id = kp.origin_hash();
        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
        drop(handle);
        // A second unregister reporting NotFound proves `Drop` already removed it.
        assert!(matches!(
            registry.unregister(daemon_id),
            Err(DaemonError::NotFound(_))
        ));
        let _ = sdk.shutdown().await;
    }

    #[tokio::test]
    async fn graceful_shutdown_sends_shutdown_control_then_unregisters() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let registry = Arc::clone(sdk.runtime.daemon_registry());
        let (daemon, _) = NoopDaemon::new("graceful");
        let kp = EntityKeypair::generate();
        let daemon_id = kp.origin_hash();
        let mut handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
        // Move the real receiver out so a separate task can observe the
        // Shutdown event while `graceful_shutdown` consumes the handle.
        let mut control_rx =
            std::mem::replace(&mut handle.control_rx, mpsc::channel::<DaemonControl>(1).1);
        let received = tokio::spawn(async move { control_rx.recv().await });
        let _ = handle.graceful_shutdown(Duration::from_millis(50)).await;
        let ev = received.await.unwrap();
        assert!(matches!(ev, Some(DaemonControl::Shutdown { .. })));
        // The "then unregisters" half of the contract.
        assert!(!registry.contains(daemon_id));
        let _ = sdk.shutdown().await;
    }

    #[tokio::test]
    async fn publish_capabilities_returns_ok_pending_capability_chain_wiring() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let (daemon, _) = NoopDaemon::new("noop");
        let kp = EntityKeypair::generate();
        let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
        let result = handle.publish_capabilities(CapabilitySet::default());
        assert!(result.is_ok());
        let _ = sdk.shutdown().await;
    }

    #[tokio::test]
    async fn refresh_metadata_pulls_from_runtime_snapshot() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
        let (daemon, _) = NoopDaemon::new("inspect");
        let kp = EntityKeypair::generate();
        let mut handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
        // Let the runtime publish at least one snapshot.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let view = handle.refresh_metadata();
        assert_eq!(view.daemon_name, "inspect");
        assert!(matches!(
            view.maintenance_state,
            MaintenanceStateView::Active
        ));
        let _ = sdk.shutdown().await;
    }

    #[test]
    fn sdk_error_display_carries_kind_discriminator() {
        let err = SdkError::new("register_failed", "host already registered");
        let formatted = format!("{err}");
        assert!(formatted.starts_with("<<meshos-sdk-kind:register_failed>>"));
        assert!(formatted.ends_with("host already registered"));
    }

    #[test]
    fn maintenance_state_view_round_trips_active_default() {
        let now = Instant::now();
        let active = MaintenanceStateView::from_state(&MaintenanceState::Active, now);
        assert!(matches!(active, MaintenanceStateView::Active));
    }

    #[test]
    fn maintenance_state_view_clamps_past_deadlines_to_zero() {
        let now = Instant::now();
        let state = MaintenanceState::EnteringMaintenance {
            since: now - Duration::from_secs(5),
            deadline: Some(now - Duration::from_secs(1)),
        };
        let view = MaintenanceStateView::from_state(&state, now);
        match view {
            MaintenanceStateView::EnteringMaintenance {
                deadline_remaining_ms,
                ..
            } => assert_eq!(deadline_remaining_ms, Some(0)),
            other => panic!("expected EnteringMaintenance, got {other:?}"),
        }
    }

    // Never called. It appears to exist only so these imported types stay
    // referenced; verify before removing either it or the imports.
    #[allow(dead_code)]
    fn _pin(_p: PendingAction, _a: ActionId, _f: super::super::event::DaemonRef) {}
}