use std::collections::{BTreeMap, VecDeque};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::action::{MeshOsAction, PendingAction};
use super::event::{ChainId, DaemonRef, NodeId};
use super::maintenance::MaintenanceState;
use super::state::{DaemonLifecycle, DesiredState, MeshOsState};
/// Capacity constant (256 entries) for the recent-failures history.
///
/// NOTE(review): nothing in this file enforces the bound — presumably the
/// owner of the failure ring trims to this length; confirm at the call site.
pub const RECENT_FAILURES_CAPACITY: usize = 256;
/// Serializable, point-in-time view of the MeshOS state, assembled by
/// [`MeshOsSnapshot::from_state`].
///
/// Most time-related fields are stored as millisecond counts relative to the
/// instant the snapshot was taken, not as `Instant` values.
///
/// Field order is part of the positional (postcard) encoding exercised by the
/// tests in this file, so do not reorder fields.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct MeshOsSnapshot {
/// Daemons known to the local state, keyed by `DaemonRef::id`.
pub daemons: BTreeMap<u64, DaemonSnapshot>,
/// Per-chain replica view; also contains chains that only have a desired
/// replica count and no holders yet.
pub replicas: BTreeMap<ChainId, ReplicaSnapshot>,
/// Per-peer view merged from RTT, node health, maintenance mirror and
/// inventory data.
pub peers: BTreeMap<NodeId, PeerSnapshot>,
/// Peers currently on the avoid list, with the remaining TTL of each entry.
pub avoid_list: BTreeMap<NodeId, AvoidEntrySnapshot>,
/// Maintenance state of the local node.
pub local_maintenance: MaintenanceStateSnapshot,
/// Pending actions passed to `from_state`, reduced to id, kind label and age.
pub recently_emitted: std::sync::Arc<[PendingActionSnapshot]>,
/// Copy of the failure records passed to `from_state`.
pub recent_failures: VecDeque<FailureRecord>,
/// Milliseconds left until the state's `freeze_until` instant; `None` when
/// no freeze is set.
pub freeze_remaining_ms: Option<u64>,
/// Copy of the admin audit ring passed to `from_state`.
pub admin_audit: std::sync::Arc<[super::ice::AdminAuditRecord]>,
/// Copy of the log ring passed to `from_state`.
pub log_ring: std::sync::Arc<[super::logs::LogRecord]>,
/// In-flight migrations, supplied ready-made by the caller of `from_state`.
pub in_flight_migrations: std::sync::Arc<[MigrationSnapshot]>,
/// `from_state` always writes 0 here.
/// NOTE(review): presumably stamped afterwards by the runtime — confirm.
pub runtime_epoch_id: u64,
}
/// Serializable description of one in-flight migration.
///
/// Instances are built by the caller of [`MeshOsSnapshot::from_state`]; this
/// file only carries them through. Fields marked `#[serde(default)]` fall
/// back to their `Default` value when missing from a self-describing encoding
/// such as JSON.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct MigrationSnapshot {
/// NOTE(review): presumably the origin id of the migrating daemon — confirm
/// against the producer.
pub daemon_origin: u64,
/// Phase the migration is currently in.
pub phase: MigrationPhaseSnapshot,
/// NOTE(review): presumably total milliseconds since the migration began.
pub elapsed_ms: u64,
/// NOTE(review): presumably the node the daemon is leaving.
#[serde(default)]
pub source_node: u64,
/// NOTE(review): presumably the node the daemon is moving to.
#[serde(default)]
pub target_node: u64,
/// NOTE(review): presumably milliseconds spent in the current `phase`.
#[serde(default)]
pub age_in_phase_ms: u64,
/// NOTE(review): presumably the size of the state snapshot, when known.
#[serde(default)]
pub snapshot_bytes: Option<u64>,
/// NOTE(review): presumably the number of retries so far.
#[serde(default)]
pub retries: u32,
/// NOTE(review): presumably a 0–100 completion percentage, when known.
#[serde(default)]
pub progress_pct: Option<u8>,
/// NOTE(review): presumably the number of events buffered for replay.
#[serde(default)]
pub buffered_events: u32,
}
/// Serializable mirror of `crate::adapter::net::compute::MigrationPhase`.
///
/// Variants map one-to-one via the `From` impl in this file. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
/// Variant order determines the discriminant in positional encodings; only
/// append new variants.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum MigrationPhaseSnapshot {
/// Default phase.
#[default]
Snapshot,
Transfer,
Restore,
Replay,
Cutover,
Complete,
}
impl From<crate::adapter::net::compute::MigrationPhase> for MigrationPhaseSnapshot {
fn from(p: crate::adapter::net::compute::MigrationPhase) -> Self {
use crate::adapter::net::compute::MigrationPhase;
match p {
MigrationPhase::Snapshot => Self::Snapshot,
MigrationPhase::Transfer => Self::Transfer,
MigrationPhase::Restore => Self::Restore,
MigrationPhase::Replay => Self::Replay,
MigrationPhase::Cutover => Self::Cutover,
MigrationPhase::Complete => Self::Complete,
}
}
}
/// Serializable view of a single daemon as produced by
/// [`MeshOsSnapshot::from_state`].
///
/// Field order is pinned by the postcard byte-stability test in this file;
/// add new fields at the end only.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct DaemonSnapshot {
/// Daemon name, copied from `DaemonRef::name`.
pub name: String,
/// Lifecycle stage at snapshot time.
pub lifecycle: DaemonLifecycleSnapshot,
/// Last known health; `None` when the status carries no health value.
pub health: Option<DaemonHealthSnapshot>,
/// Copied verbatim from the daemon status.
/// NOTE(review): scale/unit is not visible from this file — confirm.
pub saturation: f32,
/// Restart/backoff state at snapshot time.
pub restart_state: RestartStateSnapshot,
/// Node hosting the daemon; `from_state` fills this with its `this_node`
/// argument. Falls back to the default (0) when absent from legacy JSON.
#[serde(default)]
pub placement: NodeId,
/// Milliseconds since the daemon entered its current run/stop period: for a
/// `Stopped` daemon, time since the later of last exit / last crash (falling
/// back to last start); otherwise time since last start. 0 when no
/// timestamp is available or the field is absent from legacy JSON.
#[serde(default)]
pub age_ms: u64,
}
/// Serializable mirror of `DaemonLifecycle`; variants map one-to-one via the
/// `From` impl in this file.
///
/// `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
/// Variant order determines the discriminant in positional encodings; only
/// append new variants.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonLifecycleSnapshot {
/// Default lifecycle stage.
#[default]
Stopped,
Starting,
Running,
Stopping,
}
impl From<DaemonLifecycle> for DaemonLifecycleSnapshot {
fn from(l: DaemonLifecycle) -> Self {
match l {
DaemonLifecycle::Stopped => Self::Stopped,
DaemonLifecycle::Starting => Self::Starting,
DaemonLifecycle::Running => Self::Running,
DaemonLifecycle::Stopping => Self::Stopping,
}
}
}
/// Serializable mirror of `super::event::DaemonHealth`, as mapped by
/// [`MeshOsSnapshot::from_state`].
///
/// `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonHealthSnapshot {
Healthy,
/// Degraded, with the reason string cloned from the source health value.
Degraded {
reason: String,
},
Unhealthy,
}
/// Serializable mirror of `super::supervision::RestartState`.
///
/// Despite the field name, `until_ms` is a *remaining* duration:
/// `from_state` stores the milliseconds from the snapshot instant until the
/// restart deadline, clamped to 0 once the deadline has passed.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum RestartStateSnapshot {
/// Default state.
#[default]
Idle,
/// Backing off; `until_ms` is the time left in the backoff window.
BackingOff {
until_ms: u64,
},
/// Crash-looping; `until_ms` is the time left until the recorded deadline.
CrashLooping {
until_ms: u64,
},
}
/// Serializable replica view for one chain.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicaSnapshot {
/// Nodes currently holding a replica; empty for chains that only appear in
/// the desired state.
pub holders: Vec<NodeId>,
/// Desired replica count from `DesiredState`, if one is configured.
pub desired_count: Option<u32>,
/// Current leader from the actual state, if one is recorded.
pub leader: Option<NodeId>,
}
/// Serializable view of one peer, merged by
/// [`MeshOsSnapshot::from_state`] from several per-peer maps.
///
/// A peer appears as soon as any source map mentions it, so every field may
/// be unset. Inventory fields are `#[serde(default)]` so legacy JSON without
/// them still decodes. Field order is pinned by the postcard byte-stability
/// test in this file; add new fields at the end only.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PeerSnapshot {
/// Round-trip time in whole milliseconds, when measured.
pub rtt_ms: Option<u64>,
/// Last known node health.
pub health: Option<PeerHealthSnapshot>,
/// Mirrored maintenance state of the peer.
pub maintenance: Option<MaintenanceMirrorSnapshot>,
// The fields below are copied verbatim from the peer's inventory record.
// NOTE(review): units are inferred from the field names only — confirm
// against the inventory producer.
#[serde(default)]
pub cpu_load_1m: Option<f64>,
#[serde(default)]
pub mem_used_bytes: Option<u64>,
#[serde(default)]
pub mem_total_bytes: Option<u64>,
#[serde(default)]
pub disk_used_bytes: Option<u64>,
#[serde(default)]
pub disk_total_bytes: Option<u64>,
#[serde(default)]
pub saturation_trend: Option<f32>,
/// Capability strings from the inventory; empty when none were reported.
#[serde(default)]
pub capability_set: std::collections::BTreeSet<String>,
#[serde(default)]
pub software_version: Option<String>,
#[serde(default)]
pub forked_from: Option<NodeId>,
}
/// Serializable mirror of `super::event::NodeHealth`, as mapped by
/// [`MeshOsSnapshot::from_state`].
///
/// `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
/// Variant order determines the discriminant in positional encodings; only
/// append new variants.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum PeerHealthSnapshot {
Healthy,
Degraded,
Unreachable,
}
/// Serializable mirror of `super::state::MaintenanceMirror`: the maintenance
/// state of a *peer*, without timestamps or reasons.
///
/// `#[non_exhaustive]`: matches outside this crate need a wildcard arm.
/// Variant order determines the discriminant in positional encodings; only
/// append new variants.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum MaintenanceMirrorSnapshot {
Active,
EnteringMaintenance,
Maintenance,
ExitingMaintenance,
DrainFailed,
Recovery,
}
/// Serializable view of one avoid-list entry.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct AvoidEntrySnapshot {
/// Reason text cloned from the avoid-list entry.
pub reason: String,
/// Milliseconds left until the entry expires, measured from the snapshot
/// instant; 0 once the expiry has passed.
pub ttl_ms: u64,
}
/// Serializable mirror of `MaintenanceState` for the *local* node.
///
/// In every variant `since_ms` is the number of milliseconds that had elapsed
/// between the state's `since` instant and the snapshot instant (an age, not
/// a timestamp).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum MaintenanceStateSnapshot {
/// Default state.
#[default]
Active,
EnteringMaintenance {
since_ms: u64,
/// Milliseconds left until the deadline, if one is set; 0 once the
/// deadline has passed.
deadline_remaining_ms: Option<u64>,
},
Maintenance {
since_ms: u64,
},
ExitingMaintenance {
since_ms: u64,
},
DrainFailed {
since_ms: u64,
/// Failure reason cloned from the source state.
reason: String,
},
Recovery {
since_ms: u64,
},
}
/// Serializable summary of one pending action.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PendingActionSnapshot {
/// Numeric value of the action's `ActionId`.
pub id: u64,
/// Label of the action variant, as returned by [`action_kind_str`].
pub kind: String,
/// Milliseconds between the action's emission and the snapshot instant.
pub age_ms: u64,
}
/// One recorded failure, as carried in `MeshOsSnapshot::recent_failures`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct FailureRecord {
/// Sequence number. [`failure_record`] always sets 0.
/// NOTE(review): presumably assigned by whoever appends to the ring.
pub seq: u64,
/// Free-form identifier of where the failure came from.
pub source: String,
/// Free-form description of the failure.
pub reason: String,
/// Time of the failure in milliseconds; [`failure_record`] derives it from
/// the UNIX-epoch wall clock.
pub recorded_at_ms: u64,
}
impl FailureRecord {
    /// Returns how many milliseconds lie between this record and `now_ms`.
    ///
    /// `now_ms` must use the same clock base as `recorded_at_ms`. When
    /// `now_ms` is earlier than the record (e.g. clock skew), the result is
    /// clamped to 0 instead of underflowing.
    pub fn age_ms(&self, now_ms: u64) -> u64 {
        let Self { recorded_at_ms, .. } = self;
        u64::saturating_sub(now_ms, *recorded_at_ms)
    }
}
impl MeshOsSnapshot {
    /// Builds a serializable snapshot from the live runtime state.
    ///
    /// All relative times are measured against `actual.last_tick`, falling
    /// back to `Instant::now()` when no tick has been recorded. Every
    /// duration is reported in whole milliseconds and saturates at
    /// `u64::MAX` rather than wrapping.
    ///
    /// # Parameters
    /// - `actual`: observed state (daemons, replicas, peers, avoid list,
    ///   local maintenance, freeze deadline).
    /// - `desired`: desired state; only `desired_replicas` is read.
    /// - `pending`: pending actions, summarised into `recently_emitted`.
    /// - `recent_failures`: failure history, copied as-is.
    /// - `in_flight_migrations`: ready-made migration snapshots, moved in.
    /// - `admin_audit_ring` / `log_ring`: rings whose contents are cloned.
    /// - `this_node`: written into every daemon's `placement`.
    ///
    /// # Returns
    /// The populated snapshot. `runtime_epoch_id` is always 0 here.
    // The arguments are independent pieces of runtime state owned by the
    // caller, hence the lint suppression rather than a wrapper struct.
    #[allow(clippy::too_many_arguments)]
    pub fn from_state(
        actual: &MeshOsState,
        desired: &DesiredState,
        pending: &[PendingAction],
        recent_failures: &[FailureRecord],
        in_flight_migrations: Vec<MigrationSnapshot>,
        admin_audit_ring: &std::collections::VecDeque<super::ice::AdminAuditRecord>,
        log_ring: &std::collections::VecDeque<super::logs::LogRecord>,
        this_node: NodeId,
    ) -> Self {
        /// Whole milliseconds in `d`, clamped to `u64::MAX`.
        ///
        /// `Duration::as_millis` yields a `u128`; a bare `as u64` would
        /// silently wrap for very large durations.
        fn saturating_millis(d: Duration) -> u64 {
            // Lossless after the clamp.
            d.as_millis().min(u128::from(u64::MAX)) as u64
        }

        // Reference instant for every age / remaining-time computation.
        let now = actual.last_tick.unwrap_or_else(std::time::Instant::now);

        // --- Daemons -----------------------------------------------------
        let daemons = actual
            .daemons
            .iter()
            .map(|(d, status)| {
                let started_age = status
                    .last_started
                    .map(|t| saturating_millis(now.saturating_duration_since(t)));
                // For a stopped daemon the relevant anchor is whichever of
                // exit / crash happened most recently.
                let stopped_age = status
                    .last_exit
                    .into_iter()
                    .chain(status.last_crash)
                    .max()
                    .map(|t| saturating_millis(now.saturating_duration_since(t)));
                let age_ms = match status.lifecycle {
                    DaemonLifecycle::Stopped => stopped_age.or(started_age).unwrap_or(0),
                    _ => started_age.unwrap_or(0),
                };
                let snapshot = DaemonSnapshot {
                    name: d.name.clone(),
                    lifecycle: status.lifecycle.into(),
                    placement: this_node,
                    age_ms,
                    health: status.health.as_ref().map(|h| match h {
                        super::event::DaemonHealth::Healthy => DaemonHealthSnapshot::Healthy,
                        super::event::DaemonHealth::Degraded { reason } => {
                            DaemonHealthSnapshot::Degraded {
                                reason: reason.clone(),
                            }
                        }
                        super::event::DaemonHealth::Unhealthy => DaemonHealthSnapshot::Unhealthy,
                    }),
                    saturation: status.saturation,
                    // `until_ms` carries the time *remaining* until the
                    // deadline, not an absolute timestamp.
                    restart_state: match status.backoff.state() {
                        super::supervision::RestartState::Idle => RestartStateSnapshot::Idle,
                        super::supervision::RestartState::BackingOff { until } => {
                            RestartStateSnapshot::BackingOff {
                                until_ms: saturating_millis(until.saturating_duration_since(now)),
                            }
                        }
                        super::supervision::RestartState::CrashLooping { until } => {
                            RestartStateSnapshot::CrashLooping {
                                until_ms: saturating_millis(until.saturating_duration_since(now)),
                            }
                        }
                    },
                };
                (d.id, snapshot)
            })
            .collect();

        // --- Replicas ----------------------------------------------------
        let mut replicas: BTreeMap<ChainId, ReplicaSnapshot> = BTreeMap::new();
        for (chain, holders) in &actual.replicas {
            replicas.insert(
                *chain,
                ReplicaSnapshot {
                    holders: holders.iter().copied().collect(),
                    desired_count: desired.desired_replicas.get(chain).copied(),
                    leader: actual.replica_leader.get(chain).copied(),
                },
            );
        }
        // Surface chains that are desired but have no holders yet.
        for (chain, count) in &desired.desired_replicas {
            replicas.entry(*chain).or_default().desired_count = Some(*count);
        }

        // --- Peers: merge every per-peer map into one entry per node -----
        let peers: BTreeMap<NodeId, PeerSnapshot> = {
            let mut peers: BTreeMap<NodeId, PeerSnapshot> = BTreeMap::new();
            for (peer, rtt) in &actual.rtt {
                peers.entry(*peer).or_default().rtt_ms = Some(saturating_millis(*rtt));
            }
            for (peer, health) in &actual.node_health {
                peers.entry(*peer).or_default().health = Some(match health {
                    super::event::NodeHealth::Healthy => PeerHealthSnapshot::Healthy,
                    super::event::NodeHealth::Degraded => PeerHealthSnapshot::Degraded,
                    super::event::NodeHealth::Unreachable => PeerHealthSnapshot::Unreachable,
                });
            }
            for (peer, mirror) in &actual.maintenance {
                peers.entry(*peer).or_default().maintenance = Some(match mirror {
                    super::state::MaintenanceMirror::Active => MaintenanceMirrorSnapshot::Active,
                    super::state::MaintenanceMirror::EnteringMaintenance => {
                        MaintenanceMirrorSnapshot::EnteringMaintenance
                    }
                    super::state::MaintenanceMirror::Maintenance => {
                        MaintenanceMirrorSnapshot::Maintenance
                    }
                    super::state::MaintenanceMirror::ExitingMaintenance => {
                        MaintenanceMirrorSnapshot::ExitingMaintenance
                    }
                    super::state::MaintenanceMirror::DrainFailed => {
                        MaintenanceMirrorSnapshot::DrainFailed
                    }
                    super::state::MaintenanceMirror::Recovery => {
                        MaintenanceMirrorSnapshot::Recovery
                    }
                });
            }
            for (peer, inv) in &actual.inventory {
                let entry = peers.entry(*peer).or_default();
                entry.cpu_load_1m = inv.cpu_load_1m;
                entry.mem_used_bytes = inv.mem_used_bytes;
                entry.mem_total_bytes = inv.mem_total_bytes;
                entry.disk_used_bytes = inv.disk_used_bytes;
                entry.disk_total_bytes = inv.disk_total_bytes;
                entry.saturation_trend = inv.saturation_trend;
                entry.capability_set = inv.capability_set.clone();
                entry.software_version = inv.software_version.clone();
                entry.forked_from = inv.forked_from;
            }
            peers
        };

        // --- Avoid list: absolute expiry -> remaining TTL ----------------
        let avoid_list = actual
            .avoid_list
            .iter()
            .map(|(peer, entry)| {
                let ttl = entry.until.saturating_duration_since(now);
                (
                    *peer,
                    AvoidEntrySnapshot {
                        reason: entry.reason.clone(),
                        ttl_ms: saturating_millis(ttl),
                    },
                )
            })
            .collect();

        // --- Local maintenance: `since` instants become elapsed ages -----
        let local_maintenance = match &actual.local_maintenance {
            MaintenanceState::Active => MaintenanceStateSnapshot::Active,
            MaintenanceState::EnteringMaintenance { since, deadline } => {
                MaintenanceStateSnapshot::EnteringMaintenance {
                    since_ms: saturating_millis(now.saturating_duration_since(*since)),
                    deadline_remaining_ms: deadline
                        .map(|d| saturating_millis(d.saturating_duration_since(now))),
                }
            }
            MaintenanceState::Maintenance { since } => MaintenanceStateSnapshot::Maintenance {
                since_ms: saturating_millis(now.saturating_duration_since(*since)),
            },
            MaintenanceState::ExitingMaintenance { since } => {
                MaintenanceStateSnapshot::ExitingMaintenance {
                    since_ms: saturating_millis(now.saturating_duration_since(*since)),
                }
            }
            MaintenanceState::DrainFailed { since, reason } => {
                MaintenanceStateSnapshot::DrainFailed {
                    since_ms: saturating_millis(now.saturating_duration_since(*since)),
                    reason: reason.clone(),
                }
            }
            MaintenanceState::Recovery { since } => MaintenanceStateSnapshot::Recovery {
                since_ms: saturating_millis(now.saturating_duration_since(*since)),
            },
        };

        // --- Pending actions, freeze window and rings --------------------
        let recently_emitted: std::sync::Arc<[PendingActionSnapshot]> = pending
            .iter()
            .map(|p| PendingActionSnapshot {
                id: p.id.0,
                kind: action_kind_str(&p.action).to_string(),
                age_ms: saturating_millis(now.saturating_duration_since(p.emitted_at)),
            })
            .collect::<Vec<_>>()
            .into();
        let freeze_remaining_ms = actual
            .freeze_until
            .map(|until| saturating_millis(until.saturating_duration_since(now)));
        let admin_audit: std::sync::Arc<[super::ice::AdminAuditRecord]> =
            admin_audit_ring.iter().cloned().collect::<Vec<_>>().into();
        let log_ring_arc: std::sync::Arc<[super::logs::LogRecord]> =
            log_ring.iter().cloned().collect::<Vec<_>>().into();
        let in_flight_migrations_arc: std::sync::Arc<[MigrationSnapshot]> =
            in_flight_migrations.into();

        Self {
            daemons,
            replicas,
            peers,
            avoid_list,
            local_maintenance,
            recently_emitted,
            recent_failures: recent_failures.iter().cloned().collect(),
            freeze_remaining_ms,
            admin_audit,
            log_ring: log_ring_arc,
            in_flight_migrations: in_flight_migrations_arc,
            runtime_epoch_id: 0,
        }
    }
}
/// Returns the stable snake_case label for an action's variant.
///
/// The label depends only on the variant, never on its payload. The match is
/// exhaustive (no wildcard arm), so adding a `MeshOsAction` variant fails to
/// compile here until it is given a label.
pub fn action_kind_str(action: &MeshOsAction) -> &'static str {
    match action {
        // Daemon lifecycle and supervision.
        MeshOsAction::StartDaemon { .. } => "start_daemon",
        MeshOsAction::StopDaemon { .. } => "stop_daemon",
        MeshOsAction::ApplyBackoff { .. } => "apply_backoff",

        // Replica placement.
        MeshOsAction::PullReplica { .. } => "pull_replica",
        MeshOsAction::DropReplica { .. } => "drop_replica",
        MeshOsAction::RequestPlacement { .. } => "request_placement",
        MeshOsAction::RequestEviction { .. } => "request_eviction",

        // Blob handling.
        MeshOsAction::MigrateBlob { .. } => "migrate_blob",
        MeshOsAction::ReduceHeat { .. } => "reduce_heat",

        // Peer avoidance and maintenance.
        MeshOsAction::MarkAvoid { .. } => "mark_avoid",
        MeshOsAction::CommitMaintenanceTransition { .. } => "commit_maintenance_transition",
    }
}
/// Builds a [`FailureRecord`] for a failure that happened `age` ago.
///
/// The record's `recorded_at_ms` is the current wall-clock time (milliseconds
/// since the UNIX epoch) minus `age`, clamped at 0. If the system clock reads
/// earlier than the epoch, "now" is treated as 0.
///
/// `seq` is always 0.
/// NOTE(review): presumably the caller or the failure ring assigns the real
/// sequence number afterwards.
///
/// # Parameters
/// - `source`: where the failure came from.
/// - `reason`: human-readable description.
/// - `age`: how long ago the failure occurred.
pub fn failure_record(
    source: impl Into<String>,
    reason: impl Into<String>,
    age: Duration,
) -> FailureRecord {
    /// Whole milliseconds in `d`, clamped to `u64::MAX`.
    ///
    /// `Duration::as_millis` yields a `u128`; a bare `as u64` wraps, which
    /// could turn a huge `age` into a tiny one (e.g. 2^61 s wraps to 0 ms).
    fn saturating_millis(d: Duration) -> u64 {
        // Lossless after the clamp.
        d.as_millis().min(u128::from(u64::MAX)) as u64
    }

    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(saturating_millis)
        .unwrap_or(0);
    let recorded_at_ms = now_ms.saturating_sub(saturating_millis(age));
    FailureRecord {
        seq: 0,
        source: source.into(),
        reason: reason.into(),
        recorded_at_ms,
    }
}
pub fn daemon_id(d: &DaemonRef) -> u64 {
d.id
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use std::time::Instant;
use super::super::action::{ActionId, MaintenanceTransition};
use super::super::event::{DaemonHealth, NodeHealth};
use super::super::maintenance::MaintenanceState;
use super::super::state::{AvoidEntry, DaemonStatus, MaintenanceMirror};
use super::*;
/// Test helper: builds a `DaemonRef` from a borrowed name and an id.
fn dref(name: &str, id: u64) -> DaemonRef {
    let name = String::from(name);
    DaemonRef { name, id }
}
/// `age_ms` is the saturating difference between "now" and the record time.
#[test]
fn failure_record_age_ms_derives_from_recorded_at_ms() {
    let recorded_at_ms = 1_000;
    let record = FailureRecord {
        seq: 1,
        source: String::from("test"),
        reason: String::from("boom"),
        recorded_at_ms,
    };
    // "Now" after the record: plain difference.
    assert_eq!(record.age_ms(3_000), 2_000);
    // "Now" before the record: clamps to zero instead of underflowing.
    assert_eq!(record.age_ms(500), 0);
}
/// A default snapshot survives a postcard encode/decode cycle unchanged.
#[test]
fn empty_snapshot_round_trips_through_postcard() {
    let original = MeshOsSnapshot::default();
    let encoded = postcard::to_allocvec(&original).expect("encode");
    let decoded: MeshOsSnapshot = postcard::from_bytes(&encoded).expect("decode");
    assert_eq!(original, decoded);
}
/// A default snapshot survives a JSON encode/decode cycle unchanged.
#[test]
fn empty_snapshot_round_trips_through_json() {
    let original = MeshOsSnapshot::default();
    let text = serde_json::to_string(&original).expect("encode");
    let decoded: MeshOsSnapshot = serde_json::from_str(&text).expect("decode");
    assert_eq!(original, decoded);
}
/// Lifecycle, health and saturation of a daemon are carried into the snapshot.
#[test]
fn snapshot_captures_daemon_lifecycle_and_health() {
    let mut degraded = DaemonStatus::default();
    degraded.lifecycle = DaemonLifecycle::Running;
    degraded.health = Some(DaemonHealth::Degraded {
        reason: "queue depth".into(),
    });
    degraded.saturation = 0.42;

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    actual.daemons.insert(dref("telemetry", 1), degraded);

    let snap = MeshOsSnapshot::from_state(
        &actual,
        &DesiredState::default(),
        &[],
        &[],
        Vec::new(),
        &VecDeque::new(),
        &VecDeque::new(),
        0,
    );

    let telemetry = snap.daemons.get(&1).expect("daemon present");
    assert_eq!(telemetry.name, "telemetry");
    assert_eq!(telemetry.lifecycle, DaemonLifecycleSnapshot::Running);
    assert!(matches!(
        telemetry.health,
        Some(DaemonHealthSnapshot::Degraded { .. })
    ));
    // f32 comparison with a small tolerance.
    assert!((telemetry.saturation - 0.42).abs() < 1e-6);
}
/// Holders, leader and desired count of a chain all land in one replica entry.
#[test]
fn snapshot_captures_replica_holders_and_leader_and_desired_count() {
    let chain: ChainId = 0xAA;

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    actual
        .replicas
        .insert(chain, ::std::collections::BTreeSet::from([1, 2, 3]));
    actual.replica_leader.insert(chain, 1);

    let mut desired = DesiredState::default();
    desired.desired_replicas.insert(chain, 5);

    let snap = MeshOsSnapshot::from_state(
        &actual,
        &desired,
        &[],
        &[],
        Vec::new(),
        &VecDeque::new(),
        &VecDeque::new(),
        0,
    );

    let replica = snap.replicas.get(&chain).expect("replica present");
    assert_eq!(replica.holders, vec![1, 2, 3]);
    assert_eq!(replica.desired_count, Some(5));
    assert_eq!(replica.leader, Some(1));
}
/// A chain that is only desired (no holders yet) still appears in the
/// snapshot, with an empty holder list.
#[test]
fn snapshot_surfaces_chains_with_desired_count_but_no_holders_yet() {
    let chain: ChainId = 0xBB;

    let mut desired = DesiredState::default();
    desired.desired_replicas.insert(chain, 3);

    let snap = MeshOsSnapshot::from_state(
        &MeshOsState::default(),
        &desired,
        &[],
        &[],
        Vec::new(),
        &VecDeque::new(),
        &VecDeque::new(),
        0,
    );

    let replica = snap
        .replicas
        .get(&chain)
        .expect("missing chain not surfaced");
    assert_eq!(replica.holders, Vec::<NodeId>::new());
    assert_eq!(replica.desired_count, Some(3));
}
/// A snapshot populated from every state source round-trips through both
/// postcard and JSON.
#[test]
fn snapshot_round_trips_a_realistic_state() {
    let base = Instant::now();
    let chain: ChainId = 0xC0FFEE;

    // One healthy running daemon.
    let mut watch = DaemonStatus::default();
    watch.lifecycle = DaemonLifecycle::Running;
    watch.health = Some(DaemonHealth::Healthy);

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(base);
    actual.daemons.insert(dref("watch", 7), watch);

    // One chain with two holders and a leader.
    actual
        .replicas
        .insert(chain, ::std::collections::BTreeSet::from([10, 11]));
    actual.replica_leader.insert(chain, 10);

    // Peer data spread over several maps.
    actual.rtt.insert(10, Duration::from_millis(45));
    actual.node_health.insert(10, NodeHealth::Healthy);
    actual
        .maintenance
        .insert(11, MaintenanceMirror::Maintenance);
    actual.avoid_list.insert(
        999,
        AvoidEntry {
            reason: "noisy".into(),
            until: base + Duration::from_secs(120),
        },
    );
    actual.local_maintenance = MaintenanceState::EnteringMaintenance {
        since: base,
        deadline: Some(base + Duration::from_secs(60)),
    };

    let mut desired = DesiredState::default();
    desired.desired_replicas.insert(chain, 3);

    let pending = [PendingAction {
        id: ActionId(1),
        action: MeshOsAction::CommitMaintenanceTransition {
            node: 0,
            target: MaintenanceTransition::Maintenance,
        },
        emitted_at: base,
    }];

    let snap = MeshOsSnapshot::from_state(
        &actual,
        &desired,
        &pending,
        &[],
        Vec::new(),
        &VecDeque::new(),
        &VecDeque::new(),
        0,
    );

    let wire = postcard::to_allocvec(&snap).expect("encode");
    let from_wire: MeshOsSnapshot = postcard::from_bytes(&wire).expect("decode");
    assert_eq!(snap, from_wire);

    let text = serde_json::to_string(&snap).expect("encode json");
    let from_text: MeshOsSnapshot = serde_json::from_str(&text).expect("decode json");
    assert_eq!(snap, from_text);
}
/// Every daemon in the snapshot reports the `this_node` passed to
/// `from_state`, whatever that value is.
#[test]
fn placement_matches_this_node_for_every_entry() {
    let fixture = [("alpha", 1u64), ("beta", 2), ("gamma", 3)];
    let candidate_nodes = [0u64, 1, 0x2A2A, NodeId::MAX];

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    for (name, daemon_id) in fixture {
        let mut running = DaemonStatus::default();
        running.lifecycle = DaemonLifecycle::Running;
        actual.daemons.insert(dref(name, daemon_id), running);
    }

    for this_node in candidate_nodes {
        let snap = MeshOsSnapshot::from_state(
            &actual,
            &DesiredState::default(),
            &[],
            &[],
            Vec::new(),
            &VecDeque::new(),
            &VecDeque::new(),
            this_node,
        );
        assert!(!snap.daemons.is_empty(), "fixture daemons were dropped");
        for (id, d) in snap.daemons.iter() {
            assert_eq!(
                d.placement, this_node,
                "daemon {id} placement diverged from this_node {this_node:x}",
            );
        }
    }
}
/// For a stopped daemon, `age_ms` counts from the last exit, not from the
/// (older) last start.
#[test]
fn daemon_age_anchors_on_last_exit_when_stopped() {
    let tick = Instant::now();

    let mut stopped = DaemonStatus::default();
    stopped.lifecycle = DaemonLifecycle::Stopped;
    // `checked_sub` yields `None` if the platform cannot represent an
    // instant that far back; the asserts below make that case explicit.
    stopped.last_started = tick.checked_sub(Duration::from_secs(2));
    stopped.last_exit = tick.checked_sub(Duration::from_millis(500));
    assert!(stopped.last_started.is_some());
    assert!(stopped.last_exit.is_some());

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(tick);
    actual.daemons.insert(dref("worker", 1), stopped);

    let snap = MeshOsSnapshot::from_state(
        &actual,
        &DesiredState::default(),
        &[],
        &[],
        Vec::new(),
        &VecDeque::new(),
        &VecDeque::new(),
        0,
    );

    let worker = snap.daemons.get(&1).expect("daemon present");
    assert!(
        worker.age_ms < 1500,
        "age_ms anchored wrong (looks like last_started): got {}",
        worker.age_ms,
    );
    assert!(
        worker.age_ms >= 400,
        "age_ms below last_exit floor: got {}",
        worker.age_ms,
    );
}
/// For a running daemon, `age_ms` counts from the last start even when an
/// older exit timestamp is still recorded.
#[test]
fn daemon_age_anchors_on_last_started_when_running() {
    // `Instant - Duration` panics when the result is not representable
    // (e.g. a monotonic clock that started less than 900 s ago). Anchoring
    // the tick in the future keeps every subtraction below at or after
    // `Instant::now()`. `from_state` only measures relative to `last_tick`,
    // so the shift does not affect the computed ages.
    let headroom = Duration::from_secs(900);
    let now = Instant::now() + headroom;

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(now);
    let d = dref("worker", 2);
    let mut status = DaemonStatus::default();
    status.lifecycle = DaemonLifecycle::Running;
    status.last_started = Some(now - Duration::from_secs(30));
    status.last_exit = Some(now - Duration::from_secs(900));
    actual.daemons.insert(d, status);

    let snap = MeshOsSnapshot::from_state(
        &actual,
        &DesiredState::default(),
        &[],
        &[],
        Vec::new(),
        &std::collections::VecDeque::new(),
        &std::collections::VecDeque::new(),
        0,
    );

    let daemon = snap.daemons.get(&2).expect("daemon present");
    // 30 s since start; anything near 900 s would mean last_exit was used.
    assert!(daemon.age_ms >= 30_000, "got age_ms = {}", daemon.age_ms);
    assert!(daemon.age_ms < 60_000, "got age_ms = {}", daemon.age_ms);
}
/// For a stopped daemon with both an exit and a crash timestamp, `age_ms`
/// counts from whichever happened most recently.
#[test]
fn daemon_age_uses_most_recent_of_exit_or_crash() {
    // `Instant - Duration` panics when the result is not representable
    // (e.g. a monotonic clock that started less than 120 s ago). Anchoring
    // the tick in the future keeps every subtraction below at or after
    // `Instant::now()`. `from_state` only measures relative to `last_tick`,
    // so the shift does not affect the computed ages.
    let headroom = Duration::from_secs(120);
    let now = Instant::now() + headroom;

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(now);
    let d = dref("worker", 3);
    let mut status = DaemonStatus::default();
    status.lifecycle = DaemonLifecycle::Stopped;
    status.last_started = Some(now - Duration::from_secs(120));
    status.last_exit = Some(now - Duration::from_secs(90));
    status.last_crash = Some(now - Duration::from_secs(10));
    actual.daemons.insert(d, status);

    let snap = MeshOsSnapshot::from_state(
        &actual,
        &DesiredState::default(),
        &[],
        &[],
        Vec::new(),
        &std::collections::VecDeque::new(),
        &std::collections::VecDeque::new(),
        0,
    );

    let daemon = snap.daemons.get(&3).expect("daemon present");
    // The crash (10 s ago) is newer than the exit (90 s ago).
    assert!(daemon.age_ms >= 10_000, "got age_ms = {}", daemon.age_ms);
    assert!(daemon.age_ms < 60_000, "got age_ms = {}", daemon.age_ms);
}
/// JSON written before `placement` / `age_ms` existed still decodes, with
/// both fields taking their defaults.
#[test]
fn daemon_snapshot_decodes_legacy_json_without_placement_or_age() {
    let legacy_json = r#"{
"name": "compute-A",
"lifecycle": "Running",
"health": "Healthy",
"saturation": 0.25,
"restart_state": "Idle"
}"#;
    let decoded: DaemonSnapshot = serde_json::from_str(legacy_json).expect("decode legacy");
    // Fields present in the payload.
    assert_eq!(decoded.name, "compute-A");
    assert_eq!(decoded.lifecycle, DaemonLifecycleSnapshot::Running);
    // Fields absent from the payload fall back to their defaults.
    assert_eq!(decoded.placement, 0);
    assert_eq!(decoded.age_ms, 0);
}
/// JSON written before the inventory fields existed still decodes, with all
/// of them left unset/empty.
#[test]
fn peer_snapshot_decodes_legacy_json_without_inventory_fields() {
    let legacy_json = r#"{
"rtt_ms": 7,
"health": "Healthy",
"maintenance": "Active"
}"#;
    let decoded: PeerSnapshot = serde_json::from_str(legacy_json).expect("decode legacy");
    // Field present in the payload.
    assert_eq!(decoded.rtt_ms, Some(7));
    // Inventory fields absent from the payload stay at their defaults.
    assert!(decoded.cpu_load_1m.is_none());
    assert!(decoded.mem_used_bytes.is_none());
    assert!(decoded.capability_set.is_empty());
    assert!(decoded.software_version.is_none());
    assert!(decoded.forked_from.is_none());
}
/// Pins the postcard encoding of `DaemonSnapshot` to a captured byte string,
/// so accidental field reordering or type changes are caught.
#[test]
fn daemon_snapshot_postcard_wire_is_byte_stable() {
    let sample = DaemonSnapshot {
        name: "x".into(),
        lifecycle: DaemonLifecycleSnapshot::Running,
        health: None,
        saturation: 0.5,
        restart_state: RestartStateSnapshot::Idle,
        placement: 0xAA,
        age_ms: 1234,
    };
    // Captured reference encoding of `sample`.
    let captured: &[u8] = &[
        0x01, 0x78, 0x02, 0x00, 0x00, 0x00, 0x00, 0x3F, 0x00, 0xAA, 0x01, 0xD2, 0x09,
    ];

    let bytes = postcard::to_allocvec(&sample).expect("encode");
    assert_eq!(
        bytes, captured,
        "DaemonSnapshot postcard wire drifted — got {bytes:?}",
    );

    let decoded: DaemonSnapshot =
        postcard::from_bytes(captured).expect("decode captured bytes");
    assert_eq!(decoded, sample);
}
/// Pins the postcard encoding of `PeerSnapshot` to a captured byte string,
/// so accidental field reordering or type changes are caught.
#[test]
fn peer_snapshot_postcard_wire_is_byte_stable() {
    let mut capabilities = std::collections::BTreeSet::new();
    capabilities.insert("net.peer".into());
    let sample = PeerSnapshot {
        rtt_ms: Some(7),
        health: Some(PeerHealthSnapshot::Healthy),
        maintenance: Some(MaintenanceMirrorSnapshot::Active),
        cpu_load_1m: Some(0.25),
        mem_used_bytes: Some(1024),
        mem_total_bytes: Some(8192),
        disk_used_bytes: None,
        disk_total_bytes: None,
        saturation_trend: Some(0.4),
        capability_set: capabilities,
        software_version: Some("v1".into()),
        forked_from: None,
    };
    // Captured reference encoding of `sample`.
    let captured: &[u8] = &[
        0x01, 0x07, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD0,
        0x3F, 0x01, 0x80, 0x08, 0x01, 0x80, 0x40, 0x00, 0x00, 0x01, 0xCD, 0xCC, 0xCC, 0x3E,
        0x01, 0x08, 0x6E, 0x65, 0x74, 0x2E, 0x70, 0x65, 0x65, 0x72, 0x01, 0x02, 0x76, 0x31,
        0x00,
    ];

    let bytes = postcard::to_allocvec(&sample).expect("encode");
    assert_eq!(
        bytes, captured,
        "PeerSnapshot postcard wire drifted — got {bytes:?}",
    );

    let decoded: PeerSnapshot = postcard::from_bytes(captured).expect("decode captured bytes");
    assert_eq!(decoded, sample);
}
/// Every listed `MeshOsAction` variant gets a non-empty, lower snake_case
/// label, and no two variants share one.
///
/// Exhaustiveness itself is enforced at compile time by the wildcard-free
/// match in `action_kind_str`; this test checks the labels it produces.
#[test]
fn action_kind_str_covers_every_variant() {
    let cases: Vec<MeshOsAction> = vec![
        MeshOsAction::StartDaemon {
            daemon: dref("a", 1),
        },
        MeshOsAction::StopDaemon {
            daemon: dref("a", 1),
            reason: "x".into(),
            deadline: Instant::now(),
        },
        MeshOsAction::PullReplica {
            chain: 1,
            source: 2,
        },
        MeshOsAction::DropReplica { chain: 1 },
        MeshOsAction::RequestPlacement {
            chain: 1,
            exclude: vec![],
            target: None,
        },
        MeshOsAction::RequestEviction {
            chain: 1,
            victim: 2,
        },
        MeshOsAction::MigrateBlob {
            blob: 1,
            from: 2,
            to: 3,
        },
        MeshOsAction::ReduceHeat { blob: 1, by: 1 },
        MeshOsAction::MarkAvoid {
            peer: 1,
            reason: "x".into(),
            ttl: Duration::from_secs(60),
        },
        MeshOsAction::ApplyBackoff {
            daemon: dref("a", 1),
            until: Instant::now(),
        },
        MeshOsAction::CommitMaintenanceTransition {
            node: 1,
            target: MaintenanceTransition::Maintenance,
        },
    ];

    let total = cases.len();
    let mut seen = ::std::collections::BTreeSet::new();
    for a in &cases {
        let kind = action_kind_str(a);
        assert!(!kind.is_empty(), "empty kind label");
        assert!(
            kind.chars().all(|c| c.is_ascii_lowercase() || c == '_'),
            "kind label {kind:?} is not lower snake_case",
        );
        // `insert` returns false when the label was already produced by
        // another variant.
        assert!(seen.insert(kind), "duplicate kind label {kind:?}");
    }
    assert_eq!(seen.len(), total);
}
}