use bytes::Bytes;
use crate::adapter::net::behavior::capability::{CapabilityFilter, CapabilitySet};
use crate::adapter::net::state::causal::CausalEvent;
/// Behaviour contract for a mesh daemon.
///
/// Implementors must provide [`name`](MeshDaemon::name),
/// [`requirements`](MeshDaemon::requirements) and
/// [`process`](MeshDaemon::process); every other method ships with a default
/// implementation that can be overridden as needed.
pub trait MeshDaemon: Send + Sync {
    /// Returns the name of this daemon.
    fn name(&self) -> &str;

    /// Returns the [`CapabilityFilter`] associated with this daemon.
    fn requirements(&self) -> CapabilityFilter;

    /// Capabilities this daemon requires.
    ///
    /// The default is `CapabilitySet::default()`.
    fn required_capabilities(&self) -> CapabilitySet {
        CapabilitySet::default()
    }

    /// Capabilities this daemon can use but does not require.
    ///
    /// The default is `CapabilitySet::default()`.
    fn optional_capabilities(&self) -> CapabilitySet {
        CapabilitySet::default()
    }

    /// Processes a single [`CausalEvent`].
    ///
    /// # Errors
    ///
    /// Returns a [`DaemonError`] when the implementation fails to handle the
    /// event.
    fn process(&mut self, event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError>;

    /// Serialised state of this daemon, if it has any.
    ///
    /// The default returns `None`.
    fn snapshot(&self) -> Option<Bytes> {
        None
    }

    /// Whether this daemon carries state; consulted by the default
    /// [`restore`](MeshDaemon::restore).
    ///
    /// The default returns `false`.
    fn is_stateful(&self) -> bool {
        false
    }

    /// Restores the daemon from previously captured `state`.
    ///
    /// The default implementation stores nothing. It succeeds when the daemon
    /// reports itself as stateful or when `state` is empty.
    ///
    /// # Errors
    ///
    /// The default returns [`DaemonError::RestoreFailed`] when
    /// [`is_stateful`](MeshDaemon::is_stateful) is `false` and `state` is
    /// non-empty, so a snapshot is never silently discarded by a daemon that
    /// declares itself stateless.
    fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
        // Accept when the daemon claims state, or when there is nothing to lose.
        if self.is_stateful() || state.is_empty() {
            return Ok(());
        }

        let snapshot_len = state.len();
        Err(DaemonError::RestoreFailed(format!(
            "stateless daemon (is_stateful=false) cannot restore \
             {}-byte snapshot — override is_stateful() + restore() \
             if this daemon is actually stateful",
            snapshot_len
        )))
    }

    /// Current health of this daemon.
    ///
    /// The default reports [`DaemonHealth::Healthy`].
    fn health(&self) -> DaemonHealth {
        DaemonHealth::Healthy
    }

    /// Current saturation of this daemon.
    ///
    /// The default reports `0.0`.
    // NOTE(review): the valid range is not defined here (presumably 0.0..=1.0) — confirm.
    fn saturation(&self) -> f32 {
        0.0
    }

    /// Receives a [`DaemonControl`] event.
    ///
    /// The default implementation ignores the event.
    fn on_control(&mut self, _event: DaemonControl) {}
}
/// Health status reported by a daemon.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonHealth {
    /// The daemon reports no problems.
    Healthy,
    /// The daemon reports a degraded state.
    Degraded {
        /// Free-form explanation of the degradation.
        reason: String,
    },
    /// The daemon reports itself as unhealthy.
    Unhealthy,
}
/// Control event passed to a daemon's `on_control` hook.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum DaemonControl {
    /// Shutdown request.
    Shutdown {
        /// Grace period, in milliseconds.
        grace_period_ms: u64,
    },
    /// Start of a drain.
    DrainStart {
        /// Grace period, in milliseconds.
        grace_period_ms: u64,
    },
    /// End of a drain.
    DrainFinish,
    /// Backpressure switched on.
    BackpressureOn {
        /// Backpressure level.
        // NOTE(review): scale/range of `level` is not defined here — confirm.
        level: f32,
    },
    /// Backpressure switched off.
    BackpressureOff,
}
/// Lifecycle notification about a daemon.
///
/// Every variant carries the daemon's numeric `id`, its `name`, and the
/// [`std::time::Instant`] (`at`) attached to the event.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DaemonLifecycleEvent {
    /// A daemon was registered.
    Registered {
        /// Daemon identifier.
        id: u64,
        /// Daemon name.
        name: String,
        /// Instant attached to the event.
        at: std::time::Instant,
    },
    /// A daemon was unregistered.
    Unregistered {
        /// Daemon identifier.
        id: u64,
        /// Daemon name.
        name: String,
        /// Instant attached to the event.
        at: std::time::Instant,
    },
    /// A daemon crashed.
    Crashed {
        /// Daemon identifier.
        id: u64,
        /// Daemon name.
        name: String,
        /// Instant attached to the event.
        at: std::time::Instant,
        /// Free-form description of the crash.
        reason: String,
    },
    /// A daemon's health changed.
    HealthChanged {
        /// Daemon identifier.
        id: u64,
        /// Daemon name.
        name: String,
        /// Instant attached to the event.
        at: std::time::Instant,
        /// Health value carried by the event.
        health: DaemonHealth,
    },
    /// A daemon's saturation changed.
    SaturationChanged {
        /// Daemon identifier.
        id: u64,
        /// Daemon name.
        name: String,
        /// Instant attached to the event.
        at: std::time::Instant,
        /// Saturation value carried by the event.
        saturation: f32,
    },
}
/// Receiver of [`DaemonLifecycleEvent`]s.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait DaemonLifecycleObserver: 'static + Send + Sync {
    /// Called with each lifecycle `event`; the event is passed by value.
    fn observe(&self, event: DaemonLifecycleEvent);
}
/// Errors produced by daemon operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DaemonError {
    /// Event processing failed; carries a description.
    ProcessFailed(String),
    /// Taking a snapshot failed; carries a description.
    SnapshotFailed(String),
    /// Restoring from a snapshot failed; carries a description.
    RestoreFailed(String),
    /// No daemon with the given id was found.
    NotFound(u64),
    /// The daemon with the given id was swapped or unregistered concurrently,
    /// so the attempted mutation did not land.
    Stale(u64),
}

