use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use bytes::Bytes;
pub use orbit_rs::OrbitTyped;
use orbit_rs::{Fleet, NetId64};
/// A metric snapshot that can be published to, and collected from, a fleet.
///
/// Implementors provide their own byte encoding: the publisher sends the
/// output of `encode`, and the collector rebuilds values with `decode`.
pub trait OrbitMetricSnapshot: OrbitTyped + Sized {
/// Family label; the publisher includes it in its "payload too large" error.
const FAMILY: &'static str;
/// Node identifier; `latest_by_node` keeps one sample per distinct value.
fn node_id(&self) -> u16;
/// Capture time in Unix seconds; passed along on publish and used for
/// the age / freshness checks on collected samples.
fn captured_at_unix_secs(&self) -> u64;
/// Serializes the snapshot into the payload bytes handed to the fleet.
///
/// An `Err` aborts the publish and is returned to the caller unchanged.
fn encode(&self) -> Result<Vec<u8>, String>;
/// Rebuilds a snapshot from payload bytes.
///
/// Collectors skip any frame for which this returns `Err`.
fn decode(bytes: &[u8]) -> Result<Self, String>;
}
/// A snapshot that can additionally be grouped by an implementor-chosen key.
///
/// Used by `latest_by_key` / `fresh_by_key`, which keep one sample per
/// distinct key instead of one per node.
pub trait OrbitMetricKeyedSnapshot: OrbitMetricSnapshot {
/// Grouping key type; it is used as a `HashMap` key, hence `Eq + Hash`.
type Key: Eq + Hash;
/// Returns the key this snapshot is grouped under.
fn metric_key(&self) -> Self::Key;
}
/// A decoded snapshot paired with the identifier of the frame it came from.
#[derive(Clone, Debug)]
pub struct OrbitMetricSample<T> {
/// Identifier copied from the frame (`frame.id`) that carried the snapshot.
pub id: NetId64,
/// The decoded snapshot value.
pub snapshot: T,
}
impl<T: OrbitMetricSnapshot> OrbitMetricSample<T> {
pub fn node_id(&self) -> u16 {
self.snapshot.node_id()
}
pub fn captured_at_unix_secs(&self) -> u64 {
self.snapshot.captured_at_unix_secs()
}
pub fn age_secs(&self, now_unix_secs: u64) -> u64 {
now_unix_secs.saturating_sub(self.captured_at_unix_secs())
}
pub fn is_fresh(&self, now_unix_secs: u64, max_age_secs: u64) -> bool {
self.age_secs(now_unix_secs) <= max_age_secs
}
}
/// Typed handle onto the metric family `T` carried by a shared [`Fleet`].
///
/// The handle stores no `T` value; `T` only selects which snapshot type the
/// publisher and collector created from this handle operate on.
pub struct OrbitMetricFamily<T: OrbitMetricSnapshot> {
    fleet: Arc<Fleet>,
    _t: std::marker::PhantomData<T>,
}

// Implemented by hand rather than with `#[derive(Clone)]`: the derive would
// add a `T: Clone` bound even though no `T` is stored, making the handle
// un-clonable for snapshot types that are not themselves `Clone`.
impl<T: OrbitMetricSnapshot> Clone for OrbitMetricFamily<T> {
    /// Returns a new handle sharing the same underlying fleet.
    fn clone(&self) -> Self {
        Self {
            fleet: Arc::clone(&self.fleet),
            _t: std::marker::PhantomData,
        }
    }
}
impl<T: OrbitMetricSnapshot> OrbitMetricFamily<T> {
    /// Creates a family handle backed by `fleet`.
    pub fn new(fleet: Arc<Fleet>) -> Self {
        let _t = std::marker::PhantomData;
        Self { fleet, _t }
    }

    /// Returns a publisher for this family that shares the same fleet.
    pub fn publisher(&self) -> OrbitMetricPublisher<T> {
        let family = Self::new(Arc::clone(&self.fleet));
        OrbitMetricPublisher { family }
    }

