use photon_ring::Pod;
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SubscriberSnapshot {
pub total_received: u64,
pub total_lagged: u64,
pub receive_ratio: f64,
pub pending: u64,
}
impl core::fmt::Display for SubscriberSnapshot {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
"recv={} lag={} ratio={:.2}% pending={}",
self.total_received,
self.total_lagged,
self.receive_ratio * 100.0,
self.pending,
)
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PublisherSnapshot {
pub published: u64,
pub capacity: u64,
}
impl core::fmt::Display for PublisherSnapshot {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "published={} capacity={}", self.published, self.capacity,)
}
}
pub struct SubscriberMetrics {
prev_received: u64,
prev_lagged: u64,
}
impl SubscriberMetrics {
pub fn new<T: Pod>(sub: &photon_ring::Subscriber<T>) -> Self {
Self {
prev_received: sub.total_received(),
prev_lagged: sub.total_lagged(),
}
}
pub fn snapshot<T: Pod>(&self, sub: &photon_ring::Subscriber<T>) -> SubscriberSnapshot {
SubscriberSnapshot {
total_received: sub.total_received(),
total_lagged: sub.total_lagged(),
receive_ratio: sub.receive_ratio(),
pending: sub.pending(),
}
}
pub fn delta<T: Pod>(&mut self, sub: &photon_ring::Subscriber<T>) -> SubscriberSnapshot {
let current = self.snapshot(sub);
let delta = SubscriberSnapshot {
total_received: current.total_received - self.prev_received,
total_lagged: current.total_lagged - self.prev_lagged,
receive_ratio: current.receive_ratio,
pending: current.pending,
};
self.prev_received = current.total_received;
self.prev_lagged = current.total_lagged;
delta
}
}
pub struct PublisherMetrics;
impl PublisherMetrics {
pub fn snapshot<T: Pod>(pub_: &photon_ring::Publisher<T>) -> PublisherSnapshot {
PublisherSnapshot {
published: pub_.published(),
capacity: pub_.capacity(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn subscriber_snapshot_initial() {
let (_, subs) = photon_ring::channel::<u64>(64);
let sub = subs.subscribe();
let metrics = SubscriberMetrics::new(&sub);
let snap = metrics.snapshot(&sub);
assert_eq!(snap.total_received, 0);
assert_eq!(snap.total_lagged, 0);
assert_eq!(snap.receive_ratio, 0.0);
assert_eq!(snap.pending, 0);
}
#[test]
fn subscriber_snapshot_after_recv() {
let (mut pub_, subs) = photon_ring::channel::<u64>(64);
let mut sub = subs.subscribe();
let metrics = SubscriberMetrics::new(&sub);
pub_.publish(1);
pub_.publish(2);
pub_.publish(3);
assert_eq!(sub.try_recv(), Ok(1));
assert_eq!(sub.try_recv(), Ok(2));
let snap = metrics.snapshot(&sub);
assert_eq!(snap.total_received, 2);
assert_eq!(snap.total_lagged, 0);
assert_eq!(snap.receive_ratio, 1.0);
assert_eq!(snap.pending, 1); }
#[test]
fn subscriber_delta() {
let (mut pub_, subs) = photon_ring::channel::<u64>(64);
let mut sub = subs.subscribe();
let mut metrics = SubscriberMetrics::new(&sub);
pub_.publish(10);
pub_.publish(20);
assert_eq!(sub.try_recv(), Ok(10));
assert_eq!(sub.try_recv(), Ok(20));
let d1 = metrics.delta(&sub);
assert_eq!(d1.total_received, 2);
assert_eq!(d1.total_lagged, 0);
pub_.publish(30);
assert_eq!(sub.try_recv(), Ok(30));
let d2 = metrics.delta(&sub);
assert_eq!(d2.total_received, 1);
assert_eq!(d2.total_lagged, 0);
}
#[test]
fn subscriber_delta_with_lag() {
let (mut pub_, subs) = photon_ring::channel::<u64>(4);
let mut sub = subs.subscribe();
let mut metrics = SubscriberMetrics::new(&sub);
for i in 0..6 {
pub_.publish(i);
}
let _ = sub.try_recv(); let _ = sub.try_recv();
let d = metrics.delta(&sub);
assert!(d.total_lagged > 0 || d.total_received > 0);
}
#[test]
fn publisher_snapshot() {
let (mut pub_, _subs) = photon_ring::channel::<u64>(128);
let snap0 = PublisherMetrics::snapshot(&pub_);
assert_eq!(snap0.published, 0);
assert_eq!(snap0.capacity, 128);
pub_.publish(1);
pub_.publish(2);
pub_.publish(3);
let snap1 = PublisherMetrics::snapshot(&pub_);
assert_eq!(snap1.published, 3);
assert_eq!(snap1.capacity, 128);
}
#[test]
fn subscriber_snapshot_display() {
let snap = SubscriberSnapshot {
total_received: 100,
total_lagged: 5,
receive_ratio: 0.9524,
pending: 3,
};
let s = format!("{snap}");
assert_eq!(s, "recv=100 lag=5 ratio=95.24% pending=3");
}
#[test]
fn publisher_snapshot_display() {
let snap = PublisherSnapshot {
published: 42,
capacity: 1024,
};
let s = format!("{snap}");
assert_eq!(s, "published=42 capacity=1024");
}
}