use std::sync::Arc;
use std::time::Duration;
use crate::mesh::Mesh;
use crate::meshos::{
HealthProbe, InventoryProbe, LocalityProbe, MeshOsRuntime, NodeHealth, NodeId, PeerInventory,
};
/// Locality probe backed by a shared [`Mesh`] handle.
///
/// Its `LocalityProbe` implementation reports one fixed round-trip time for
/// every expected peer (other than the local node) for which the mesh
/// currently yields a peer address.
pub struct MeshLocalityProbe {
/// Mesh used to read the local node id and to look up peer addresses.
mesh: Arc<Mesh>,
/// Peers this probe reports on; the local node id is skipped when sampling.
expected_peers: Arc<Vec<NodeId>>,
/// Constant RTT attached to every sample (`new` initialises it to 500 µs).
loopback_rtt: Duration,
}
impl MeshLocalityProbe {
    /// RTT value every probe instance starts with (500 microseconds).
    const DEFAULT_LOOPBACK_RTT: Duration = Duration::from_micros(500);

    /// Builds a locality probe over `mesh` that reports on `expected_peers`.
    ///
    /// Both arguments are shared handles and are stored as-is; the reported
    /// RTT is initialised to [`Self::DEFAULT_LOOPBACK_RTT`].
    pub fn new(mesh: Arc<Mesh>, expected_peers: Arc<Vec<NodeId>>) -> Self {
        let loopback_rtt = Self::DEFAULT_LOOPBACK_RTT;
        Self { mesh, expected_peers, loopback_rtt }
    }
}
impl LocalityProbe for MeshLocalityProbe {
    /// Returns one `(peer, rtt)` pair per expected peer that is not the local
    /// node and for which `peer_reachable` reports `true`.
    ///
    /// The RTT is the probe's stored `loopback_rtt`, not a measured value.
    /// Output order follows the order of `expected_peers`.
    fn rtt_samples(&self) -> Vec<(NodeId, Duration)> {
        let this_node = self.mesh.node_id();
        let mut samples = Vec::new();
        for &peer in self.expected_peers.iter() {
            // Skip ourselves first so the reachability lookup only runs for
            // genuine remote peers.
            if peer == this_node {
                continue;
            }
            if !peer_reachable(&self.mesh, peer) {
                continue;
            }
            samples.push((peer, self.loopback_rtt));
        }
        samples
    }
}
/// Health probe backed by a shared [`Mesh`] handle.
///
/// Its `HealthProbe` implementation labels each expected peer (other than the
/// local node) as `Healthy` or `Unreachable`, depending on whether the mesh
/// currently yields a peer address for it.
pub struct MeshHealthProbe {
/// Mesh used to read the local node id and to look up peer addresses.
mesh: Arc<Mesh>,
/// Peers this probe reports on; the local node id is skipped when sampling.
expected_peers: Arc<Vec<NodeId>>,
}
impl MeshHealthProbe {
    /// Builds a health probe over `mesh` that reports on `expected_peers`.
    ///
    /// Both arguments are shared handles and are stored without copying the
    /// underlying data.
    pub fn new(mesh: Arc<Mesh>, expected_peers: Arc<Vec<NodeId>>) -> Self {
        Self { mesh, expected_peers }
    }
}
impl HealthProbe for MeshHealthProbe {
    /// Returns one `(peer, health)` pair for every expected peer except the
    /// local node.
    ///
    /// A peer is `NodeHealth::Healthy` when `peer_reachable` reports `true`
    /// and `NodeHealth::Unreachable` otherwise. Unlike a filter, peers that
    /// fail the check still appear in the output. Output order follows the
    /// order of `expected_peers`.
    fn health_samples(&self) -> Vec<(NodeId, NodeHealth)> {
        let this_node = self.mesh.node_id();
        let classify = |peer: NodeId| {
            if peer_reachable(&self.mesh, peer) {
                NodeHealth::Healthy
            } else {
                NodeHealth::Unreachable
            }
        };

        let mut samples = Vec::new();
        for &peer in self.expected_peers.iter() {
            if peer != this_node {
                samples.push((peer, classify(peer)));
            }
        }
        samples
    }
}
/// Inventory probe backed by a shared [`Mesh`] handle.
///
/// Its `InventoryProbe` implementation reports a default-constructed
/// `PeerInventory` for every expected peer (other than the local node) for
/// which the mesh currently yields a peer address.
pub struct MeshInventoryProbe {
/// Mesh used to read the local node id and to look up peer addresses.
mesh: Arc<Mesh>,
/// Peers this probe reports on; the local node id is skipped when sampling.
expected_peers: Arc<Vec<NodeId>>,
}
impl MeshInventoryProbe {
    /// Builds an inventory probe over `mesh` that reports on `expected_peers`.
    ///
    /// Both arguments are shared handles and are stored without copying the
    /// underlying data.
    pub fn new(mesh: Arc<Mesh>, expected_peers: Arc<Vec<NodeId>>) -> Self {
        Self { mesh, expected_peers }
    }
}
impl InventoryProbe for MeshInventoryProbe {
    /// Returns one `(peer, inventory)` pair per expected peer that is not the
    /// local node and for which `peer_reachable` reports `true`.
    ///
    /// The inventory is always `PeerInventory::default()`; no per-peer data is
    /// gathered here. Output order follows the order of `expected_peers`.
    fn inventory_samples(&self) -> Vec<(NodeId, PeerInventory)> {
        let this_node = self.mesh.node_id();
        // `&&` short-circuits, so the reachability lookup is skipped for the
        // local node just as with two chained filters.
        let is_live_remote =
            |peer: &NodeId| *peer != this_node && peer_reachable(&self.mesh, *peer);

        self.expected_peers
            .iter()
            .copied()
            .filter(is_live_remote)
            .map(|peer| (peer, PeerInventory::default()))
            .collect()
    }
}
/// Registers the three mesh-backed probes on `runtime`.
///
/// A `MeshLocalityProbe`, a `MeshHealthProbe` and a `MeshInventoryProbe` are
/// created, each holding a handle to the same `mesh` and `expected_peers`, and
/// added to the runtime in that order.
pub fn install_mesh_probes(
    runtime: &MeshOsRuntime,
    mesh: Arc<Mesh>,
    expected_peers: Arc<Vec<NodeId>>,
) {
    let locality = MeshLocalityProbe::new(Arc::clone(&mesh), Arc::clone(&expected_peers));
    runtime.add_locality_probe(Arc::new(locality));

    let health = MeshHealthProbe::new(Arc::clone(&mesh), Arc::clone(&expected_peers));
    runtime.add_health_probe(Arc::new(health));

    // The last probe takes ownership of the original handles, so no extra
    // reference-count bump is needed.
    let inventory = MeshInventoryProbe::new(mesh, expected_peers);
    runtime.add_inventory_probe(Arc::new(inventory));
}
/// Reports whether the mesh currently yields an address for `peer`.
///
/// This is only the result of `mesh.inner().peer_addr(peer)` being present.
/// NOTE(review): whether a known address implies live connectivity depends on
/// `peer_addr`, which is defined elsewhere — confirm.
fn peer_reachable(mesh: &Mesh, peer: NodeId) -> bool {
    let inner = mesh.inner();
    inner.peer_addr(peer).is_some()
}