use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::future::BoxFuture;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use crate::adapter::net::behavior::capability::CapabilitySet;
use crate::adapter::net::compute::{
DaemonControl, DaemonError, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
};
use crate::adapter::net::identity::EntityKeypair;
use super::action::MeshOsAction;
use super::config::MeshOsConfig;
use super::event::NodeId;
use super::executor::{ActionDispatcher, DispatchError};
use super::maintenance::MaintenanceState;
use super::runtime::{MeshOsRuntime, RuntimeShutdownError, RuntimeStats};
use super::snapshot::PeerSnapshot;
/// Capacity of each daemon's control channel unless overridden through
/// `MeshOsDaemonSdk::with_control_capacity`.
pub const DEFAULT_CONTROL_CHANNEL_CAPACITY: usize = 8;
/// Grace period the `daemon_main!` macro passes to
/// `MeshOsDaemonHandle::graceful_shutdown`.
pub const DEFAULT_GRACEFUL_SHUTDOWN: Duration = Duration::from_secs(5);
/// Error type returned by the daemon SDK.
///
/// The `Display` output is `<<meshos-sdk-kind:{kind}>>{message}`, i.e. the
/// kind discriminator is embedded as a prefix of the rendered text.
#[derive(Clone, Debug, thiserror::Error)]
#[error("<<meshos-sdk-kind:{kind}>>{message}")]
pub struct SdkError {
/// Short discriminator; this file produces `"register_failed"`,
/// `"loop_closed"` and `"queue_full"`.
pub kind: &'static str,
/// Human-readable detail rendered after the kind prefix.
pub message: String,
}
impl SdkError {
fn new(kind: &'static str, message: impl Into<String>) -> Self {
Self {
kind,
message: message.into(),
}
}
}
impl From<DaemonError> for SdkError {
fn from(err: DaemonError) -> Self {
Self::new("register_failed", err.to_string())
}
}
/// Point-in-time view of the node as seen by one daemon.
///
/// Built at registration time and rebuilt by
/// `MeshOsDaemonHandle::refresh_metadata`.
#[derive(Clone, Debug)]
pub struct MetadataView {
/// Identifier of the local node.
pub node_id: NodeId,
/// Identifier of the daemon this view belongs to.
pub daemon_id: u64,
/// Name reported by the daemon at registration.
pub daemon_name: String,
/// Maintenance state of the local node at the time the view was built.
pub maintenance_state: MaintenanceStateView,
/// Peer snapshots keyed by node id.
pub peers: BTreeMap<NodeId, PeerSnapshot>,
}
/// Maintenance state expressed with relative millisecond values instead of
/// `Instant`s.
///
/// Every `since_ms` is the number of milliseconds elapsed since the state was
/// entered, measured at the moment the view was produced.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum MaintenanceStateView {
/// The node is not in any maintenance phase.
Active,
/// The node is transitioning into maintenance.
EnteringMaintenance {
/// Milliseconds since this state was entered.
since_ms: u64,
/// Milliseconds left until the deadline (`0` once it has passed), or
/// `None` when the state carries no deadline.
deadline_remaining_ms: Option<u64>,
},
/// The node is in maintenance.
Maintenance {
/// Milliseconds since this state was entered.
since_ms: u64,
},
/// The node is transitioning out of maintenance.
ExitingMaintenance {
/// Milliseconds since this state was entered.
since_ms: u64,
},
/// Draining failed.
DrainFailed {
/// Milliseconds since this state was entered.
since_ms: u64,
/// Failure reason copied from the underlying state.
reason: String,
},
/// The node is in the recovery state.
Recovery {
/// Milliseconds since this state was entered.
since_ms: u64,
},
}
impl MaintenanceStateView {
    /// Converts an `Instant`-based [`MaintenanceState`] into its relative
    /// millisecond view, measured against `now`.
    ///
    /// Elapsed and remaining times saturate at zero, so a `since` in the
    /// future or a deadline in the past both yield `0`.
    pub fn from_state(state: &MaintenanceState, now: Instant) -> Self {
        // Milliseconds from `start` up to `now`.
        let elapsed_ms =
            |start: &Instant| now.saturating_duration_since(*start).as_millis() as u64;
        // Milliseconds from `now` up to `end`.
        let remaining_ms = |end: Instant| end.saturating_duration_since(now).as_millis() as u64;

        match state {
            MaintenanceState::Active => Self::Active,
            MaintenanceState::EnteringMaintenance { since, deadline } => {
                let since_ms = elapsed_ms(since);
                let deadline_remaining_ms = deadline.map(remaining_ms);
                Self::EnteringMaintenance {
                    since_ms,
                    deadline_remaining_ms,
                }
            }
            MaintenanceState::Maintenance { since } => {
                let since_ms = elapsed_ms(since);
                Self::Maintenance { since_ms }
            }
            MaintenanceState::ExitingMaintenance { since } => {
                let since_ms = elapsed_ms(since);
                Self::ExitingMaintenance { since_ms }
            }
            MaintenanceState::DrainFailed { since, reason } => {
                let since_ms = elapsed_ms(since);
                Self::DrainFailed {
                    since_ms,
                    reason: reason.clone(),
                }
            }
            MaintenanceState::Recovery { since } => {
                let since_ms = elapsed_ms(since);
                Self::Recovery { since_ms }
            }
        }
    }
}
/// Per-daemon delivery state held by `DaemonControlRouter`.
#[derive(Debug)]
struct DaemonControlSlot {
/// Sending half of the daemon's bounded control channel.
tx: mpsc::Sender<DaemonControl>,
/// Number of events that could not be delivered via `try_send`.
dropped: AtomicU64,
}
/// Routes `DaemonControl` events to registered daemons, either to one daemon
/// by id or to all of them.
///
/// Cloning is cheap: clones share the same underlying tables through `Arc`.
#[derive(Clone, Default)]
pub struct DaemonControlRouter {
/// Slots keyed by daemon id; used for targeted routing.
inner: Arc<RwLock<BTreeMap<u64, Arc<DaemonControlSlot>>>>,
/// The same slots as a flat list; used for broadcast fan-out.
broadcast: Arc<RwLock<Vec<Arc<DaemonControlSlot>>>>,
}
impl DaemonControlRouter {
    /// Creates a router with no registered daemons.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `daemon_id` and returns the receiving half of its control
    /// channel.
    ///
    /// `capacity` is clamped to at least one because `mpsc::channel` panics on
    /// a zero capacity. Registering an id that is already present replaces the
    /// previous slot in both the routing map and the broadcast list, so the
    /// stale slot is not kept alive (and not broadcast to) forever.
    fn register(&self, daemon_id: u64, capacity: usize) -> mpsc::Receiver<DaemonControl> {
        let (tx, rx) = mpsc::channel(capacity.max(1));
        let slot = Arc::new(DaemonControlSlot {
            tx,
            dropped: AtomicU64::new(0),
        });
        // The map guard is a temporary and is released before the broadcast
        // list is locked, so the two locks are never held at the same time.
        let replaced = self.inner.write().insert(daemon_id, Arc::clone(&slot));
        let mut fan_out = self.broadcast.write();
        if let Some(stale) = replaced {
            fan_out.retain(|s| !Arc::ptr_eq(s, &stale));
        }
        fan_out.push(slot);
        rx
    }

