Skip to main content

photon_ring_metrics/
lib.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Observability wrappers for photon-ring channels.
5//!
6//! Provides framework-agnostic metric snapshots from photon-ring's built-in
7//! counters ([`total_received`], [`total_lagged`], [`receive_ratio`],
8//! [`published`], [`pending`]).
9//!
10//! [`total_received`]: photon_ring::Subscriber::total_received
11//! [`total_lagged`]: photon_ring::Subscriber::total_lagged
12//! [`receive_ratio`]: photon_ring::Subscriber::receive_ratio
13//! [`published`]: photon_ring::Publisher::published
14//! [`pending`]: photon_ring::Subscriber::pending
15//!
16//! # Example
17//!
18//! ```
19//! use photon_ring_metrics::SubscriberMetrics;
20//!
21//! let (mut pub_, subs) = photon_ring::channel::<u64>(1024);
22//! let mut sub = subs.subscribe();
23//! let metrics = SubscriberMetrics::new(&sub);
24//!
25//! pub_.publish(42);
26//! sub.try_recv().unwrap();
27//!
28//! let snapshot = metrics.snapshot(&sub);
29//! assert_eq!(snapshot.total_received, 1);
30//! assert_eq!(snapshot.total_lagged, 0);
31//! ```
32
33use photon_ring::Pod;
34
35// ---------------------------------------------------------------------------
36// Subscriber snapshot
37// ---------------------------------------------------------------------------
38
39/// Point-in-time metrics snapshot from a subscriber.
40#[derive(Debug, Clone, Copy, PartialEq)]
41pub struct SubscriberSnapshot {
42    /// Cumulative messages successfully received.
43    pub total_received: u64,
44    /// Cumulative messages lost due to lag (consumer fell behind the ring).
45    pub total_lagged: u64,
46    /// Ratio of received to total (received + lagged). 0.0 if no messages processed.
47    pub receive_ratio: f64,
48    /// Messages currently available to read (capped at ring capacity).
49    pub pending: u64,
50}
51
52impl core::fmt::Display for SubscriberSnapshot {
53    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54        write!(
55            f,
56            "recv={} lag={} ratio={:.2}% pending={}",
57            self.total_received,
58            self.total_lagged,
59            self.receive_ratio * 100.0,
60            self.pending,
61        )
62    }
63}
64
65// ---------------------------------------------------------------------------
66// Publisher snapshot
67// ---------------------------------------------------------------------------
68
69/// Point-in-time metrics snapshot from a publisher.
70#[derive(Debug, Clone, Copy, PartialEq)]
71pub struct PublisherSnapshot {
72    /// Total messages published so far.
73    pub published: u64,
74    /// Ring capacity (power of two).
75    pub capacity: u64,
76}
77
78impl core::fmt::Display for PublisherSnapshot {
79    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
80        write!(f, "published={} capacity={}", self.published, self.capacity,)
81    }
82}
83
84// ---------------------------------------------------------------------------
85// SubscriberMetrics
86// ---------------------------------------------------------------------------
87
88/// Metrics wrapper for a [`photon_ring::Subscriber`].
89///
90/// Tracks cumulative counters and supports both absolute snapshots and
91/// delta snapshots (changes since the last `delta()` call).
92pub struct SubscriberMetrics {
93    prev_received: u64,
94    prev_lagged: u64,
95}
96
97impl SubscriberMetrics {
98    /// Create a new metrics wrapper, capturing the subscriber's current
99    /// counters as the baseline for future delta computations.
100    pub fn new<T: Pod>(sub: &photon_ring::Subscriber<T>) -> Self {
101        Self {
102            prev_received: sub.total_received(),
103            prev_lagged: sub.total_lagged(),
104        }
105    }
106
107    /// Take an absolute snapshot of current subscriber metrics.
108    pub fn snapshot<T: Pod>(&self, sub: &photon_ring::Subscriber<T>) -> SubscriberSnapshot {
109        SubscriberSnapshot {
110            total_received: sub.total_received(),
111            total_lagged: sub.total_lagged(),
112            receive_ratio: sub.receive_ratio(),
113            pending: sub.pending(),
114        }
115    }
116
117    /// Take a delta snapshot: changes since the last `delta()` call
118    /// (or since construction if `delta()` has not been called yet).
119    ///
120    /// `receive_ratio` and `pending` are absolute (not deltas) because
121    /// ratios and instantaneous counts are not meaningful as differences.
122    pub fn delta<T: Pod>(&mut self, sub: &photon_ring::Subscriber<T>) -> SubscriberSnapshot {
123        let current = self.snapshot(sub);
124        let delta = SubscriberSnapshot {
125            total_received: current.total_received - self.prev_received,
126            total_lagged: current.total_lagged - self.prev_lagged,
127            receive_ratio: current.receive_ratio,
128            pending: current.pending,
129        };
130        self.prev_received = current.total_received;
131        self.prev_lagged = current.total_lagged;
132        delta
133    }
134}
135
136// ---------------------------------------------------------------------------
137// PublisherMetrics
138// ---------------------------------------------------------------------------
139
140/// Metrics wrapper for a [`photon_ring::Publisher`].
141///
142/// Since publisher counters are monotonic and rarely need delta tracking,
143/// this provides only a static `snapshot()` method.
144pub struct PublisherMetrics;
145
146impl PublisherMetrics {
147    /// Take an absolute snapshot of current publisher metrics.
148    pub fn snapshot<T: Pod>(pub_: &photon_ring::Publisher<T>) -> PublisherSnapshot {
149        PublisherSnapshot {
150            published: pub_.published(),
151            capacity: pub_.capacity(),
152        }
153    }
154}
155
156// ---------------------------------------------------------------------------
157// Tests
158// ---------------------------------------------------------------------------
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163
164    #[test]
165    fn subscriber_snapshot_initial() {
166        let (_, subs) = photon_ring::channel::<u64>(64);
167        let sub = subs.subscribe();
168        let metrics = SubscriberMetrics::new(&sub);
169        let snap = metrics.snapshot(&sub);
170
171        assert_eq!(snap.total_received, 0);
172        assert_eq!(snap.total_lagged, 0);
173        assert_eq!(snap.receive_ratio, 0.0);
174        assert_eq!(snap.pending, 0);
175    }
176
177    #[test]
178    fn subscriber_snapshot_after_recv() {
179        let (mut pub_, subs) = photon_ring::channel::<u64>(64);
180        let mut sub = subs.subscribe();
181        let metrics = SubscriberMetrics::new(&sub);
182
183        pub_.publish(1);
184        pub_.publish(2);
185        pub_.publish(3);
186        assert_eq!(sub.try_recv(), Ok(1));
187        assert_eq!(sub.try_recv(), Ok(2));
188
189        let snap = metrics.snapshot(&sub);
190        assert_eq!(snap.total_received, 2);
191        assert_eq!(snap.total_lagged, 0);
192        assert_eq!(snap.receive_ratio, 1.0);
193        assert_eq!(snap.pending, 1); // message 3 still pending
194    }
195
196    #[test]
197    fn subscriber_delta() {
198        let (mut pub_, subs) = photon_ring::channel::<u64>(64);
199        let mut sub = subs.subscribe();
200        let mut metrics = SubscriberMetrics::new(&sub);
201
202        // Phase 1: receive 2 messages
203        pub_.publish(10);
204        pub_.publish(20);
205        assert_eq!(sub.try_recv(), Ok(10));
206        assert_eq!(sub.try_recv(), Ok(20));
207
208        let d1 = metrics.delta(&sub);
209        assert_eq!(d1.total_received, 2);
210        assert_eq!(d1.total_lagged, 0);
211
212        // Phase 2: receive 1 more
213        pub_.publish(30);
214        assert_eq!(sub.try_recv(), Ok(30));
215
216        let d2 = metrics.delta(&sub);
217        assert_eq!(d2.total_received, 1);
218        assert_eq!(d2.total_lagged, 0);
219    }
220
221    #[test]
222    fn subscriber_delta_with_lag() {
223        // Tiny ring: capacity 4. Publishing 6 messages before the subscriber
224        // reads will cause lag.
225        let (mut pub_, subs) = photon_ring::channel::<u64>(4);
226        let mut sub = subs.subscribe();
227        let mut metrics = SubscriberMetrics::new(&sub);
228
229        for i in 0..6 {
230            pub_.publish(i);
231        }
232
233        // First try_recv should report Lagged, second should succeed.
234        let _ = sub.try_recv(); // Lagged — cursor advanced
235        let _ = sub.try_recv(); // Ok
236
237        let d = metrics.delta(&sub);
238        // At least some lag should have been recorded.
239        assert!(d.total_lagged > 0 || d.total_received > 0);
240    }
241
242    #[test]
243    fn publisher_snapshot() {
244        let (mut pub_, _subs) = photon_ring::channel::<u64>(128);
245
246        let snap0 = PublisherMetrics::snapshot(&pub_);
247        assert_eq!(snap0.published, 0);
248        assert_eq!(snap0.capacity, 128);
249
250        pub_.publish(1);
251        pub_.publish(2);
252        pub_.publish(3);
253
254        let snap1 = PublisherMetrics::snapshot(&pub_);
255        assert_eq!(snap1.published, 3);
256        assert_eq!(snap1.capacity, 128);
257    }
258
259    #[test]
260    fn subscriber_snapshot_display() {
261        let snap = SubscriberSnapshot {
262            total_received: 100,
263            total_lagged: 5,
264            receive_ratio: 0.9524,
265            pending: 3,
266        };
267        let s = format!("{snap}");
268        assert_eq!(s, "recv=100 lag=5 ratio=95.24% pending=3");
269    }
270
271    #[test]
272    fn publisher_snapshot_display() {
273        let snap = PublisherSnapshot {
274            published: 42,
275            capacity: 1024,
276        };
277        let s = format!("{snap}");
278        assert_eq!(s, "published=42 capacity=1024");
279    }
280}