    /// Returns a collector for this family that shares the same fleet.
    pub fn collector(&self) -> OrbitMetricCollector<T> {
        let family = Self::new(Arc::clone(&self.fleet));
        OrbitMetricCollector { family }
    }
}
#[derive(Clone)]
pub struct OrbitMetricPublisher<T: OrbitMetricSnapshot> {
family: OrbitMetricFamily<T>,
}
impl<T: OrbitMetricSnapshot> OrbitMetricPublisher<T> {
    /// Creates a publisher for family `T` on `fleet`.
    pub fn new(fleet: Arc<Fleet>) -> Self {
        let family = OrbitMetricFamily::new(fleet);
        Self { family }
    }

    /// Encodes `snapshot` and publishes it to the fleet.
    ///
    /// Returns the identifier produced by the fleet's `publish` call.
    ///
    /// # Errors
    ///
    /// * Whatever `T::encode` returns on failure, passed through unchanged.
    /// * On Unix targets only: a "payload too large" message when the encoded
    ///   payload exceeds `orbit_rs::ring_shm::PAYLOAD_MAX`.
    pub fn publish(&self, snapshot: &T) -> Result<NetId64, String> {
        let encoded = snapshot.encode()?;

        // The size guard is compiled in on Unix targets only.
        #[cfg(unix)]
        {
            let size = encoded.len();
            let limit = orbit_rs::ring_shm::PAYLOAD_MAX;
            if size > limit {
                return Err(format!(
                    "orbit metrics payload too large for {}: {} > {}",
                    T::FAMILY,
                    size,
                    limit
                ));
            }
        }

        let captured_at = snapshot.captured_at_unix_secs();
        // NOTE(review): the meaning of the leading `0` argument is defined by
        // `Fleet::publish` and is not visible from this file.
        let id = self
            .family
            .fleet
            .publish::<T>(0, captured_at, Bytes::from(encoded));
        Ok(id)
    }
}
#[derive(Clone)]
pub struct OrbitMetricCollector<T: OrbitMetricSnapshot> {
family: OrbitMetricFamily<T>,
}
impl<T: OrbitMetricSnapshot> OrbitMetricCollector<T> {
    /// Creates a collector for family `T` on `fleet`.
    pub fn new(fleet: Arc<Fleet>) -> Self {
        Self {
            family: OrbitMetricFamily::new(fleet),
        }
    }

    /// Returns the newest decodable sample for each node id.
    ///
    /// Counters are visited from newest to oldest and the first sample seen
    /// for a node id wins. Frames that cannot be read or decoded are skipped.
    ///
    /// When the fleet reports a non-zero `fleet_size()`, the walk stops as
    /// soon as that many distinct node ids have been collected, so any
    /// further node ids beyond that count are not reported.
    pub fn latest_by_node(&self) -> HashMap<u16, OrbitMetricSample<T>> {
        let mut samples = HashMap::new();
        let Some(counters) = self.live_counters() else {
            return samples;
        };
        let expected_nodes = self.family.fleet.fleet_size() as usize;
        for sample in counters.rev().filter_map(|counter| self.sample_at(counter)) {
            samples.entry(sample.node_id()).or_insert(sample);
            if expected_nodes > 0 && samples.len() >= expected_nodes {
                break;
            }
        }
        samples
    }

    /// Returns the newest decodable sample for each metric key.
    ///
    /// Counters are visited from newest to oldest and the first sample seen
    /// for a key wins. Frames that cannot be read or decoded are skipped.
    /// Unlike [`latest_by_node`](Self::latest_by_node) there is no early
    /// exit: every counter in the walk range is visited.
    pub fn latest_by_key<K>(&self) -> HashMap<K, OrbitMetricSample<T>>
    where
        T: OrbitMetricKeyedSnapshot<Key = K>,
        K: Eq + Hash,
    {
        let mut samples = HashMap::new();
        let Some(counters) = self.live_counters() else {
            return samples;
        };
        for sample in counters.rev().filter_map(|counter| self.sample_at(counter)) {
            samples.entry(sample.snapshot.metric_key()).or_insert(sample);
        }
        samples
    }