    /// Removes `daemon_id` from both the routing map and the broadcast list.
    /// Unknown ids are ignored.
    fn unregister(&self, daemon_id: u64) {
        let removed = self.inner.write().remove(&daemon_id);
        if let Some(removed) = removed {
            self.broadcast.write().retain(|s| !Arc::ptr_eq(s, &removed));
        }
    }

    /// Delivers `event` to a single daemon without blocking.
    ///
    /// If the daemon is not registered the event is silently discarded. If
    /// `try_send` fails (channel full or receiver closed) the slot's dropped
    /// counter is incremented.
    fn route(&self, daemon_id: u64, event: DaemonControl) {
        // Clone the slot out so the map lock is not held while sending.
        let slot = self.inner.read().get(&daemon_id).cloned();
        if let Some(slot) = slot {
            if slot.tx.try_send(event).is_err() {
                slot.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Delivers a clone of `event` to every registered daemon without
    /// blocking, counting each failed `try_send` on the affected slot.
    fn broadcast(&self, event: DaemonControl) {
        // Snapshot the slot list so the lock is not held while sending.
        let slots = self.broadcast.read().clone();
        for slot in slots {
            if slot.tx.try_send(event.clone()).is_err() {
                slot.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Sum of the dropped-event counters of all currently registered daemons.
    ///
    /// Counters of daemons that have been unregistered are no longer included.
    pub fn total_dropped(&self) -> u64 {
        let map = self.inner.read();
        map.values()
            .map(|slot| slot.dropped.load(Ordering::Relaxed))
            .sum()
    }
}
/// `ActionDispatcher` wrapper that translates actions into daemon control
/// events before forwarding them to the wrapped dispatcher.
pub struct SdkRoutingDispatcher<D: ActionDispatcher> {
/// The wrapped dispatcher that ultimately handles every action.
inner: Arc<D>,
/// Router used to deliver the translated control events.
router: DaemonControlRouter,
}
impl<D: ActionDispatcher> SdkRoutingDispatcher<D> {
pub fn new(inner: Arc<D>, router: DaemonControlRouter) -> Self {
Self { inner, router }
}
}
impl<D: ActionDispatcher> ActionDispatcher for SdkRoutingDispatcher<D> {
    /// Routes the control event derived from `action` (if any) and then
    /// forwards `action` to the wrapped dispatcher, returning its result.
    fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>> {
        let router = self.router.clone();
        let inner = Arc::clone(&self.inner);
        Box::pin(async move {
            // The translation only borrows the action, so it can be inspected
            // in place and then moved into the inner dispatcher without a clone.
            translate_to_control(&router, &action);
            inner.dispatch(action).await
        })
    }
}
/// Emits the control event that corresponds to `action`, if there is one.
///
/// Only `MeshOsAction::StopDaemon` is translated: the targeted daemon gets a
/// `DaemonControl::Shutdown` whose grace period is the time left until the
/// action's deadline (zero when the deadline has already passed). All other
/// actions produce no control event.
fn translate_to_control(router: &DaemonControlRouter, action: &MeshOsAction) {
    let observed_at = Instant::now();
    if let MeshOsAction::StopDaemon {
        daemon, deadline, ..
    } = action
    {
        let remaining = deadline.saturating_duration_since(observed_at);
        let shutdown = DaemonControl::Shutdown {
            grace_period_ms: remaining.as_millis() as u64,
        };
        router.route(daemon.id, shutdown);
    }
}
/// `ControlSink` implementation that forwards emitted control events to every
/// daemon registered with the router.
pub(super) struct RouterControlSink {
/// Router used for the broadcast.
router: DaemonControlRouter,
}
impl RouterControlSink {
pub(super) fn new(router: DaemonControlRouter) -> Self {
Self { router }
}
}
impl super::control::ControlSink for RouterControlSink {
    /// Converts `event` into a `DaemonControl` relative to the current instant
    /// and broadcasts it to all registered daemons.
    fn emit(&self, event: super::control::MeshOsControl) {
        let emitted_at = Instant::now();
        let daemon_event = event.to_daemon_control(emitted_at);
        self.router.broadcast(daemon_event);
    }
}
/// Handle returned by `MeshOsDaemonSdk::register_daemon`.
///
/// Dropping the handle unregisters the daemon from the router and the daemon
/// registry.
pub struct MeshOsDaemonHandle {
/// Identifier of the registered daemon.
daemon_id: u64,
/// Name the daemon reported at registration.
daemon_name: String,
/// Receiving half of this daemon's control channel.
control_rx: mpsc::Receiver<DaemonControl>,
/// Registry the daemon host was registered with.
registry: Arc<DaemonRegistry>,
/// Router the control channel was registered with.
router: DaemonControlRouter,
/// Cached metadata; replaced by `refresh_metadata`.
metadata: MetadataView,
/// Reader used by `refresh_metadata` to obtain the current snapshot.
runtime_snapshot_reader: super::event_loop::MeshOsSnapshotReader,
/// Handle used by `publish_log` to publish events to the MeshOS loop.
mesh_handle: super::event_loop::MeshOsHandle,
/// Identifier of the local node.
this_node: NodeId,
/// Set once unregistration has run so it is performed at most once.
unregistered: bool,
}
impl MeshOsDaemonHandle {
pub async fn next_control(&mut self) -> Option<DaemonControl> {
self.control_rx.recv().await
}
pub fn try_next_control(&mut self) -> Option<DaemonControl> {
self.control_rx.try_recv().ok()
}
pub fn daemon_id(&self) -> u64 {
self.daemon_id
}
pub fn daemon_name(&self) -> &str {
&self.daemon_name
}
pub fn metadata(&self) -> &MetadataView {
&self.metadata
}
pub fn refresh_metadata(&mut self) -> &MetadataView {
let snap = self.runtime_snapshot_reader.read();
let maint = match snap.local_maintenance {
super::snapshot::MaintenanceStateSnapshot::Active => MaintenanceStateView::Active,
super::snapshot::MaintenanceStateSnapshot::EnteringMaintenance {
since_ms,
deadline_remaining_ms,
} => MaintenanceStateView::EnteringMaintenance {
since_ms,
deadline_remaining_ms,
},
super::snapshot::MaintenanceStateSnapshot::Maintenance { since_ms } => {
MaintenanceStateView::Maintenance { since_ms }
}
super::snapshot::MaintenanceStateSnapshot::ExitingMaintenance { since_ms } => {
MaintenanceStateView::ExitingMaintenance { since_ms }
}
super::snapshot::MaintenanceStateSnapshot::DrainFailed { since_ms, reason } => {
MaintenanceStateView::DrainFailed { since_ms, reason }
}
super::snapshot::MaintenanceStateSnapshot::Recovery { since_ms } => {
MaintenanceStateView::Recovery { since_ms }
}
};
self.metadata = MetadataView {
node_id: self.this_node,
daemon_id: self.daemon_id,
daemon_name: self.daemon_name.clone(),
maintenance_state: maint,
peers: snap.peers,
};
&self.metadata
}
pub fn publish_capabilities(&self, _caps: CapabilitySet) -> Result<(), SdkError> {
Ok(())
}
pub fn publish_log(
&self,
level: super::logs::LogLevel,
message: impl Into<String>,
) -> Result<(), SdkError> {
let line = super::logs::LogLine {
level,
daemon_id: Some(self.daemon_id),
message: message.into(),
};
self.mesh_handle
.try_publish(super::event::MeshOsEvent::LogLine(line))
.map_err(|e| match e {
super::event_loop::MeshOsHandleError::LoopClosed => SdkError::new(
"loop_closed",
"MeshOS loop has exited; daemon log line dropped",
),
super::event_loop::MeshOsHandleError::QueueFull => SdkError::new(
"queue_full",
"MeshOS event queue at capacity; daemon log line dropped",
),
})
}
pub async fn graceful_shutdown(mut self, grace: Duration) -> Result<(), SdkError> {
let grace_ms = grace.as_millis() as u64;
self.router.route(
self.daemon_id,
DaemonControl::Shutdown {
grace_period_ms: grace_ms,
},
);
let deadline = tokio::time::Instant::now() + grace;
let mut poll = tokio::time::interval(Duration::from_millis(50));
poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
if !self.registry.contains(self.daemon_id) {
break;
}
if tokio::time::Instant::now() >= deadline {
break;
}
tokio::select! {
_ = tokio::time::sleep_until(deadline) => break,
_ = poll.tick() => {}
}
}
self.unregister_inner();
Ok(())
}
fn unregister_inner(&mut self) {
if self.unregistered {
return;
}
self.unregistered = true;
self.router.unregister(self.daemon_id);
let _ = self.registry.unregister(self.daemon_id);
}
}
impl Drop for MeshOsDaemonHandle {
fn drop(&mut self) {
self.unregister_inner();
}
}
/// Entry point of the daemon SDK: owns the MeshOS runtime and the control
/// router and hands out `MeshOsDaemonHandle`s for registered daemons.
pub struct MeshOsDaemonSdk {
/// The MeshOS runtime driven by this SDK instance.
runtime: MeshOsRuntime,
/// Router that delivers control events to registered daemons.
router: DaemonControlRouter,
/// Capacity used for each newly registered daemon's control channel.
control_capacity: usize,
}
impl MeshOsDaemonSdk {
pub fn start<D: ActionDispatcher>(config: MeshOsConfig, user_dispatcher: Arc<D>) -> Self {
let router = DaemonControlRouter::new();
let routed = Arc::new(SdkRoutingDispatcher::new(user_dispatcher, router.clone()));
let sink: Arc<dyn super::control::ControlSink> =
Arc::new(RouterControlSink::new(router.clone()));
let runtime = MeshOsRuntime::start_with_options(
config,
routed,
super::event_loop::ProbeRegistry::new(),
super::scheduler::SchedulerRegistry::new(),
Arc::new(DaemonRegistry::new()),
Some(sink),
);
Self {
runtime,
router,
control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
}
}
pub fn from_runtime(runtime: MeshOsRuntime, router: DaemonControlRouter) -> Self {
Self {
runtime,
router,
control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
}
}
pub fn start_with_verifier<D: ActionDispatcher>(
config: MeshOsConfig,
user_dispatcher: Arc<D>,
verifier: Arc<super::ice::AdminVerifier>,
) -> Self {
Self::start_with_verifier_and_migration_source(
config,
user_dispatcher,
Some(verifier),
None,
)
}
pub fn start_with_verifier_and_migration_source<D: ActionDispatcher>(
config: MeshOsConfig,
user_dispatcher: Arc<D>,
verifier: Option<Arc<super::ice::AdminVerifier>>,
migration_snapshot_source: Option<
Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
>,
) -> Self {
let router = DaemonControlRouter::new();
let routed = Arc::new(SdkRoutingDispatcher::new(user_dispatcher, router.clone()));
let sink: Arc<dyn super::control::ControlSink> =
Arc::new(RouterControlSink::new(router.clone()));
let runtime = MeshOsRuntime::start_with_full_extensions(
config,
routed,
super::event_loop::ProbeRegistry::new(),
super::scheduler::SchedulerRegistry::new(),
Arc::new(DaemonRegistry::new()),
Some(sink),
verifier,
None, None, None, None, migration_snapshot_source,
);
Self {
runtime,
router,
control_capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
}
}
pub fn with_control_capacity(mut self, capacity: usize) -> Self {
self.control_capacity = capacity.max(1);
self
}
pub fn runtime(&self) -> &MeshOsRuntime {
&self.runtime
}
pub fn router(&self) -> &DaemonControlRouter {
&self.router
}
pub fn register_daemon(
&self,
daemon: Box<dyn MeshDaemon>,
keypair: EntityKeypair,
) -> Result<MeshOsDaemonHandle, SdkError> {
let daemon_id = keypair.origin_hash();
let daemon_name = daemon.name().to_string();
let host = DaemonHost::new(daemon, keypair, DaemonHostConfig::default());
self.runtime
.daemon_registry()
.register(host)
.map_err(SdkError::from)?;
let control_rx = self.router.register(daemon_id, self.control_capacity);
let snap = self.runtime.snapshot();
let metadata = MetadataView {
node_id: self.runtime_this_node(),
daemon_id,
daemon_name: daemon_name.clone(),
maintenance_state: MaintenanceStateView::Active,
peers: snap.peers,
};
Ok(MeshOsDaemonHandle {
daemon_id,
daemon_name,
control_rx,
registry: Arc::clone(self.runtime.daemon_registry()),
router: self.router.clone(),
metadata,
runtime_snapshot_reader: self.runtime.snapshot_reader().clone(),
mesh_handle: self.runtime.handle_clone(),
this_node: self.runtime_this_node(),
unregistered: false,
})
}
pub fn dropped_control_events(&self) -> u64 {
self.router.total_dropped()
}
pub async fn shutdown(self) -> Result<RuntimeStats, RuntimeShutdownError> {
self.runtime.shutdown().await
}
fn runtime_this_node(&self) -> NodeId {
self.runtime.this_node()
}
}
/// Runs a single daemon to completion inside an async context.
///
/// The expansion uses `.await`, so it must be invoked from async code. It:
///
/// 1. starts a `MeshOsDaemonSdk` from `config` and `dispatcher`;
/// 2. registers `daemon` under `keypair`, panicking with
///    "daemon registration failed" if registration returns an error;
/// 3. consumes control events until a `Shutdown` or `DrainFinish` arrives or
///    the control channel closes;
/// 4. calls `graceful_shutdown` with `DEFAULT_GRACEFUL_SHUTDOWN` and then
///    shuts the SDK down, ignoring both results.
#[macro_export]
macro_rules! daemon_main {
(
daemon: $daemon:expr,
keypair: $keypair:expr,
config: $config:expr,
dispatcher: $dispatcher:expr $(,)?
) => {{
let sdk = $crate::adapter::net::behavior::meshos::sdk::MeshOsDaemonSdk::start(
$config,
$dispatcher,
);
let mut handle = sdk
.register_daemon(Box::new($daemon), $keypair)
.expect("daemon registration failed");
// Any event other than Shutdown / DrainFinish is discarded here.
while let Some(ev) = handle.next_control().await {
use $crate::adapter::net::compute::DaemonControl;
if matches!(
ev,
DaemonControl::Shutdown { .. } | DaemonControl::DrainFinish
) {
break;
}
}
let grace = $crate::adapter::net::behavior::meshos::sdk::DEFAULT_GRACEFUL_SHUTDOWN;
let _ = handle.graceful_shutdown(grace).await;
let _ = sdk.shutdown().await;
}};
}
// Unit tests for the router, dispatcher translation, daemon handle and SDK.
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use std::sync::atomic::AtomicUsize;
use bytes::Bytes;
use super::*;
use crate::adapter::net::behavior::capability::CapabilityFilter;
use crate::adapter::net::behavior::meshos::action::ActionId;
use crate::adapter::net::behavior::meshos::executor::LoggingDispatcher;
use crate::adapter::net::behavior::meshos::PendingAction;
use crate::adapter::net::compute::{DaemonError, MeshDaemon};
use crate::adapter::net::state::causal::CausalEvent;
/// Minimal daemon that only counts how many events it was asked to process.
struct NoopDaemon {
name: String,
process_count: Arc<AtomicUsize>,
}
impl NoopDaemon {
/// Returns the daemon together with a shared handle to its process counter.
fn new(name: &str) -> (Self, Arc<AtomicUsize>) {
let counter = Arc::new(AtomicUsize::new(0));
(
Self {
name: name.into(),
process_count: Arc::clone(&counter),
},
counter,
)
}
}
impl MeshDaemon for NoopDaemon {
fn name(&self) -> &str {
&self.name
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
self.process_count.fetch_add(1, Ordering::Relaxed);
Ok(Vec::new())
}
}
/// Default config with a 10 ms tick interval so tests run quickly.
fn fast_config() -> MeshOsConfig {
let mut cfg = MeshOsConfig::default();
cfg.tick_interval = Duration::from_millis(10);
cfg
}
// The handle reports the keypair's origin hash as id and the daemon's name.
#[tokio::test]
async fn register_daemon_returns_handle_with_correct_identity() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let (daemon, _counter) = NoopDaemon::new("telemetry");
let kp = EntityKeypair::generate();
let expected_id = kp.origin_hash();
let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
assert_eq!(handle.daemon_id(), expected_id);
assert_eq!(handle.daemon_name(), "telemetry");
let _ = sdk.shutdown().await;
}
// A routed event arrives on the channel registered for that daemon id.
#[tokio::test]
async fn control_router_routes_stop_daemon_to_per_daemon_channel() {
let router = DaemonControlRouter::new();
let mut rx = router.register(42, 4);
router.route(
42,
DaemonControl::Shutdown {
grace_period_ms: 5000,
},
);
let ev = rx.try_recv().expect("event present");
assert!(matches!(
ev,
DaemonControl::Shutdown {
grace_period_ms: 5000
}
));
}
// With capacity 1, the second undelivered event is counted as dropped.
#[tokio::test]
async fn control_router_drops_when_channel_full() {
let router = DaemonControlRouter::new();
let _rx = router.register(99, 1);
router.route(99, DaemonControl::BackpressureOn { level: 0.5 });
router.route(99, DaemonControl::BackpressureOn { level: 0.8 });
assert_eq!(router.total_dropped(), 1);
}
// StopDaemon becomes Shutdown with the time remaining until the deadline.
#[tokio::test]
async fn translate_to_control_emits_shutdown_for_stop_daemon() {
let router = DaemonControlRouter::new();
let mut rx = router.register(7, 4);
let action = MeshOsAction::StopDaemon {
daemon: super::super::event::DaemonRef {
id: 7,
name: "x".into(),
},
reason: "intent-stop".into(),
deadline: Instant::now() + Duration::from_millis(2500),
};
translate_to_control(&router, &action);
let ev = rx.try_recv().expect("translated to control event");
match ev {
DaemonControl::Shutdown { grace_period_ms } => {
// Allow up to 100 ms between building the action and translating it.
assert!((2400..=2500).contains(&grace_period_ms));
}
other => panic!("expected Shutdown, got {other:?}"),
}
}
// Events emitted through the sink reach every registered daemon.
#[tokio::test]
async fn router_control_sink_broadcasts_drain_start_to_every_registered_daemon() {
use super::super::control::{ControlSink, MeshOsControl};
let router = DaemonControlRouter::new();
let mut rx_a = router.register(1, 4);
let mut rx_b = router.register(2, 4);
let sink = RouterControlSink::new(router.clone());
sink.emit(MeshOsControl::DrainStart {
deadline: std::time::Instant::now() + std::time::Duration::from_secs(30),
});
let ev_a = rx_a.try_recv().expect("daemon A received drain start");
let ev_b = rx_b.try_recv().expect("daemon B received drain start");
assert!(matches!(ev_a, DaemonControl::DrainStart { .. }));
assert!(matches!(ev_b, DaemonControl::DrainStart { .. }));
}
// Routing to an unregistered id is discarded and not counted as a drop.
#[tokio::test]
async fn unregister_removes_router_slot() {
let router = DaemonControlRouter::new();
let _rx = router.register(7, 4);
router.unregister(7);
router.route(7, DaemonControl::Shutdown { grace_period_ms: 1 });
assert_eq!(router.total_dropped(), 0);
}
// A published log line shows up in the snapshot's log ring with the daemon id.
#[tokio::test]
async fn publish_log_lands_on_runtime_log_ring_tagged_with_daemon_id() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let (daemon, _) = NoopDaemon::new("logger");
let kp = EntityKeypair::generate();
let daemon_id = kp.origin_hash();
let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
handle
.publish_log(
super::super::logs::LogLevel::Warn,
"throttling: queue depth high",
)
.expect("publish_log");
// Give the loop (10 ms tick) time to process the published event.
tokio::time::sleep(Duration::from_millis(80)).await;
let snap = sdk.runtime().snapshot();
let matching: Vec<_> = snap
.log_ring
.iter()
.filter(|r| r.daemon_id == Some(daemon_id))
.collect();
assert_eq!(matching.len(), 1, "expected one log line for this daemon");
let record = matching[0];
assert_eq!(record.level, super::super::logs::LogLevel::Warn);
assert_eq!(record.message, "throttling: queue depth high");
let _ = sdk.shutdown().await;
}
// After the runtime is shut down, publishing fails with kind "loop_closed".
#[tokio::test]
async fn publish_log_after_runtime_shutdown_returns_loop_closed() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let (daemon, _) = NoopDaemon::new("logger");
let kp = EntityKeypair::generate();
let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
let _ = sdk.shutdown().await;
let err = handle
.publish_log(super::super::logs::LogLevel::Info, "after shutdown")
.expect_err("publish after shutdown should fail");
assert_eq!(err.kind, "loop_closed");
}
// Dropping the handle unregisters the daemon, so a second unregister is NotFound.
#[tokio::test]
async fn handle_drop_unregisters_from_registry_and_router() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let registry = Arc::clone(sdk.runtime.daemon_registry());
let (daemon, _) = NoopDaemon::new("temp");
let kp = EntityKeypair::generate();
let daemon_id = kp.origin_hash();
let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
drop(handle);
assert!(matches!(
registry.unregister(daemon_id),
Err(DaemonError::NotFound(_))
));
let _ = sdk.shutdown().await;
}
// graceful_shutdown routes a Shutdown event to the daemon's control channel.
#[tokio::test]
async fn graceful_shutdown_sends_shutdown_control_then_unregisters() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let (daemon, _) = NoopDaemon::new("graceful");
let kp = EntityKeypair::generate();
let mut handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
// Swap the real receiver out of the handle so the test can observe the
// event after the handle has been consumed by graceful_shutdown.
let mut control_rx =
std::mem::replace(&mut handle.control_rx, mpsc::channel::<DaemonControl>(1).1);
let received = tokio::spawn(async move { control_rx.recv().await });
let _ = handle.graceful_shutdown(Duration::from_millis(50)).await;
let ev = received.await.unwrap();
assert!(matches!(ev, Some(DaemonControl::Shutdown { .. })));
let _ = sdk.shutdown().await;
}
// publish_capabilities currently accepts any set and returns Ok.
#[tokio::test]
async fn publish_capabilities_returns_ok_pending_capability_chain_wiring() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let (daemon, _) = NoopDaemon::new("noop");
let kp = EntityKeypair::generate();
let handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
let result = handle.publish_capabilities(CapabilitySet::default());
assert!(result.is_ok());
let _ = sdk.shutdown().await;
}
// refresh_metadata rebuilds the view from the runtime snapshot.
#[tokio::test]
async fn refresh_metadata_pulls_from_runtime_snapshot() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let sdk = MeshOsDaemonSdk::start(fast_config(), dispatcher);
let (daemon, _) = NoopDaemon::new("inspect");
let kp = EntityKeypair::generate();
let mut handle = sdk.register_daemon(Box::new(daemon), kp).unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let view = handle.refresh_metadata();
assert_eq!(view.daemon_name, "inspect");
assert!(matches!(
view.maintenance_state,
MaintenanceStateView::Active
));
let _ = sdk.shutdown().await;
}
// Display output starts with the kind prefix and ends with the message.
#[test]
fn sdk_error_display_carries_kind_discriminator() {
let err = SdkError::new("register_failed", "host already registered");
let formatted = format!("{err}");
assert!(formatted.starts_with("<<meshos-sdk-kind:register_failed>>"));
assert!(formatted.ends_with("host already registered"));
}
// Active maps to Active.
#[test]
fn maintenance_state_view_round_trips_active_default() {
let now = Instant::now();
let active = MaintenanceStateView::from_state(&MaintenanceState::Active, now);
assert!(matches!(active, MaintenanceStateView::Active));
}
// A deadline that already passed is reported as zero remaining milliseconds.
#[test]
fn maintenance_state_view_clamps_past_deadlines_to_zero() {
let now = Instant::now();
let state = MaintenanceState::EnteringMaintenance {
since: now - Duration::from_secs(5),
deadline: Some(now - Duration::from_secs(1)),
};
let view = MaintenanceStateView::from_state(&state, now);
match view {
MaintenanceStateView::EnteringMaintenance {
deadline_remaining_ms,
..
} => assert_eq!(deadline_remaining_ms, Some(0)),
other => panic!("expected EnteringMaintenance, got {other:?}"),
}
}
// Never called; it only references the imported types in its signature.
#[allow(dead_code)]
fn _pin(_p: PendingAction, _a: ActionId, _f: super::super::event::DaemonRef) {}
}