use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::adapter::net::behavior::proximity::ProximityGraph;
use super::event::{NodeHealth, NodeId};
/// Source of per-peer latency observations.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait LocalityProbe: Send + Sync + 'static {
/// Returns the currently available samples as an owned vector of
/// `(peer, duration)` pairs. Going by the method name the duration is a
/// round-trip time; the exact measurement is up to the implementor.
fn rtt_samples(&self) -> Vec<(NodeId, Duration)>;
}
/// Source of per-peer health observations.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait HealthProbe: Send + Sync + 'static {
/// Returns the currently available samples as an owned vector of
/// `(peer, health)` pairs.
fn health_samples(&self) -> Vec<(NodeId, NodeHealth)>;
}
/// Snapshot of what is known about a peer's resources and software.
///
/// Every scalar field is optional, and the derived `Default` yields `None`
/// for each of them together with an empty `capability_set`.
// NOTE(review): units and value ranges below are inferred from the field
// names only; confirm against whatever produces these values.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PeerInventory {
/// CPU load figure; presumably the 1-minute load average.
pub cpu_load_1m: Option<f64>,
/// Memory in use, in bytes.
pub mem_used_bytes: Option<u64>,
/// Total memory, in bytes.
pub mem_total_bytes: Option<u64>,
/// Disk space in use, in bytes.
pub disk_used_bytes: Option<u64>,
/// Total disk space, in bytes.
pub disk_total_bytes: Option<u64>,
/// Saturation trend indicator; scale and sign convention are not visible here.
pub saturation_trend: Option<f32>,
/// Capability names, kept in a `BTreeSet` (unique, sorted order).
pub capability_set: std::collections::BTreeSet<String>,
/// Software version string, if known.
pub software_version: Option<String>,
/// Node this peer was forked from, if any.
pub forked_from: Option<NodeId>,
}
/// Source of per-peer [`PeerInventory`] observations.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait InventoryProbe: Send + Sync + 'static {
/// Returns the currently available samples as an owned vector of
/// `(peer, inventory)` pairs.
fn inventory_samples(&self) -> Vec<(NodeId, PeerInventory)>;
}
/// [`LocalityProbe`] that reads its samples from a shared [`ProximityGraph`].
pub struct ProximityGraphLocalityProbe {
/// Graph whose nodes are turned into samples.
graph: Arc<ProximityGraph>,
/// Id of the local node; graph entries that map to it are left out of the samples.
this_node: NodeId,
}
impl ProximityGraphLocalityProbe {
pub fn new(graph: Arc<ProximityGraph>, this_node: NodeId) -> Self {
Self { graph, this_node }
}
}
impl LocalityProbe for ProximityGraphLocalityProbe {
    /// Produces one `(peer, latency)` pair for every graph node except the
    /// local one.
    ///
    /// The peer id is derived from the node's graph id and the latency is the
    /// node's `latency_us` interpreted as microseconds.
    fn rtt_samples(&self) -> Vec<(NodeId, Duration)> {
        self.graph
            .all_nodes()
            .into_iter()
            .map(|node| {
                (
                    graph_id_to_node_id(&node.node_id),
                    Duration::from_micros(node.latency_us),
                )
            })
            // Drop the entry describing this node itself.
            .filter(|(peer, _)| *peer != self.this_node)
            .collect()
    }
}
/// [`HealthProbe`] that derives peer health from how long ago each node in a
/// shared [`ProximityGraph`] was last seen.
pub struct ProximityGraphHealthProbe {
/// Graph whose nodes are turned into samples.
graph: Arc<ProximityGraph>,
/// Id of the local node; graph entries that map to it are left out of the samples.
this_node: NodeId,
/// A peer last seen at least this long ago (but less than `stale_threshold`)
/// is reported as `NodeHealth::Degraded`.
degraded_threshold: Duration,
/// A peer last seen at least this long ago is reported as
/// `NodeHealth::Unreachable`; this check takes precedence over
/// `degraded_threshold`.
stale_threshold: Duration,
}
impl ProximityGraphHealthProbe {
pub fn new(
graph: Arc<ProximityGraph>,
this_node: NodeId,
degraded_threshold: Duration,
stale_threshold: Duration,
) -> Self {
Self {
graph,
this_node,
degraded_threshold,
stale_threshold,
}
}
pub fn with_defaults(graph: Arc<ProximityGraph>, this_node: NodeId) -> Self {
Self::new(
graph,
this_node,
Duration::from_millis(1500),
Duration::from_secs(5),
)
}
}
impl HealthProbe for ProximityGraphHealthProbe {
fn health_samples(&self) -> Vec<(NodeId, NodeHealth)> {
let now = Instant::now();
self.graph
.all_nodes()
.into_iter()
.filter_map(|node| {
let peer = graph_id_to_node_id(&node.node_id);
if peer == self.this_node {
return None;
}
let age = now.saturating_duration_since(node.last_seen);
let health = if age >= self.stale_threshold {
NodeHealth::Unreachable
} else if age >= self.degraded_threshold {
NodeHealth::Degraded
} else {
NodeHealth::Healthy
};
Some((peer, health))
})
.collect()
}
}
/// Derives a `NodeId` from a 32-byte proximity-graph identifier.
///
/// Only the first eight bytes take part: they are read as a little-endian
/// `u64`. The remaining 24 bytes are ignored, so graph ids that share an
/// eight-byte prefix map to the same `NodeId`.
fn graph_id_to_node_id(graph_id: &[u8; 32]) -> NodeId {
    // An array pattern with a rest (`..`) is irrefutable on `[u8; 32]`, so the
    // compiler itself proves the eight-byte prefix exists. No `unwrap` and no
    // lint suppression are needed.
    let [b0, b1, b2, b3, b4, b5, b6, b7, ..] = *graph_id;
    u64::from_le_bytes([b0, b1, b2, b3, b4, b5, b6, b7])
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::proximity::{
        EnhancedPingwave, ProximityConfig, ProximityGraph,
    };
    use std::net::SocketAddr;

    /// Locality probe stub that replays a fixed list of samples.
    struct FixedLocalityProbe {
        samples: Vec<(NodeId, Duration)>,
    }

    impl LocalityProbe for FixedLocalityProbe {
        fn rtt_samples(&self) -> Vec<(NodeId, Duration)> {
            self.samples.clone()
        }
    }

    /// Health probe stub that replays a fixed list of samples.
    struct FixedHealthProbe {
        samples: Vec<(NodeId, NodeHealth)>,
    }

    impl HealthProbe for FixedHealthProbe {
        fn health_samples(&self) -> Vec<(NodeId, NodeHealth)> {
            self.samples.clone()
        }
    }

    /// Graph id whose first byte is `n` and whose remaining bytes are zero.
    fn gid(n: u8) -> [u8; 32] {
        std::array::from_fn(|i| if i == 0 { n } else { 0 })
    }

    /// Graph owned by node `gid(1)` that has received one pingwave from `peer_id`.
    fn graph_with_one_peer(peer_id: [u8; 32]) -> Arc<ProximityGraph> {
        let graph = Arc::new(ProximityGraph::new(gid(1), ProximityConfig::default()));
        let from_addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let pingwave = EnhancedPingwave::new(peer_id, 1, 3);
        graph.on_pingwave_from(pingwave, peer_id, from_addr);
        graph
    }

    #[test]
    fn fixed_locality_probe_returns_configured_samples() {
        let expected = vec![
            (1, Duration::from_millis(50)),
            (2, Duration::from_millis(120)),
        ];
        let probe = FixedLocalityProbe {
            samples: expected.clone(),
        };
        // Same length and same elements in the same order.
        assert_eq!(probe.rtt_samples(), expected);
    }

    #[test]
    fn fixed_health_probe_returns_configured_samples() {
        let probe = FixedHealthProbe {
            samples: vec![(1, NodeHealth::Healthy), (2, NodeHealth::Unreachable)],
        };
        let observed: Vec<NodeHealth> = probe
            .health_samples()
            .into_iter()
            .map(|(_, health)| health)
            .collect();
        assert_eq!(
            observed,
            vec![NodeHealth::Healthy, NodeHealth::Unreachable]
        );
    }

    #[test]
    fn graph_id_bridge_round_trips_first_8_bytes_little_endian() {
        let expected: u64 = 12345;
        let mut raw = [0u8; 32];
        raw[..8].copy_from_slice(&expected.to_le_bytes());
        assert_eq!(graph_id_to_node_id(&raw), expected);
    }

    #[test]
    fn proximity_graph_locality_probe_filters_self_and_returns_peers() {
        let remote = gid(2);
        let local_id = graph_id_to_node_id(&gid(1));
        let probe = ProximityGraphLocalityProbe::new(graph_with_one_peer(remote), local_id);

        let samples = probe.rtt_samples();
        assert_eq!(samples.len(), 1);
        // Only the peer id is checked; the latency value is not asserted on.
        let (sampled_peer, _rtt) = samples[0];
        assert_eq!(sampled_peer, graph_id_to_node_id(&remote));
    }

    #[test]
    fn proximity_graph_health_probe_classifies_age_into_three_tiers() {
        let remote = gid(2);
        let graph = graph_with_one_peer(remote);
        let local_id = graph_id_to_node_id(&gid(1));
        let remote_id = graph_id_to_node_id(&remote);

        let sample_with = |degraded: Duration, stale: Duration| {
            ProximityGraphHealthProbe::new(Arc::clone(&graph), local_id, degraded, stale)
                .health_samples()
        };
        let hour = Duration::from_secs(3600);
        let tiny = Duration::from_micros(1);

        // Both thresholds far in the future: the peer is healthy.
        let fresh = sample_with(hour, hour);
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[0], (remote_id, NodeHealth::Healthy));

        // Let the peer age past the 1 µs thresholds used below.
        std::thread::sleep(Duration::from_millis(2));

        let degraded = sample_with(tiny, hour);
        assert_eq!(degraded[0].1, NodeHealth::Degraded);

        let unreachable = sample_with(tiny, tiny);
        assert_eq!(unreachable[0].1, NodeHealth::Unreachable);
    }

    #[test]
    fn proximity_graph_health_probe_with_defaults_picks_sensible_thresholds() {
        let graph = graph_with_one_peer(gid(2));
        let local_id = graph_id_to_node_id(&gid(1));
        let probe = ProximityGraphHealthProbe::with_defaults(Arc::clone(&graph), local_id);

        let samples = probe.health_samples();
        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].1, NodeHealth::Healthy);

        assert_eq!(probe.degraded_threshold, Duration::from_millis(1500));
        assert_eq!(probe.stale_threshold, Duration::from_secs(5));
    }
}