    /// Like [`latest_by_key`](Self::latest_by_key), but keeps only samples
    /// whose age at `now_unix_secs` is at most `max_age_secs`.
    pub fn fresh_by_key<K>(
        &self,
        now_unix_secs: u64,
        max_age_secs: u64,
    ) -> HashMap<K, OrbitMetricSample<T>>
    where
        T: OrbitMetricKeyedSnapshot<Key = K>,
        K: Eq + Hash,
    {
        let mut samples = self.latest_by_key();
        samples.retain(|_, sample| sample.is_fresh(now_unix_secs, max_age_secs));
        samples
    }

    /// Like [`latest_by_node`](Self::latest_by_node), but keeps only samples
    /// whose age at `now_unix_secs` is at most `max_age_secs`.
    pub fn fresh_by_node(
        &self,
        now_unix_secs: u64,
        max_age_secs: u64,
    ) -> HashMap<u16, OrbitMetricSample<T>> {
        let mut samples = self.latest_by_node();
        samples.retain(|_, sample| sample.is_fresh(now_unix_secs, max_age_secs));
        samples
    }

    /// Range of counters to inspect, or `None` when `head` is zero.
    ///
    /// The range covers the last `min(head, ring_capacity)` counters and ends
    /// just below `head`; callers iterate it in reverse to go newest-first.
    /// Its lower bound can never underflow because the subtracted amount is
    /// capped at `head`.
    fn live_counters(&self) -> Option<std::ops::Range<u64>> {
        let head = self.family.fleet.head::<T>();
        if head == 0 {
            return None;
        }
        let capacity = self.family.fleet.ring_capacity::<T>() as u64;
        let oldest = head - head.min(capacity);
        Some(oldest..head)
    }

