Skip to main content

mockforge_observability/
protocol_metrics.rs

1//! Generic protocol metrics collector for all protocol crates.
2//!
3//! Provides a shared [`ProtocolMetrics`] struct with the common counters that every
4//! protocol implementation needs (connections, messages, errors, bytes, latency).
5//! Protocol crates embed this struct and add protocol-specific counters alongside it.
6//!
7//! # Example
8//!
9//! ```rust
10//! use mockforge_observability::protocol_metrics::ProtocolMetrics;
11//!
12//! let metrics = ProtocolMetrics::new();
13//! metrics.record_connection();
14//! metrics.record_message();
15//! metrics.record_bytes_sent(1024);
16//! metrics.record_error();
17//! metrics.record_disconnection();
18//!
19//! let snapshot = metrics.snapshot();
20//! assert_eq!(snapshot.connections_total, 1);
21//! assert_eq!(snapshot.connections_active, 0);
22//! assert_eq!(snapshot.messages_total, 1);
23//! assert_eq!(snapshot.errors_total, 1);
24//! assert_eq!(snapshot.bytes_sent, 1024);
25//! ```
26
27use std::sync::atomic::{AtomicU64, Ordering};
28
/// Generic metrics collector shared by all protocol crates.
///
/// Contains the common counters that every protocol needs. Protocol crates can
/// embed this struct and add protocol-specific fields alongside it.
///
/// Every field is a public [`AtomicU64`], so the recording methods only need
/// `&self` and callers may also read or update individual counters directly.
#[derive(Debug)]
pub struct ProtocolMetrics {
    /// Total number of connections ever established.
    ///
    /// Incremented by `record_connection`; never decremented by this type.
    pub connections_total: AtomicU64,
    /// Currently active connections.
    ///
    /// Incremented by `record_connection` and decremented by
    /// `record_disconnection`.
    pub connections_active: AtomicU64,
    /// Total messages processed (sent or received, depending on protocol).
    pub messages_total: AtomicU64,
    /// Total errors encountered.
    pub errors_total: AtomicU64,
    /// Total bytes sent.
    pub bytes_sent: AtomicU64,
    /// Total bytes received.
    pub bytes_received: AtomicU64,
    /// Running latency average in microseconds.
    ///
    /// `record_latency` stores the first sample as-is (more precisely: any
    /// sample that arrives while the stored value is `0`) and afterwards
    /// replaces the stored value with the integer mean of the stored value and
    /// the new sample. Recent samples therefore weigh more than older ones;
    /// this is not the arithmetic mean over all recorded samples.
    pub latency_micros: AtomicU64,
}
50
51impl ProtocolMetrics {
52    /// Create a new metrics collector with all counters at zero.
53    pub fn new() -> Self {
54        Self {
55            connections_total: AtomicU64::new(0),
56            connections_active: AtomicU64::new(0),
57            messages_total: AtomicU64::new(0),
58            errors_total: AtomicU64::new(0),
59            bytes_sent: AtomicU64::new(0),
60            bytes_received: AtomicU64::new(0),
61            latency_micros: AtomicU64::new(0),
62        }
63    }
64
65    /// Record a new connection (increments both total and active).
66    pub fn record_connection(&self) {
67        self.connections_total.fetch_add(1, Ordering::Relaxed);
68        self.connections_active.fetch_add(1, Ordering::Relaxed);
69    }
70
71    /// Record a disconnection (decrements active connections).
72    pub fn record_disconnection(&self) {
73        self.connections_active.fetch_sub(1, Ordering::Relaxed);
74    }
75
76    /// Record a message processed.
77    pub fn record_message(&self) {
78        self.messages_total.fetch_add(1, Ordering::Relaxed);
79    }
80
81    /// Record multiple messages processed at once.
82    pub fn record_messages(&self, count: u64) {
83        self.messages_total.fetch_add(count, Ordering::Relaxed);
84    }
85
86    /// Record an error.
87    pub fn record_error(&self) {
88        self.errors_total.fetch_add(1, Ordering::Relaxed);
89    }
90
91    /// Record bytes sent.
92    pub fn record_bytes_sent(&self, bytes: u64) {
93        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
94    }
95
96    /// Record bytes received.
97    pub fn record_bytes_received(&self, bytes: u64) {
98        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
99    }
100
101    /// Record latency in microseconds (simple moving average).
102    pub fn record_latency(&self, latency_micros: u64) {
103        let current = self.latency_micros.load(Ordering::Relaxed);
104        let new_avg = if current == 0 {
105            latency_micros
106        } else {
107            (current + latency_micros) / 2
108        };
109        self.latency_micros.store(new_avg, Ordering::Relaxed);
110    }
111
112    /// Take a point-in-time snapshot of all counters.
113    pub fn snapshot(&self) -> ProtocolMetricsSnapshot {
114        ProtocolMetricsSnapshot {
115            connections_total: self.connections_total.load(Ordering::Relaxed),
116            connections_active: self.connections_active.load(Ordering::Relaxed),
117            messages_total: self.messages_total.load(Ordering::Relaxed),
118            errors_total: self.errors_total.load(Ordering::Relaxed),
119            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
120            bytes_received: self.bytes_received.load(Ordering::Relaxed),
121            avg_latency_micros: self.latency_micros.load(Ordering::Relaxed),
122        }
123    }
124
125    /// Export common metrics in Prometheus format with a given protocol prefix.
126    ///
127    /// # Arguments
128    /// * `prefix` - Protocol name for metric labels (e.g., "kafka", "mqtt")
129    pub fn export_prometheus(&self, prefix: &str) -> String {
130        let snap = self.snapshot();
131        format!(
132            "# HELP {prefix}_connections_total Total number of connections\n\
133             # TYPE {prefix}_connections_total counter\n\
134             {prefix}_connections_total {}\n\
135             # HELP {prefix}_connections_active Number of active connections\n\
136             # TYPE {prefix}_connections_active gauge\n\
137             {prefix}_connections_active {}\n\
138             # HELP {prefix}_messages_total Total messages processed\n\
139             # TYPE {prefix}_messages_total counter\n\
140             {prefix}_messages_total {}\n\
141             # HELP {prefix}_errors_total Total errors\n\
142             # TYPE {prefix}_errors_total counter\n\
143             {prefix}_errors_total {}\n\
144             # HELP {prefix}_bytes_sent Total bytes sent\n\
145             # TYPE {prefix}_bytes_sent counter\n\
146             {prefix}_bytes_sent {}\n\
147             # HELP {prefix}_bytes_received Total bytes received\n\
148             # TYPE {prefix}_bytes_received counter\n\
149             {prefix}_bytes_received {}\n\
150             # HELP {prefix}_latency_micros_avg Average latency in microseconds\n\
151             # TYPE {prefix}_latency_micros_avg gauge\n\
152             {prefix}_latency_micros_avg {}\n",
153            snap.connections_total,
154            snap.connections_active,
155            snap.messages_total,
156            snap.errors_total,
157            snap.bytes_sent,
158            snap.bytes_received,
159            snap.avg_latency_micros,
160        )
161    }
162}
163
164impl Default for ProtocolMetrics {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
/// Point-in-time snapshot of protocol metrics with plain `u64` values.
///
/// The snapshot is an ordinary value type: it can be cloned, compared, hashed,
/// and created zeroed via [`Default`] (useful as a baseline when computing the
/// difference between two snapshots).
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct ProtocolMetricsSnapshot {
    /// Total connections ever established
    pub connections_total: u64,
    /// Currently active connections
    pub connections_active: u64,
    /// Total messages processed
    pub messages_total: u64,
    /// Total errors encountered
    pub errors_total: u64,
    /// Total bytes sent
    pub bytes_sent: u64,
    /// Total bytes received
    pub bytes_received: u64,
    /// Average latency in microseconds
    pub avg_latency_micros: u64,
}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads a single counter with the relaxed ordering used throughout the collector.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// A freshly constructed collector reports zero for every field.
    #[test]
    fn test_new_metrics_are_zero() {
        let snapshot = ProtocolMetrics::new().snapshot();
        let all_zero = ProtocolMetricsSnapshot {
            connections_total: 0,
            connections_active: 0,
            messages_total: 0,
            errors_total: 0,
            bytes_sent: 0,
            bytes_received: 0,
            avg_latency_micros: 0,
        };
        assert_eq!(snapshot, all_zero);
    }

    /// `Default` yields a zeroed collector.
    #[test]
    fn test_default_is_new() {
        let metrics = ProtocolMetrics::default();
        assert_eq!(read(&metrics.connections_total), 0);
    }

    /// Connecting bumps both gauges; disconnecting only lowers the active one.
    #[test]
    fn test_record_connection_and_disconnection() {
        let metrics = ProtocolMetrics::new();
        for _ in 0..2 {
            metrics.record_connection();
        }
        assert_eq!(read(&metrics.connections_total), 2);
        assert_eq!(read(&metrics.connections_active), 2);

        metrics.record_disconnection();
        assert_eq!(read(&metrics.connections_total), 2);
        assert_eq!(read(&metrics.connections_active), 1);
    }

    /// Single-message recording adds one per call.
    #[test]
    fn test_record_message() {
        let metrics = ProtocolMetrics::new();
        for _ in 0..2 {
            metrics.record_message();
        }
        assert_eq!(read(&metrics.messages_total), 2);
    }

    /// Batch recording adds the given count.
    #[test]
    fn test_record_messages_batch() {
        let metrics = ProtocolMetrics::new();
        for batch in [10, 5] {
            metrics.record_messages(batch);
        }
        assert_eq!(read(&metrics.messages_total), 15);
    }

    /// Each recorded error adds one to the error counter.
    #[test]
    fn test_record_error() {
        let metrics = ProtocolMetrics::new();
        for _ in 0..2 {
            metrics.record_error();
        }
        assert_eq!(read(&metrics.errors_total), 2);
    }

    /// Sent and received byte counters accumulate independently.
    #[test]
    fn test_record_bytes() {
        let metrics = ProtocolMetrics::new();
        metrics.record_bytes_sent(100);
        metrics.record_bytes_received(200);
        metrics.record_bytes_sent(50);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.bytes_sent, 150);
        assert_eq!(snapshot.bytes_received, 200);
    }

    /// The first sample is stored as-is; the second is averaged with it.
    #[test]
    fn test_record_latency() {
        let metrics = ProtocolMetrics::new();

        metrics.record_latency(100);
        assert_eq!(read(&metrics.latency_micros), 100);

        // Averaged with the stored value: (100 + 200) / 2 = 150
        metrics.record_latency(200);
        assert_eq!(read(&metrics.latency_micros), 150);
    }

    /// A snapshot keeps its values when the collector changes afterwards.
    #[test]
    fn test_snapshot_is_independent() {
        let metrics = ProtocolMetrics::new();

        metrics.record_connection();
        let before = metrics.snapshot();

        metrics.record_connection();
        let after = metrics.snapshot();

        assert_eq!(before.connections_total, 1);
        assert_eq!(after.connections_total, 2);
    }

    /// Cloning a snapshot produces an equal value.
    #[test]
    fn test_snapshot_clone() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection();

        let original = metrics.snapshot();
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }

    /// `Debug` output names the type for both the collector and its snapshot.
    #[test]
    fn test_debug_formatting() {
        let metrics = ProtocolMetrics::new();
        let collector_debug = format!("{metrics:?}");
        assert!(collector_debug.contains("ProtocolMetrics"));

        let snapshot_debug = format!("{:?}", metrics.snapshot());
        assert!(snapshot_debug.contains("ProtocolMetricsSnapshot"));
    }

    /// The Prometheus export carries the recorded values and the metadata lines.
    #[test]
    fn test_export_prometheus() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection();
        metrics.record_message();
        metrics.record_error();
        metrics.record_bytes_sent(1024);

        let exported = metrics.export_prometheus("test_proto");
        let expected_fragments = [
            "test_proto_connections_total 1",
            "test_proto_messages_total 1",
            "test_proto_errors_total 1",
            "test_proto_bytes_sent 1024",
            "# HELP",
            "# TYPE",
        ];
        for fragment in expected_fragments {
            assert!(exported.contains(fragment));
        }
    }
}