photon_ring_metrics/
lib.rs1use photon_ring::Pod;
34
/// Plain-data view of one subscriber's counters at a single moment.
///
/// Produced by [`SubscriberMetrics::snapshot`] (absolute counter values) and by
/// [`SubscriberMetrics::delta`] (where the two `total_*` fields hold the change
/// since the previous `delta` call instead).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct SubscriberSnapshot {
    /// Value read from `Subscriber::total_received()`.
    pub total_received: u64,
    /// Value read from `Subscriber::total_lagged()`.
    pub total_lagged: u64,
    /// Value read from `Subscriber::receive_ratio()`; the `Display` impl
    /// multiplies it by 100 and prints it as a percentage.
    pub receive_ratio: f64,
    /// Value read from `Subscriber::pending()`.
    pub pending: u64,
}
51
52impl core::fmt::Display for SubscriberSnapshot {
53 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54 write!(
55 f,
56 "recv={} lag={} ratio={:.2}% pending={}",
57 self.total_received,
58 self.total_lagged,
59 self.receive_ratio * 100.0,
60 self.pending,
61 )
62 }
63}
64
/// Plain-data view of a publisher's counters at a single moment.
///
/// Built by [`PublisherMetrics::snapshot`]. Both fields are integers, so the
/// type is fully comparable and hashable (`Eq` + `Hash`) in addition to
/// `PartialEq`, which lets snapshots be used as map/set keys or deduplicated.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PublisherSnapshot {
    /// Value read from `Publisher::published()`.
    pub published: u64,
    /// Value read from `Publisher::capacity()`.
    pub capacity: u64,
}
77
78impl core::fmt::Display for PublisherSnapshot {
79 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
80 write!(f, "published={} capacity={}", self.published, self.capacity,)
81 }
82}
83
/// Tracks a subscriber's counters between observations.
///
/// Holds the `total_received` / `total_lagged` values seen at construction or
/// at the most recent `delta` call, so the next `delta` can report only the
/// change since then.
///
/// `Debug` is derived so the tracker can be logged or embedded in other
/// `Debug` types; `Clone` allows forking a baseline.
#[derive(Debug, Clone)]
pub struct SubscriberMetrics {
    /// `total_received` as of the last baseline.
    prev_received: u64,
    /// `total_lagged` as of the last baseline.
    prev_lagged: u64,
}
96
97impl SubscriberMetrics {
98 pub fn new<T: Pod>(sub: &photon_ring::Subscriber<T>) -> Self {
101 Self {
102 prev_received: sub.total_received(),
103 prev_lagged: sub.total_lagged(),
104 }
105 }
106
107 pub fn snapshot<T: Pod>(&self, sub: &photon_ring::Subscriber<T>) -> SubscriberSnapshot {
109 SubscriberSnapshot {
110 total_received: sub.total_received(),
111 total_lagged: sub.total_lagged(),
112 receive_ratio: sub.receive_ratio(),
113 pending: sub.pending(),
114 }
115 }
116
117 pub fn delta<T: Pod>(&mut self, sub: &photon_ring::Subscriber<T>) -> SubscriberSnapshot {
123 let current = self.snapshot(sub);
124 let delta = SubscriberSnapshot {
125 total_received: current.total_received - self.prev_received,
126 total_lagged: current.total_lagged - self.prev_lagged,
127 receive_ratio: current.receive_ratio,
128 pending: current.pending,
129 };
130 self.prev_received = current.total_received;
131 self.prev_lagged = current.total_lagged;
132 delta
133 }
134}
135
/// Stateless entry point for reading publisher metrics.
///
/// Carries no data; it only namespaces the associated `snapshot` function.
/// The common marker-type traits are derived so the type can be logged,
/// copied, or default-constructed like any other public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublisherMetrics;
145
146impl PublisherMetrics {
147 pub fn snapshot<T: Pod>(pub_: &photon_ring::Publisher<T>) -> PublisherSnapshot {
149 PublisherSnapshot {
150 published: pub_.published(),
151 capacity: pub_.capacity(),
152 }
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscriber that has received nothing reports all-zero metrics.
    #[test]
    fn subscriber_snapshot_initial() {
        // `_` drops the publisher half right away; only the subscriber is used.
        let (_, subs) = photon_ring::channel::<u64>(64);
        let sub = subs.subscribe();

        let snap = SubscriberMetrics::new(&sub).snapshot(&sub);

        assert_eq!(
            snap,
            SubscriberSnapshot {
                total_received: 0,
                total_lagged: 0,
                receive_ratio: 0.0,
                pending: 0,
            }
        );
    }

    /// Three values published, two consumed: one is left pending.
    #[test]
    fn subscriber_snapshot_after_recv() {
        let (mut pub_, subs) = photon_ring::channel::<u64>(64);
        let mut sub = subs.subscribe();
        let metrics = SubscriberMetrics::new(&sub);

        for value in 1..=3 {
            pub_.publish(value);
        }
        for expected in 1..=2 {
            assert_eq!(sub.try_recv(), Ok(expected));
        }

        assert_eq!(
            metrics.snapshot(&sub),
            SubscriberSnapshot {
                total_received: 2,
                total_lagged: 0,
                receive_ratio: 1.0,
                pending: 1,
            }
        );
    }

    /// Each `delta` call reports only what happened since the previous one.
    #[test]
    fn subscriber_delta() {
        let (mut pub_, subs) = photon_ring::channel::<u64>(64);
        let mut sub = subs.subscribe();
        let mut metrics = SubscriberMetrics::new(&sub);

        // First interval: two messages published, then both consumed.
        for value in [10, 20] {
            pub_.publish(value);
        }
        for expected in [10, 20] {
            assert_eq!(sub.try_recv(), Ok(expected));
        }
        let first = metrics.delta(&sub);
        assert_eq!(first.total_received, 2);
        assert_eq!(first.total_lagged, 0);

        // Second interval: one more message.
        pub_.publish(30);
        assert_eq!(sub.try_recv(), Ok(30));
        let second = metrics.delta(&sub);
        assert_eq!(second.total_received, 1);
        assert_eq!(second.total_lagged, 0);
    }

    /// Six values go into a channel created with size 4 before anything is
    /// read, so the subscriber has presumably been overrun; the test only
    /// requires that the delta shows *some* activity.
    #[test]
    fn subscriber_delta_with_lag() {
        let (mut pub_, subs) = photon_ring::channel::<u64>(4);
        let mut sub = subs.subscribe();
        let mut metrics = SubscriberMetrics::new(&sub);

        for i in 0..6 {
            pub_.publish(i);
        }

        // The outcome of each receive is deliberately ignored.
        let _ = sub.try_recv();
        let _ = sub.try_recv();

        let d = metrics.delta(&sub);
        let saw_activity = d.total_lagged > 0 || d.total_received > 0;
        assert!(saw_activity);
    }

    /// `published` follows the number of publishes; `capacity` stays fixed.
    #[test]
    fn publisher_snapshot() {
        let (mut pub_, _subs) = photon_ring::channel::<u64>(128);

        let before = PublisherMetrics::snapshot(&pub_);
        assert_eq!(
            before,
            PublisherSnapshot {
                published: 0,
                capacity: 128,
            }
        );

        for value in 1..=3 {
            pub_.publish(value);
        }

        let after = PublisherMetrics::snapshot(&pub_);
        assert_eq!(
            after,
            PublisherSnapshot {
                published: 3,
                capacity: 128,
            }
        );
    }

    /// The ratio is rendered as a percentage with two decimals.
    #[test]
    fn subscriber_snapshot_display() {
        let rendered = SubscriberSnapshot {
            total_received: 100,
            total_lagged: 5,
            receive_ratio: 0.9524,
            pending: 3,
        }
        .to_string();
        assert_eq!(rendered, "recv=100 lag=5 ratio=95.24% pending=3");
    }

    /// Publisher snapshots render as a `key=value` line.
    #[test]
    fn publisher_snapshot_display() {
        let rendered = PublisherSnapshot {
            published: 42,
            capacity: 1024,
        }
        .to_string();
        assert_eq!(rendered, "published=42 capacity=1024");
    }
}