    /// Reads and decodes the frame at `counter`.
    ///
    /// Returns `None` when the frame cannot be read or its payload fails to
    /// decode; the decode error itself is discarded.
    fn sample_at(&self, counter: u64) -> Option<OrbitMetricSample<T>> {
        let frame = self.family.fleet.read_at::<T>(counter)?;
        let snapshot = T::decode(&frame.payload).ok()?;
        Some(OrbitMetricSample {
            id: frame.id,
            snapshot,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoded size of the fixed-width fields shared by both test snapshots:
    /// `node` (2 bytes) + `captured_at` (8 bytes) + `value` (8 bytes).
    const FIXED_FIELDS_LEN: usize = 18;

    /// Joins the fleet `fleet_name` and returns a publisher/collector pair
    /// for `T`. The second `Fleet::join` argument is fixed at `2`.
    fn join_family<T: OrbitMetricSnapshot>(
        fleet_name: &'static str,
    ) -> (OrbitMetricPublisher<T>, OrbitMetricCollector<T>) {
        let fleet = Arc::new(Fleet::join(fleet_name, 2).unwrap());
        let family = OrbitMetricFamily::<T>::new(fleet);
        (family.publisher(), family.collector())
    }

    /// Appends the little-endian fixed-width fields to `out`.
    fn push_fixed_fields(out: &mut Vec<u8>, node: u16, captured_at: u64, value: u64) {
        out.extend_from_slice(&node.to_le_bytes());
        out.extend_from_slice(&captured_at.to_le_bytes());
        out.extend_from_slice(&value.to_le_bytes());
    }

    /// Decodes `(node, captured_at, value)` from the start of `bytes`.
    ///
    /// Callers must have checked that `bytes` holds at least
    /// `FIXED_FIELDS_LEN` bytes.
    fn read_fixed_fields(bytes: &[u8]) -> (u16, u64, u64) {
        let node = u16::from_le_bytes(bytes[0..2].try_into().expect("node bytes"));
        let captured_at = u64::from_le_bytes(bytes[2..10].try_into().expect("time bytes"));
        let value = u64::from_le_bytes(bytes[10..18].try_into().expect("value bytes"));
        (node, captured_at, value)
    }

    /// Minimal fixed-size snapshot, grouped by node only.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestSnapshot {
        node: u16,
        captured_at: u64,
        value: u64,
    }

    impl TestSnapshot {
        fn new(node: u16, captured_at: u64, value: u64) -> Self {
            Self {
                node,
                captured_at,
                value,
            }
        }
    }

    impl OrbitTyped for TestSnapshot {
        const KIND: u8 = 211;
    }

    impl OrbitMetricSnapshot for TestSnapshot {
        const FAMILY: &'static str = "test";

        fn node_id(&self) -> u16 {
            self.node
        }

        fn captured_at_unix_secs(&self) -> u64 {
            self.captured_at
        }

        fn encode(&self) -> Result<Vec<u8>, String> {
            let mut out = Vec::with_capacity(FIXED_FIELDS_LEN);
            push_fixed_fields(&mut out, self.node, self.captured_at, self.value);
            Ok(out)
        }

        fn decode(bytes: &[u8]) -> Result<Self, String> {
            if bytes.len() != FIXED_FIELDS_LEN {
                return Err(format!("bad len {}", bytes.len()));
            }
            let (node, captured_at, value) = read_fixed_fields(bytes);
            Ok(Self {
                node,
                captured_at,
                value,
            })
        }
    }

    #[test]
    fn test_snapshot_round_trips_through_encode_decode() {
        let original = TestSnapshot::new(7, 42, 99);
        let bytes = original.encode().unwrap();
        assert_eq!(bytes.len(), FIXED_FIELDS_LEN);
        assert_eq!(TestSnapshot::decode(&bytes).unwrap(), original);
        assert!(TestSnapshot::decode(&bytes[..FIXED_FIELDS_LEN - 1]).is_err());
    }

    #[test]
    fn latest_by_node_keeps_newest_sample_per_node() {
        let (publisher, collector) = join_family::<TestSnapshot>("metrics-test");
        publisher.publish(&TestSnapshot::new(1, 10, 100)).unwrap();
        publisher.publish(&TestSnapshot::new(2, 11, 200)).unwrap();
        publisher.publish(&TestSnapshot::new(1, 12, 101)).unwrap();

        let latest = collector.latest_by_node();
        assert_eq!(latest.len(), 2);
        assert_eq!(latest[&1].snapshot.value, 101);
        assert_eq!(latest[&2].snapshot.value, 200);
    }

    #[test]
    fn fresh_by_node_drops_stale_samples() {
        let (publisher, collector) = join_family::<TestSnapshot>("metrics-fresh-test");
        publisher.publish(&TestSnapshot::new(1, 10, 100)).unwrap();
        publisher.publish(&TestSnapshot::new(2, 20, 200)).unwrap();

        // At now = 25 with max age 10, node 1 (age 15) is stale and node 2
        // (age 5) is fresh.
        let fresh = collector.fresh_by_node(25, 10);
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[&2].snapshot.value, 200);
    }

    /// Variable-size snapshot carrying a string key after the fixed fields.
    ///
    /// Wire layout: fixed fields, one key-length byte, then the key bytes.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct KeyedSnapshot {
        node: u16,
        key: &'static str,
        captured_at: u64,
        value: u64,
    }

    impl KeyedSnapshot {
        fn new(key: &'static str, captured_at: u64, value: u64) -> Self {
            Self {
                node: 0,
                key,
                captured_at,
                value,
            }
        }
    }

    impl OrbitTyped for KeyedSnapshot {
        const KIND: u8 = 212;
    }

