mockforge_observability/
protocol_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
28
/// Atomic counters and gauges describing traffic for one protocol.
///
/// Every field is an [`AtomicU64`] and every method takes `&self`, so a single
/// instance can be updated through a shared reference. All accesses performed
/// by this type use [`Ordering::Relaxed`].
#[derive(Debug)]
pub struct ProtocolMetrics {
    /// Incremented by one on every [`ProtocolMetrics::record_connection`] call.
    pub connections_total: AtomicU64,
    /// Incremented by `record_connection`, decremented by `record_disconnection`.
    pub connections_active: AtomicU64,
    /// Running total fed by `record_message` / `record_messages`.
    pub messages_total: AtomicU64,
    /// Incremented by one on every `record_error` call.
    pub errors_total: AtomicU64,
    /// Sum of the values passed to `record_bytes_sent`.
    pub bytes_sent: AtomicU64,
    /// Sum of the values passed to `record_bytes_received`.
    pub bytes_received: AtomicU64,
    /// Smoothed latency in microseconds maintained by `record_latency`.
    pub latency_micros: AtomicU64,
}

impl ProtocolMetrics {
    /// Creates a metrics set with every value initialised to zero.
    pub fn new() -> Self {
        Self {
            connections_total: AtomicU64::new(0),
            connections_active: AtomicU64::new(0),
            messages_total: AtomicU64::new(0),
            errors_total: AtomicU64::new(0),
            bytes_sent: AtomicU64::new(0),
            bytes_received: AtomicU64::new(0),
            latency_micros: AtomicU64::new(0),
        }
    }