/// Human-readable rendering; daemon ids are printed as `0x`-prefixed hex.
impl std::fmt::Display for DaemonError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DaemonError::ProcessFailed(detail) => {
                write!(f, "daemon process failed: {}", detail)
            }
            DaemonError::SnapshotFailed(detail) => {
                write!(f, "snapshot failed: {}", detail)
            }
            DaemonError::RestoreFailed(detail) => {
                write!(f, "restore failed: {}", detail)
            }
            DaemonError::NotFound(daemon_id) => {
                write!(f, "daemon not found: {:#x}", daemon_id)
            }
            DaemonError::Stale(daemon_id) => write!(
                f,
                "daemon {:#x} was swapped or unregistered concurrently; mutation did not land",
                daemon_id
            ),
        }
    }
}

impl std::error::Error for DaemonError {}
/// Configuration for a daemon host.
#[derive(Clone, Debug)]
pub struct DaemonHostConfig {
    /// Interval between automatic snapshots.
    // NOTE(review): unit is not stated here, and `0` (the default) presumably
    // disables auto-snapshotting — confirm against the host implementation.
    pub auto_snapshot_interval: u64,
    /// Upper bound on the number of log entries.
    pub max_log_entries: u32,
}

impl Default for DaemonHostConfig {
    /// Defaults: `auto_snapshot_interval = 0`, `max_log_entries = 10_000`.
    fn default() -> Self {
        DaemonHostConfig {
            max_log_entries: 10_000,
            auto_snapshot_interval: 0,
        }
    }
}
/// Counters describing a daemon's activity; all start at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct DaemonStats {
    /// Number of events processed.
    pub events_processed: u64,
    /// Number of events emitted.
    pub events_emitted: u64,
    /// Number of errors recorded.
    pub errors: u64,
    /// Number of snapshots taken.
    pub snapshots_taken: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Daemon implementing only the required trait methods, so every
    /// defaulted method is exercised exactly as the trait ships it.
    struct BareDaemon;

    impl MeshDaemon for BareDaemon {
        fn name(&self) -> &str {
            "bare"
        }

        fn requirements(&self) -> CapabilityFilter {
            CapabilityFilter::default()
        }

        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
            Ok(vec![])
        }
    }

    /// Daemon overriding both capability hooks with one tag each.
    struct GpuDaemon;

    impl MeshDaemon for GpuDaemon {
        fn name(&self) -> &str {
            "gpu"
        }

        fn requirements(&self) -> CapabilityFilter {
            CapabilityFilter::default()
        }

        fn required_capabilities(&self) -> CapabilitySet {
            CapabilitySet::new().add_tag("hardware.gpu")
        }

        fn optional_capabilities(&self) -> CapabilitySet {
            CapabilitySet::new().add_tag("hardware.gpu.vram_gb=80")
        }

        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
            Ok(vec![])
        }
    }

    #[test]
    fn required_capabilities_default_is_empty() {
        let required = BareDaemon.required_capabilities();
        assert!(required.tags.is_empty());
        assert!(required.metadata.is_empty());
    }

    #[test]
    fn optional_capabilities_default_is_empty() {
        let optional = BareDaemon.optional_capabilities();
        assert!(optional.tags.is_empty());
        assert!(optional.metadata.is_empty());
    }

    #[test]
    fn override_populates_required_and_optional() {
        let daemon = GpuDaemon;
        let required = daemon.required_capabilities();
        let optional = daemon.optional_capabilities();

        assert_eq!(required.tags.len(), 1);
        assert!(required
            .tags
            .iter()
            .any(|tag| tag.to_string() == "hardware.gpu"));

        assert_eq!(optional.tags.len(), 1);
        assert!(optional
            .tags
            .iter()
            .any(|tag| tag.to_string() == "hardware.gpu.vram_gb=80"));
    }

    /// Compile-time check that the capability sets plug into
    /// `Artifact::Daemon` by reference.
    #[test]
    fn required_capabilities_drive_artifact_daemon() {
        use crate::adapter::net::behavior::placement::Artifact;

        let daemon = GpuDaemon;
        let required = daemon.required_capabilities();
        let optional = daemon.optional_capabilities();
        let _artifact = Artifact::Daemon {
            daemon_id: [0u8; 32],
            required: &required,
            optional: &optional,
        };
    }

    #[test]
    fn health_default_is_healthy() {
        let daemon = BareDaemon;
        assert_eq!(daemon.health(), DaemonHealth::Healthy);
    }

    #[test]
    fn saturation_default_is_zero() {
        let daemon = BareDaemon;
        assert_eq!(daemon.saturation(), 0.0);
    }

    /// Daemon that reports configurable health/saturation and records the
    /// most recent control event it was handed.
    struct WatchedDaemon {
        last_control: Option<DaemonControl>,
        health: DaemonHealth,
        saturation: f32,
    }

    /// Builds a `WatchedDaemon` that has not yet seen any control event.
    fn watched(health: DaemonHealth, saturation: f32) -> WatchedDaemon {
        WatchedDaemon {
            last_control: None,
            health,
            saturation,
        }
    }

    impl MeshDaemon for WatchedDaemon {
        fn name(&self) -> &str {
            "watched"
        }

        fn requirements(&self) -> CapabilityFilter {
            CapabilityFilter::default()
        }

        fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
            Ok(vec![])
        }

        fn health(&self) -> DaemonHealth {
            self.health.clone()
        }

        fn saturation(&self) -> f32 {
            self.saturation
        }

        fn on_control(&mut self, event: DaemonControl) {
            self.last_control = Some(event);
        }
    }

    #[test]
    fn override_surfaces_for_health_and_saturation() {
        let daemon = watched(
            DaemonHealth::Degraded {
                reason: "queue depth".into(),
            },
            0.42,
        );
        assert!(matches!(daemon.health(), DaemonHealth::Degraded { .. }));
        assert!((daemon.saturation() - 0.42).abs() < 1e-6);
    }

    #[test]
    fn on_control_receives_supervisor_events() {
        let mut daemon = watched(DaemonHealth::Healthy, 0.0);

        daemon.on_control(DaemonControl::Shutdown {
            grace_period_ms: 5_000,
        });
        assert_eq!(
            daemon.last_control,
            Some(DaemonControl::Shutdown {
                grace_period_ms: 5_000
            })
        );

        daemon.on_control(DaemonControl::BackpressureOn { level: 0.5 });
        // The float payload is compared with a tolerance rather than exactly.
        assert!(matches!(
            daemon.last_control,
            Some(DaemonControl::BackpressureOn { level }) if (level - 0.5).abs() < 1e-6
        ));
    }

    /// The default `on_control` must accept events without panicking.
    #[test]
    fn bare_daemon_ignores_control_events_silently() {
        let mut daemon = BareDaemon;
        daemon.on_control(DaemonControl::DrainFinish);
        daemon.on_control(DaemonControl::BackpressureOff);
    }

    #[test]
    fn default_restore_rejects_nonempty_state_on_stateless_daemon() {
        let mut daemon = BareDaemon;
        assert!(!daemon.is_stateful());

        daemon
            .restore(Bytes::new())
            .expect("empty restore on stateless daemon must succeed");

        let rejection = daemon
            .restore(Bytes::from_static(b"surprise-snapshot-bytes"))
            .expect_err("non-empty restore on stateless daemon must fail");

        let msg = match rejection {
            DaemonError::RestoreFailed(msg) => msg,
            other => panic!("expected RestoreFailed, got {:?}", other),
        };
        assert!(
            msg.contains("stateless daemon"),
            "error must name the daemon class: {msg}",
        );
        // The payload above is 23 bytes long.
        assert!(
            msg.contains("23"),
            "error must include the byte count for triage: {msg}",
        );
    }
}