    impl OrbitMetricSnapshot for KeyedSnapshot {
        const FAMILY: &'static str = "keyed-test";

        fn node_id(&self) -> u16 {
            self.node
        }

        fn captured_at_unix_secs(&self) -> u64 {
            self.captured_at
        }

        fn encode(&self) -> Result<Vec<u8>, String> {
            let key = self.key.as_bytes();
            // The key length is stored in a single byte, so reject longer
            // keys instead of silently truncating the length.
            let key_len = u8::try_from(key.len())
                .map_err(|_| format!("key too long: {} bytes", key.len()))?;
            let mut out = Vec::with_capacity(FIXED_FIELDS_LEN + 1 + key.len());
            push_fixed_fields(&mut out, self.node, self.captured_at, self.value);
            out.push(key_len);
            out.extend_from_slice(key);
            Ok(out)
        }

        fn decode(bytes: &[u8]) -> Result<Self, String> {
            // Fixed fields plus the key-length byte must be present.
            let header_len = FIXED_FIELDS_LEN + 1;
            if bytes.len() < header_len {
                return Err(format!("bad len {}", bytes.len()));
            }
            let (node, captured_at, value) = read_fixed_fields(bytes);
            let key_len = usize::from(bytes[FIXED_FIELDS_LEN]);
            if bytes.len() != header_len + key_len {
                return Err(format!("bad key len {}", bytes.len()));
            }
            let key = std::str::from_utf8(&bytes[header_len..]).map_err(|e| e.to_string())?;
            // Map back onto the known `'static` keys; anything else is an error.
            let key = match key {
                "alpha" => "alpha",
                "beta" => "beta",
                _ => return Err(format!("unknown key {key}")),
            };
            Ok(Self {
                node,
                key,
                captured_at,
                value,
            })
        }
    }

    impl OrbitMetricKeyedSnapshot for KeyedSnapshot {
        type Key = String;

        fn metric_key(&self) -> Self::Key {
            self.key.to_owned()
        }
    }

    #[test]
    fn keyed_snapshot_round_trips_through_encode_decode() {
        let original = KeyedSnapshot::new("alpha", 10, 100);
        let bytes = original.encode().unwrap();
        assert_eq!(bytes.len(), FIXED_FIELDS_LEN + 1 + "alpha".len());
        assert_eq!(KeyedSnapshot::decode(&bytes).unwrap(), original);
        assert!(KeyedSnapshot::decode(&bytes[..bytes.len() - 1]).is_err());
    }

    #[test]
    fn latest_by_key_keeps_newest_sample_per_key() {
        let (publisher, collector) = join_family::<KeyedSnapshot>("metrics-keyed-test");
        publisher.publish(&KeyedSnapshot::new("alpha", 10, 100)).unwrap();
        publisher.publish(&KeyedSnapshot::new("beta", 11, 200)).unwrap();
        publisher.publish(&KeyedSnapshot::new("alpha", 12, 101)).unwrap();

        let latest = collector.latest_by_key();
        assert_eq!(latest.len(), 2);
        assert_eq!(latest["alpha"].snapshot.value, 101);
        assert_eq!(latest["beta"].snapshot.value, 200);
    }

    #[test]
    fn fresh_by_key_drops_stale_samples() {
        let (publisher, collector) = join_family::<KeyedSnapshot>("metrics-keyed-fresh-test");
        publisher.publish(&KeyedSnapshot::new("alpha", 10, 100)).unwrap();
        publisher.publish(&KeyedSnapshot::new("beta", 20, 200)).unwrap();

        // At now = 25 with max age 10, "alpha" (age 15) is stale and "beta"
        // (age 5) is fresh.
        let fresh = collector.fresh_by_key(25, 10);
        assert_eq!(fresh.len(), 1);
        assert!(!fresh.contains_key("alpha"));
        assert_eq!(fresh["beta"].snapshot.value, 200);
    }
}