    /// Records a newly opened connection: bumps both the lifetime total and
    /// the active gauge by one.
    pub fn record_connection(&self) {
        self.connections_total.fetch_add(1, Ordering::Relaxed);
        self.connections_active.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a closed connection by decrementing the active gauge.
    ///
    /// The gauge saturates at zero: a disconnect with no matching connect is
    /// ignored instead of wrapping the value around to `u64::MAX`.
    pub fn record_disconnection(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update` leave
        // the gauge untouched; that `Err` result is intentionally discarded.
        let _ = self.connections_active.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |active| active.checked_sub(1),
        );
    }

    /// Records a single processed message.
    pub fn record_message(&self) {
        self.messages_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Records `count` processed messages in one update.
    pub fn record_messages(&self, count: u64) {
        self.messages_total.fetch_add(count, Ordering::Relaxed);
    }

    /// Records a single error.
    pub fn record_error(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `bytes` to the sent-bytes total.
    pub fn record_bytes_sent(&self, bytes: u64) {
        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Adds `bytes` to the received-bytes total.
    pub fn record_bytes_received(&self, bytes: u64) {
        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Folds a latency sample (in microseconds) into the stored value.
    ///
    /// This is not an arithmetic mean over all samples: while the stored value
    /// is zero the sample replaces it, otherwise the stored value becomes the
    /// floor of the midpoint between itself and the sample. A stored zero is
    /// therefore indistinguishable from "no sample yet".
    ///
    /// The read-modify-write happens in a single `fetch_update`, so concurrent
    /// samples are not lost, and the midpoint is computed without forming
    /// `current + latency_micros`, so it cannot overflow.
    pub fn record_latency(&self, latency_micros: u64) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self.latency_micros.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |current| {
                let next = if current == 0 {
                    latency_micros
                } else {
                    // floor((a + b) / 2) without overflow: halve each operand,
                    // then add back the unit lost when both are odd.
                    (current >> 1) + (latency_micros >> 1) + (current & latency_micros & 1)
                };
                Some(next)
            },
        );
    }

    /// Copies the current values into a plain [`ProtocolMetricsSnapshot`].
    ///
    /// Each field is loaded separately, so the snapshot is not a single atomic
    /// view across all fields.
    pub fn snapshot(&self) -> ProtocolMetricsSnapshot {
        let read = |cell: &AtomicU64| cell.load(Ordering::Relaxed);
        ProtocolMetricsSnapshot {
            connections_total: read(&self.connections_total),
            connections_active: read(&self.connections_active),
            messages_total: read(&self.messages_total),
            errors_total: read(&self.errors_total),
            bytes_sent: read(&self.bytes_sent),
            bytes_received: read(&self.bytes_received),
            avg_latency_micros: read(&self.latency_micros),
        }
    }

    /// Renders the current values as text with `# HELP` / `# TYPE` comment
    /// lines followed by a `name value` line for each metric.
    ///
    /// `prefix` is inserted verbatim in front of every metric name; it is not
    /// validated or escaped here.
    pub fn export_prometheus(&self, prefix: &str) -> String {
        let snap = self.snapshot();
        format!(
            "# HELP {prefix}_connections_total Total number of connections\n\
             # TYPE {prefix}_connections_total counter\n\
             {prefix}_connections_total {}\n\
             # HELP {prefix}_connections_active Number of active connections\n\
             # TYPE {prefix}_connections_active gauge\n\
             {prefix}_connections_active {}\n\
             # HELP {prefix}_messages_total Total messages processed\n\
             # TYPE {prefix}_messages_total counter\n\
             {prefix}_messages_total {}\n\
             # HELP {prefix}_errors_total Total errors\n\
             # TYPE {prefix}_errors_total counter\n\
             {prefix}_errors_total {}\n\
             # HELP {prefix}_bytes_sent Total bytes sent\n\
             # TYPE {prefix}_bytes_sent counter\n\
             {prefix}_bytes_sent {}\n\
             # HELP {prefix}_bytes_received Total bytes received\n\
             # TYPE {prefix}_bytes_received counter\n\
             {prefix}_bytes_received {}\n\
             # HELP {prefix}_latency_micros_avg Average latency in microseconds\n\
             # TYPE {prefix}_latency_micros_avg gauge\n\
             {prefix}_latency_micros_avg {}\n",
            snap.connections_total,
            snap.connections_active,
            snap.messages_total,
            snap.errors_total,
            snap.bytes_sent,
            snap.bytes_received,
            snap.avg_latency_micros,
        )
    }
}

impl Default for ProtocolMetrics {
    /// Equivalent to [`ProtocolMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Plain-integer copy of the values held by a [`ProtocolMetrics`] at the time
/// `snapshot` was called.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolMetricsSnapshot {
    /// Value of `connections_total` when the snapshot was taken.
    pub connections_total: u64,
    /// Value of `connections_active` when the snapshot was taken.
    pub connections_active: u64,
    /// Value of `messages_total` when the snapshot was taken.
    pub messages_total: u64,
    /// Value of `errors_total` when the snapshot was taken.
    pub errors_total: u64,
    /// Value of `bytes_sent` when the snapshot was taken.
    pub bytes_sent: u64,
    /// Value of `bytes_received` when the snapshot was taken.
    pub bytes_received: u64,
    /// Value of `latency_micros` when the snapshot was taken.
    pub avg_latency_micros: u64,
}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built instance reports zero for every metric.
    #[test]
    fn test_new_metrics_are_zero() {
        let snapshot = ProtocolMetrics::new().snapshot();
        let all_zero = ProtocolMetricsSnapshot {
            connections_total: 0,
            connections_active: 0,
            messages_total: 0,
            errors_total: 0,
            bytes_sent: 0,
            bytes_received: 0,
            avg_latency_micros: 0,
        };
        assert_eq!(snapshot, all_zero);
    }

    /// `Default` yields a zeroed connection counter, like `new`.
    #[test]
    fn test_default_is_new() {
        let metrics = ProtocolMetrics::default();
        let total = metrics.connections_total.load(Ordering::Relaxed);
        assert_eq!(total, 0);
    }

    /// Connecting raises both counters; disconnecting lowers only the gauge.
    #[test]
    fn test_record_connection_and_disconnection() {
        let metrics = ProtocolMetrics::new();
        // Reads (total, active) in one go.
        let counts = || {
            (
                metrics.connections_total.load(Ordering::Relaxed),
                metrics.connections_active.load(Ordering::Relaxed),
            )
        };

        metrics.record_connection();
        metrics.record_connection();
        assert_eq!(counts(), (2, 2));

        metrics.record_disconnection();
        assert_eq!(counts(), (2, 1));
    }

    /// Each `record_message` call adds exactly one.
    #[test]
    fn test_record_message() {
        let metrics = ProtocolMetrics::new();
        for _ in 0..2 {
            metrics.record_message();
        }
        let seen = metrics.messages_total.load(Ordering::Relaxed);
        assert_eq!(seen, 2);
    }

    /// Batched recording accumulates the supplied counts.
    #[test]
    fn test_record_messages_batch() {
        let metrics = ProtocolMetrics::new();
        metrics.record_messages(10);
        metrics.record_messages(5);
        let seen = metrics.messages_total.load(Ordering::Relaxed);
        assert_eq!(seen, 15);
    }

    /// Each `record_error` call adds exactly one.
    #[test]
    fn test_record_error() {
        let metrics = ProtocolMetrics::new();
        for _ in 0..2 {
            metrics.record_error();
        }
        let seen = metrics.errors_total.load(Ordering::Relaxed);
        assert_eq!(seen, 2);
    }

    /// Sent and received byte totals are tracked independently.
    #[test]
    fn test_record_bytes() {
        let metrics = ProtocolMetrics::new();
        metrics.record_bytes_sent(100);
        metrics.record_bytes_received(200);
        metrics.record_bytes_sent(50);

        let snapshot = metrics.snapshot();
        assert_eq!((snapshot.bytes_sent, snapshot.bytes_received), (150, 200));
    }

    /// The first sample is stored as-is; the second is averaged with it.
    #[test]
    fn test_record_latency() {
        let metrics = ProtocolMetrics::new();
        let stored = || metrics.latency_micros.load(Ordering::Relaxed);

        metrics.record_latency(100);
        assert_eq!(stored(), 100);

        metrics.record_latency(200);
        assert_eq!(stored(), 150);
    }

    /// A snapshot keeps the values it was taken with after later updates.
    #[test]
    fn test_snapshot_is_independent() {
        let metrics = ProtocolMetrics::new();

        metrics.record_connection();
        let before = metrics.snapshot();

        metrics.record_connection();
        let after = metrics.snapshot();

        assert_eq!(before.connections_total, 1);
        assert_eq!(after.connections_total, 2);
    }

    /// Cloning a snapshot produces an equal value.
    #[test]
    fn test_snapshot_clone() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection();
        let original = metrics.snapshot();
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }

    /// `Debug` output names both types.
    #[test]
    fn test_debug_formatting() {
        let metrics = ProtocolMetrics::new();
        let rendered_metrics = format!("{:?}", metrics);
        assert!(rendered_metrics.contains("ProtocolMetrics"));

        let rendered_snapshot = format!("{:?}", metrics.snapshot());
        assert!(rendered_snapshot.contains("ProtocolMetricsSnapshot"));
    }

    /// The exported text carries the recorded values plus HELP/TYPE comments.
    #[test]
    fn test_export_prometheus() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection();
        metrics.record_message();
        metrics.record_error();
        metrics.record_bytes_sent(1024);

        let output = metrics.export_prometheus("test_proto");
        let expected_fragments = [
            "test_proto_connections_total 1",
            "test_proto_messages_total 1",
            "test_proto_errors_total 1",
            "test_proto_bytes_sent 1024",
            "# HELP",
            "# TYPE",
        ];
        for fragment in expected_fragments.iter() {
            assert!(output.contains(*fragment));
        }
    }
}