// amadeus_node/metrics.rs

use crate::utils::misc::{Typename, get_unix_secs_now};
use scc::HashIndex as SccHashIndex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tracing::warn;

10/// Complete metrics snapshot
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct MetricsSnapshot {
13    pub incoming_protos: HashMap<String, u64>,
14    pub outgoing_protos: HashMap<String, u64>,
15    pub errors: HashMap<String, u64>,
16    pub udp: UdpStats,
17    pub udpps: UdpStats, // stats per second
18    pub uptime: u32,
19    pub tasks: u64,
20}
21
22/// Packet statistics with rate calculations
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct UdpStats {
25    pub incoming_packets: u64,
26    pub incoming_bytes: u64,
27    pub outgoing_packets: u64,
28    pub outgoing_bytes: u64,
29}
30
31impl MetricsSnapshot {
32    /// Serialize to JSON string
33    pub fn to_json_string(&self) -> String {
34        serde_json::to_string(self).unwrap_or_else(|e| {
35            warn!("Failed to serialize metrics snapshot: {}", e);
36            "{}".into()
37        })
38    }
39
40    /// Serialize to Prometheus metrics format string
41    pub fn to_prometheus_string(&self) -> String {
42        let udp = format!(
43            r#"
44# HELP amadeus_udp_packets_total Total number of UDP packets
45# TYPE amadeus_udp_packets_total counter
46amadeus_udp_packets_total{{type="incoming"}} {}
47amadeus_udp_packets_total{{type="outgoing"}} {}
48
49# HELP amadeus_udp_bytes_total Total number of UDP bytes
50# TYPE amadeus_udp_bytes_total counter
51amadeus_udp_bytes_total{{type="incoming"}} {}
52amadeus_udp_bytes_total{{type="outgoing"}} {}
53
54# HELP amadeus_uptime_seconds Process uptime in seconds
55# TYPE amadeus_uptime_seconds gauge
56amadeus_uptime_seconds {}
57
58# HELP amadeus_tasks_active Current number of active tasks
59# TYPE amadeus_tasks_active gauge
60amadeus_tasks_active {}"#,
61            self.udp.incoming_packets,
62            self.udp.outgoing_packets,
63            self.udp.incoming_bytes,
64            self.udp.outgoing_bytes,
65            self.uptime,
66            self.tasks
67        );
68
69        let udpps = format!(
70            r#"
71
72# HELP amadeus_udp_packets_per_second Total number of UDP packets
73# TYPE amadeus_udp_packets_per_second gauge
74amadeus_udp_packets_per_second{{type="incoming"}} {}
75amadeus_udp_packets_per_second{{type="outgoing"}} {}
76
77# HELP amadeus_udp_bytes_per_second Total number of UDP bytes
78# TYPE amadeus_udp_bytes_per_second gauge
79amadeus_udp_bytes_per_second{{type="incoming"}} {}
80amadeus_udp_bytes_per_second{{type="outgoing"}} {}"#,
81            self.udpps.incoming_packets,
82            self.udpps.outgoing_packets,
83            self.udpps.incoming_bytes,
84            self.udpps.outgoing_bytes
85        );
86
87        let mut protos = Vec::new();
88        protos.push("\n\n# HELP amadeus_incoming_protos_total Total number of proto messages handled by type".into());
89        protos.push("# TYPE amadeus_incoming_protos_total counter".into());
90        for (proto_name, count) in &self.incoming_protos {
91            protos.push(format!("amadeus_incoming_protos_total{{type=\"{}\"}} {}", proto_name, count));
92        }
93
94        let mut sent_packets = Vec::new();
95        sent_packets
96            .push("\n\n# HELP amadeus_outgoing_protos_total Total number of messages sent by protocol type".into());
97        sent_packets.push("# TYPE amadeus_outgoing_protos_total counter".into());
98        for (proto_name, count) in &self.outgoing_protos {
99            sent_packets.push(format!("amadeus_outgoing_protos_total{{type=\"{}\"}} {}", proto_name, count));
100        }
101
102        let mut errors = Vec::new();
103        errors.push("\n\n# HELP amadeus_packet_errors_total Total number of packet processing errors by type".into());
104        errors.push("# TYPE amadeus_packet_errors_total counter".into());
105        for (error_type, count) in &self.errors {
106            errors.push(format!("amadeus_packet_errors_total{{type=\"{}\"}} {}", error_type, count));
107        }
108
109        format!("{}{}{}{}{}", udp, udpps, protos.join("\n"), sent_packets.join("\n"), errors.join("\n"))
110    }
111}
112
113pub struct Metrics {
114    // Total packets counter
115    incoming_bytes: AtomicU64,   // Total bytes received
116    incoming_packets: AtomicU64, // Total UDP packets received
117    outgoing_bytes: AtomicU64,   // Total bytes sent
118    outgoing_packets: AtomicU64, // Total UDP packets sent
119
120    // Handled protocol message counters by name (dynamic)
121    incoming_protos: SccHashIndex<String, Arc<AtomicU64>>,
122
123    // Error counters by type name (dynamic)
124    errors: SccHashIndex<String, Arc<AtomicU64>>,
125
126    // Sent packets counter by protocol type (dynamic)
127    outgoing_protos: SccHashIndex<String, Arc<AtomicU64>>,
128
129    // Active tasks gauge
130    tasks: AtomicU64,
131
132    // Start time for uptime calculation
133    start_time: u32,
134}
135
136impl Metrics {
137    pub fn new() -> Self {
138        let handled_protos = SccHashIndex::new();
139        let errors = SccHashIndex::new();
140        let sent_packets = SccHashIndex::new();
141        Self {
142            incoming_bytes: AtomicU64::new(0),
143            incoming_packets: AtomicU64::new(0),
144            outgoing_bytes: AtomicU64::new(0),
145            outgoing_packets: AtomicU64::new(0),
146            incoming_protos: handled_protos,
147            errors,
148            outgoing_protos: sent_packets,
149            tasks: AtomicU64::new(0),
150            start_time: get_unix_secs_now(),
151        }
152    }
153
154    #[inline]
155    pub fn add_incoming_proto(&self, name: &str) {
156        // correct way of handling ownership in scc HashIndex
157        let name = name.to_owned();
158        if let Some(counter) = self.incoming_protos.peek_with(&name, |_, v| v.clone()) {
159            counter.fetch_add(1, Ordering::Relaxed);
160        } else {
161            let _ = self.incoming_protos.insert_sync(name, Arc::new(AtomicU64::new(1)));
162        }
163    }
164
165    #[inline]
166    pub fn add_outgoing_proto(&self, name: &str) {
167        // correct way of handling ownership in scc HashIndex
168        let name = name.to_owned();
169        if let Some(counter) = self.outgoing_protos.peek_with(&name, |_, v| v.clone()) {
170            counter.fetch_add(1, Ordering::Relaxed);
171        } else {
172            let _ = self.outgoing_protos.insert_sync(name, Arc::new(AtomicU64::new(1)));
173        }
174    }
175
176    /// Increment UDP packet count with size
177    pub fn add_incoming_udp_packet(&self, len: usize) {
178        self.incoming_bytes.fetch_add(len as u64, Ordering::Relaxed);
179        self.incoming_packets.fetch_add(1, Ordering::Relaxed);
180    }
181
182    /// Increment outgoing UDP packet count with size
183    pub fn add_outgoing_udp_packet(&self, len: usize) {
184        self.outgoing_bytes.fetch_add(len as u64, Ordering::Relaxed);
185        self.outgoing_packets.fetch_add(1, Ordering::Relaxed);
186    }
187
188    /// Increment V2 parsing errors
189    pub fn add_error<E: Debug + Typename>(&self, error: &E) {
190        warn!(target = "metrics", "error: {error:?}");
191        self.add_error_by_name(error.typename());
192    }
193
194    fn add_error_by_name(&self, error_type: &str) {
195        // correct way of handling ownership in scc HashIndex
196        let et_owned = error_type.to_string();
197        if let Some(counter) = self.errors.peek_with(&et_owned, |_, v| v.clone()) {
198            counter.fetch_add(1, Ordering::Relaxed);
199        } else {
200            let _ = self.errors.insert_sync(et_owned, Arc::new(AtomicU64::new(1)));
201        }
202    }
203
204    /// Increment active task count
205    pub fn inc_tasks(&self) {
206        self.tasks.fetch_add(1, Ordering::Relaxed);
207    }
208
209    /// Decrement active task count
210    pub fn dec_tasks(&self) {
211        self.tasks.fetch_sub(1, Ordering::Relaxed);
212    }
213
214    /// Get a complete metrics snapshot
215    pub fn get_snapshot(&self) -> MetricsSnapshot {
216        let uptime = self.get_uptime();
217
218        let mut incoming_protos = HashMap::new();
219        let mut outgoing_protos = HashMap::new();
220        let mut errors = HashMap::new();
221
222        // Collect data using the new scc 3.0 API
223        self.incoming_protos.iter_sync(|proto_name, counter| {
224            incoming_protos.insert(proto_name.clone(), counter.load(Ordering::Relaxed));
225            true
226        });
227
228        self.outgoing_protos.iter_sync(|proto_name, counter| {
229            outgoing_protos.insert(proto_name.clone(), counter.load(Ordering::Relaxed));
230            true
231        });
232
233        self.errors.iter_sync(|error_type, counter| {
234            errors.insert(error_type.clone(), counter.load(Ordering::Relaxed));
235            true
236        });
237
238        let (udp, udpps) = self.get_udp_stats(uptime);
239        let tasks = self.tasks.load(Ordering::Relaxed);
240        MetricsSnapshot { incoming_protos, outgoing_protos, uptime, errors, udp, udpps, tasks }
241    }
242
243    // Small convenience function to get uptime
244    pub fn get_uptime(&self) -> u32 {
245        let now = get_unix_secs_now();
246        now.saturating_sub(self.start_time)
247    }
248
249    fn get_udp_stats(&self, uptime_seconds: u32) -> (UdpStats, UdpStats) {
250        static LAST_INCOMING_BYTES: AtomicU64 = AtomicU64::new(0);
251        static LAST_INCOMING_PACKETS: AtomicU64 = AtomicU64::new(0);
252        static LAST_OUTGOING_BYTES: AtomicU64 = AtomicU64::new(0);
253        static LAST_OUTGOING_PACKETS: AtomicU64 = AtomicU64::new(0);
254        static LAST_UPTIME_SECONDS: AtomicU32 = AtomicU32::new(0);
255
256        let incoming_packets = self.incoming_packets.load(Ordering::Relaxed);
257        let incoming_bytes = self.incoming_bytes.load(Ordering::Relaxed);
258        let outgoing_packets = self.outgoing_packets.load(Ordering::Relaxed);
259        let outgoing_bytes = self.outgoing_bytes.load(Ordering::Relaxed);
260
261        let lus = LAST_UPTIME_SECONDS.swap(uptime_seconds, Ordering::Relaxed);
262        let lip = LAST_INCOMING_PACKETS.swap(incoming_packets, Ordering::Relaxed);
263        let lib = LAST_INCOMING_BYTES.swap(incoming_bytes, Ordering::Relaxed);
264        let lop = LAST_OUTGOING_PACKETS.swap(outgoing_packets, Ordering::Relaxed);
265        let lob = LAST_OUTGOING_BYTES.swap(outgoing_bytes, Ordering::Relaxed);
266
267        // Use saturating arithmetic to avoid underflow when previous snapshot
268        // belongs to another Metrics instance or counters reset between runs.
269        let mut seconds = (uptime_seconds.saturating_sub(lus)) as u64;
270        if seconds == 0 {
271            seconds = 1;
272        }
273        let dp_in = incoming_packets.saturating_sub(lip);
274        let db_in = incoming_bytes.saturating_sub(lib);
275        let dp_out = outgoing_packets.saturating_sub(lop);
276        let db_out = outgoing_bytes.saturating_sub(lob);
277
278        let udp = UdpStats { incoming_packets, incoming_bytes, outgoing_packets, outgoing_bytes };
279        let udpps = UdpStats {
280            incoming_packets: dp_in / seconds,
281            incoming_bytes: db_in / seconds,
282            outgoing_packets: dp_out / seconds,
283            outgoing_bytes: db_out / seconds,
284        };
285
286        (udp, udpps)
287    }
288
289    /// Get JSON-formatted metrics (backward compatibility)
290    pub fn get_json(&self) -> serde_json::Value {
291        serde_json::to_value(self.get_snapshot()).unwrap_or_else(|e| {
292            warn!("Failed to serialize metrics: {}", e);
293            serde_json::json!({})
294        })
295    }
296
297    /// Get Prometheus-formatted metrics (backward compatibility)
298    pub fn get_prometheus(&self) -> String {
299        self.get_snapshot().to_prometheus_string()
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn udp_packet_totals_are_tracked() {
        let m = Metrics::new();
        m.add_incoming_udp_packet(100);
        m.add_incoming_udp_packet(250);
        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.udp.incoming_packets, 2);
        assert_eq!(snapshot.udp.incoming_bytes, 350);
    }

    #[test]
    fn protocol_counters_and_prometheus_include_counts() {
        let m = Metrics::new();
        m.add_incoming_proto("ping");
        m.add_incoming_proto("ping");
        m.add_incoming_proto("peers");

        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.incoming_protos.get("ping"), Some(&2));
        assert_eq!(snapshot.incoming_protos.get("peers"), Some(&1));

        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains("amadeus_incoming_protos_total{type=\"ping\"} 2"));
        assert!(prom.contains("amadeus_incoming_protos_total{type=\"peers\"} 1"));
    }

    #[derive(Debug)]
    struct DummyErr;
    impl crate::utils::misc::Typename for DummyErr {
        fn typename(&self) -> &'static str {
            "dummy"
        }
    }

    #[test]
    fn error_counters_by_typename_and_prometheus() {
        let m = Metrics::new();
        let e = DummyErr;
        m.add_error(&e);
        m.add_error(&e);

        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.errors.get("dummy"), Some(&2));

        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains("amadeus_packet_errors_total{type=\"dummy\"} 2"));
    }

    #[test]
    fn uptime_is_reported() {
        let m = Metrics::new();
        let snapshot = m.get_snapshot();
        // `uptime` is a `u32`, so it cannot be negative; check instead that the
        // snapshot value is the one exported to Prometheus.
        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains(&format!("amadeus_uptime_seconds {}", snapshot.uptime)));
    }

    #[test]
    fn prometheus_packet_totals_reflect_counters() {
        let m = Metrics::new();
        m.add_incoming_udp_packet(10);
        m.add_incoming_udp_packet(20);
        m.add_outgoing_udp_packet(15);
        let prom = m.get_snapshot().to_prometheus_string();
        assert!(prom.contains("amadeus_udp_packets_total{type=\"incoming\"} 2"));
        assert!(prom.contains("amadeus_udp_bytes_total{type=\"incoming\"} 30"));
        assert!(prom.contains("amadeus_udp_packets_total{type=\"outgoing\"} 1"));
        assert!(prom.contains("amadeus_udp_bytes_total{type=\"outgoing\"} 15"));
    }

    #[test]
    fn sent_packet_counters_and_prometheus_include_counts() {
        let m = Metrics::new();
        m.add_outgoing_proto("ping");
        m.add_outgoing_proto("ping");
        m.add_outgoing_proto("pong");

        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.outgoing_protos.get("ping"), Some(&2));
        assert_eq!(snapshot.outgoing_protos.get("pong"), Some(&1));

        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains("amadeus_outgoing_protos_total{type=\"ping\"} 2"));
        assert!(prom.contains("amadeus_outgoing_protos_total{type=\"pong\"} 1"));
    }

    #[test]
    fn metrics_snapshot_serialization() {
        let m = Metrics::new();
        m.add_incoming_proto("test");
        m.add_outgoing_proto("test");
        m.add_incoming_udp_packet(100);

        let snapshot = m.get_snapshot();

        // Round-trip the snapshot through JSON.
        let json = serde_json::to_string(&snapshot).expect("Should serialize");
        let deserialized: MetricsSnapshot = serde_json::from_str(&json).expect("Should deserialize");

        assert_eq!(deserialized.incoming_protos.get("test"), Some(&1));
        assert_eq!(deserialized.outgoing_protos.get("test"), Some(&1));
        assert_eq!(deserialized.udp.incoming_packets, 1);
        assert_eq!(deserialized.udp.incoming_bytes, 100);
        assert_eq!(deserialized.tasks, 0);
    }

    #[test]
    fn prometheus_generation_from_snapshot() {
        let m = Metrics::new();
        m.add_incoming_proto("test_proto");
        m.add_incoming_udp_packet(50);

        let snapshot = m.get_snapshot();
        let prometheus = snapshot.to_prometheus_string();

        assert!(prometheus.contains("amadeus_incoming_protos_total{type=\"test_proto\"} 1"));
        assert!(prometheus.contains("amadeus_udp_packets_total{type=\"incoming\"} 1"));
        assert!(prometheus.contains("amadeus_udp_bytes_total{type=\"incoming\"} 50"));
    }

    #[test]
    fn tasks_are_tracked_correctly() {
        let m = Metrics::new();

        // Initial tasks should be 0
        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.tasks, 0);

        // Add some tasks
        m.inc_tasks();
        m.inc_tasks();
        m.inc_tasks();

        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.tasks, 3);

        // Remove a task
        m.dec_tasks();

        let snapshot = m.get_snapshot();
        assert_eq!(snapshot.tasks, 2);
    }

    #[test]
    fn prometheus_includes_tasks_gauge() {
        let m = Metrics::new();
        m.inc_tasks();
        m.inc_tasks();

        let snapshot = m.get_snapshot();
        let prometheus = snapshot.to_prometheus_string();

        assert!(prometheus.contains("amadeus_tasks_active 2"));
    }
}