1use crate::utils::misc::{Typename, get_unix_secs_now};
2use scc::HashIndex as SccHashIndex;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
8use tracing::warn;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct MetricsSnapshot {
13 pub incoming_protos: HashMap<String, u64>,
14 pub outgoing_protos: HashMap<String, u64>,
15 pub errors: HashMap<String, u64>,
16 pub udp: UdpStats,
17 pub udpps: UdpStats, pub uptime: u32,
19 pub tasks: u64,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct UdpStats {
25 pub incoming_packets: u64,
26 pub incoming_bytes: u64,
27 pub outgoing_packets: u64,
28 pub outgoing_bytes: u64,
29}
30
31impl MetricsSnapshot {
32 pub fn to_json_string(&self) -> String {
34 serde_json::to_string(self).unwrap_or_else(|e| {
35 warn!("Failed to serialize metrics snapshot: {}", e);
36 "{}".into()
37 })
38 }
39
40 pub fn to_prometheus_string(&self) -> String {
42 let udp = format!(
43 r#"
44# HELP amadeus_udp_packets_total Total number of UDP packets
45# TYPE amadeus_udp_packets_total counter
46amadeus_udp_packets_total{{type="incoming"}} {}
47amadeus_udp_packets_total{{type="outgoing"}} {}
48
49# HELP amadeus_udp_bytes_total Total number of UDP bytes
50# TYPE amadeus_udp_bytes_total counter
51amadeus_udp_bytes_total{{type="incoming"}} {}
52amadeus_udp_bytes_total{{type="outgoing"}} {}
53
54# HELP amadeus_uptime_seconds Process uptime in seconds
55# TYPE amadeus_uptime_seconds gauge
56amadeus_uptime_seconds {}
57
58# HELP amadeus_tasks_active Current number of active tasks
59# TYPE amadeus_tasks_active gauge
60amadeus_tasks_active {}"#,
61 self.udp.incoming_packets,
62 self.udp.outgoing_packets,
63 self.udp.incoming_bytes,
64 self.udp.outgoing_bytes,
65 self.uptime,
66 self.tasks
67 );
68
69 let udpps = format!(
70 r#"
71
72# HELP amadeus_udp_packets_per_second Total number of UDP packets
73# TYPE amadeus_udp_packets_per_second gauge
74amadeus_udp_packets_per_second{{type="incoming"}} {}
75amadeus_udp_packets_per_second{{type="outgoing"}} {}
76
77# HELP amadeus_udp_bytes_per_second Total number of UDP bytes
78# TYPE amadeus_udp_bytes_per_second gauge
79amadeus_udp_bytes_per_second{{type="incoming"}} {}
80amadeus_udp_bytes_per_second{{type="outgoing"}} {}"#,
81 self.udpps.incoming_packets,
82 self.udpps.outgoing_packets,
83 self.udpps.incoming_bytes,
84 self.udpps.outgoing_bytes
85 );
86
87 let mut protos = Vec::new();
88 protos.push("\n\n# HELP amadeus_incoming_protos_total Total number of proto messages handled by type".into());
89 protos.push("# TYPE amadeus_incoming_protos_total counter".into());
90 for (proto_name, count) in &self.incoming_protos {
91 protos.push(format!("amadeus_incoming_protos_total{{type=\"{}\"}} {}", proto_name, count));
92 }
93
94 let mut sent_packets = Vec::new();
95 sent_packets
96 .push("\n\n# HELP amadeus_outgoing_protos_total Total number of messages sent by protocol type".into());
97 sent_packets.push("# TYPE amadeus_outgoing_protos_total counter".into());
98 for (proto_name, count) in &self.outgoing_protos {
99 sent_packets.push(format!("amadeus_outgoing_protos_total{{type=\"{}\"}} {}", proto_name, count));
100 }
101
102 let mut errors = Vec::new();
103 errors.push("\n\n# HELP amadeus_packet_errors_total Total number of packet processing errors by type".into());
104 errors.push("# TYPE amadeus_packet_errors_total counter".into());
105 for (error_type, count) in &self.errors {
106 errors.push(format!("amadeus_packet_errors_total{{type=\"{}\"}} {}", error_type, count));
107 }
108
109 format!("{}{}{}{}{}", udp, udpps, protos.join("\n"), sent_packets.join("\n"), errors.join("\n"))
110 }
111}
112
113pub struct Metrics {
114 incoming_bytes: AtomicU64, incoming_packets: AtomicU64, outgoing_bytes: AtomicU64, outgoing_packets: AtomicU64, incoming_protos: SccHashIndex<String, Arc<AtomicU64>>,
122
123 errors: SccHashIndex<String, Arc<AtomicU64>>,
125
126 outgoing_protos: SccHashIndex<String, Arc<AtomicU64>>,
128
129 tasks: AtomicU64,
131
132 start_time: u32,
134}
135
136impl Metrics {
137 pub fn new() -> Self {
138 let handled_protos = SccHashIndex::new();
139 let errors = SccHashIndex::new();
140 let sent_packets = SccHashIndex::new();
141 Self {
142 incoming_bytes: AtomicU64::new(0),
143 incoming_packets: AtomicU64::new(0),
144 outgoing_bytes: AtomicU64::new(0),
145 outgoing_packets: AtomicU64::new(0),
146 incoming_protos: handled_protos,
147 errors,
148 outgoing_protos: sent_packets,
149 tasks: AtomicU64::new(0),
150 start_time: get_unix_secs_now(),
151 }
152 }
153
154 #[inline]
155 pub fn add_incoming_proto(&self, name: &str) {
156 let name = name.to_owned();
158 if let Some(counter) = self.incoming_protos.peek_with(&name, |_, v| v.clone()) {
159 counter.fetch_add(1, Ordering::Relaxed);
160 } else {
161 let _ = self.incoming_protos.insert_sync(name, Arc::new(AtomicU64::new(1)));
162 }
163 }
164
165 #[inline]
166 pub fn add_outgoing_proto(&self, name: &str) {
167 let name = name.to_owned();
169 if let Some(counter) = self.outgoing_protos.peek_with(&name, |_, v| v.clone()) {
170 counter.fetch_add(1, Ordering::Relaxed);
171 } else {
172 let _ = self.outgoing_protos.insert_sync(name, Arc::new(AtomicU64::new(1)));
173 }
174 }
175
176 pub fn add_incoming_udp_packet(&self, len: usize) {
178 self.incoming_bytes.fetch_add(len as u64, Ordering::Relaxed);
179 self.incoming_packets.fetch_add(1, Ordering::Relaxed);
180 }
181
182 pub fn add_outgoing_udp_packet(&self, len: usize) {
184 self.outgoing_bytes.fetch_add(len as u64, Ordering::Relaxed);
185 self.outgoing_packets.fetch_add(1, Ordering::Relaxed);
186 }
187
188 pub fn add_error<E: Debug + Typename>(&self, error: &E) {
190 warn!(target = "metrics", "error: {error:?}");
191 self.add_error_by_name(error.typename());
192 }
193
194 fn add_error_by_name(&self, error_type: &str) {
195 let et_owned = error_type.to_string();
197 if let Some(counter) = self.errors.peek_with(&et_owned, |_, v| v.clone()) {
198 counter.fetch_add(1, Ordering::Relaxed);
199 } else {
200 let _ = self.errors.insert_sync(et_owned, Arc::new(AtomicU64::new(1)));
201 }
202 }
203
204 pub fn inc_tasks(&self) {
206 self.tasks.fetch_add(1, Ordering::Relaxed);
207 }
208
209 pub fn dec_tasks(&self) {
211 self.tasks.fetch_sub(1, Ordering::Relaxed);
212 }
213
214 pub fn get_snapshot(&self) -> MetricsSnapshot {
216 let uptime = self.get_uptime();
217
218 let mut incoming_protos = HashMap::new();
219 let mut outgoing_protos = HashMap::new();
220 let mut errors = HashMap::new();
221
222 self.incoming_protos.iter_sync(|proto_name, counter| {
224 incoming_protos.insert(proto_name.clone(), counter.load(Ordering::Relaxed));
225 true
226 });
227
228 self.outgoing_protos.iter_sync(|proto_name, counter| {
229 outgoing_protos.insert(proto_name.clone(), counter.load(Ordering::Relaxed));
230 true
231 });
232
233 self.errors.iter_sync(|error_type, counter| {
234 errors.insert(error_type.clone(), counter.load(Ordering::Relaxed));
235 true
236 });
237
238 let (udp, udpps) = self.get_udp_stats(uptime);
239 let tasks = self.tasks.load(Ordering::Relaxed);
240 MetricsSnapshot { incoming_protos, outgoing_protos, uptime, errors, udp, udpps, tasks }
241 }
242
243 pub fn get_uptime(&self) -> u32 {
245 let now = get_unix_secs_now();
246 now.saturating_sub(self.start_time)
247 }
248
249 fn get_udp_stats(&self, uptime_seconds: u32) -> (UdpStats, UdpStats) {
250 static LAST_INCOMING_BYTES: AtomicU64 = AtomicU64::new(0);
251 static LAST_INCOMING_PACKETS: AtomicU64 = AtomicU64::new(0);
252 static LAST_OUTGOING_BYTES: AtomicU64 = AtomicU64::new(0);
253 static LAST_OUTGOING_PACKETS: AtomicU64 = AtomicU64::new(0);
254 static LAST_UPTIME_SECONDS: AtomicU32 = AtomicU32::new(0);
255
256 let incoming_packets = self.incoming_packets.load(Ordering::Relaxed);
257 let incoming_bytes = self.incoming_bytes.load(Ordering::Relaxed);
258 let outgoing_packets = self.outgoing_packets.load(Ordering::Relaxed);
259 let outgoing_bytes = self.outgoing_bytes.load(Ordering::Relaxed);
260
261 let lus = LAST_UPTIME_SECONDS.swap(uptime_seconds, Ordering::Relaxed);
262 let lip = LAST_INCOMING_PACKETS.swap(incoming_packets, Ordering::Relaxed);
263 let lib = LAST_INCOMING_BYTES.swap(incoming_bytes, Ordering::Relaxed);
264 let lop = LAST_OUTGOING_PACKETS.swap(outgoing_packets, Ordering::Relaxed);
265 let lob = LAST_OUTGOING_BYTES.swap(outgoing_bytes, Ordering::Relaxed);
266
267 let mut seconds = (uptime_seconds.saturating_sub(lus)) as u64;
270 if seconds == 0 {
271 seconds = 1;
272 }
273 let dp_in = incoming_packets.saturating_sub(lip);
274 let db_in = incoming_bytes.saturating_sub(lib);
275 let dp_out = outgoing_packets.saturating_sub(lop);
276 let db_out = outgoing_bytes.saturating_sub(lob);
277
278 let udp = UdpStats { incoming_packets, incoming_bytes, outgoing_packets, outgoing_bytes };
279 let udpps = UdpStats {
280 incoming_packets: dp_in / seconds,
281 incoming_bytes: db_in / seconds,
282 outgoing_packets: dp_out / seconds,
283 outgoing_bytes: db_out / seconds,
284 };
285
286 (udp, udpps)
287 }
288
289 pub fn get_json(&self) -> serde_json::Value {
291 serde_json::to_value(self.get_snapshot()).unwrap_or_else(|e| {
292 warn!("Failed to serialize metrics: {}", e);
293 serde_json::json!({})
294 })
295 }
296
297 pub fn get_prometheus(&self) -> String {
299 self.get_snapshot().to_prometheus_string()
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal error type used to exercise `Metrics::add_error`.
    #[derive(Debug)]
    struct DummyErr;

    impl crate::utils::misc::Typename for DummyErr {
        fn typename(&self) -> &'static str {
            "dummy"
        }
    }

    /// Packet and byte totals accumulate across calls.
    #[test]
    fn udp_packet_totals_are_tracked() {
        let metrics = Metrics::new();
        for len in [100, 250] {
            metrics.add_incoming_udp_packet(len);
        }

        let snapshot = metrics.get_snapshot();
        assert_eq!(snapshot.udp.incoming_packets, 2);
        assert_eq!(snapshot.udp.incoming_bytes, 350);
    }

    /// Incoming proto counters show up in the snapshot and in Prometheus text.
    #[test]
    fn protocol_counters_and_prometheus_include_counts() {
        let metrics = Metrics::new();
        for proto in ["ping", "ping", "peers"] {
            metrics.add_incoming_proto(proto);
        }

        let snapshot = metrics.get_snapshot();
        assert_eq!(snapshot.incoming_protos.get("ping"), Some(&2));
        assert_eq!(snapshot.incoming_protos.get("peers"), Some(&1));

        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains("amadeus_incoming_protos_total{type=\"ping\"} 2"));
        assert!(prom.contains("amadeus_incoming_protos_total{type=\"peers\"} 1"));
    }

    /// Errors are counted under the name returned by `Typename::typename`.
    #[test]
    fn error_counters_by_typename_and_prometheus() {
        let metrics = Metrics::new();
        let err = DummyErr;
        metrics.add_error(&err);
        metrics.add_error(&err);

        let snapshot = metrics.get_snapshot();
        assert_eq!(snapshot.errors.get("dummy"), Some(&2));

        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains("amadeus_packet_errors_total{type=\"dummy\"} 2"));
    }

    /// A freshly created collector reports a small uptime.
    #[test]
    fn uptime_is_nonnegative_and_present() {
        let metrics = Metrics::new();
        let snapshot = metrics.get_snapshot();
        // `u32` cannot be negative, so check a sane upper bound instead; the
        // generous limit tolerates a slow or heavily loaded test machine.
        assert!(snapshot.uptime < 60, "fresh collector reported an uptime of {}s", snapshot.uptime);
    }

    /// Prometheus text carries the UDP totals for both directions.
    #[test]
    fn prometheus_packet_totals_reflect_counters() {
        let metrics = Metrics::new();
        metrics.add_incoming_udp_packet(10);
        metrics.add_incoming_udp_packet(20);
        metrics.add_outgoing_udp_packet(15);

        let prom = metrics.get_snapshot().to_prometheus_string();
        assert!(prom.contains("amadeus_udp_packets_total{type=\"incoming\"} 2"));
        assert!(prom.contains("amadeus_udp_bytes_total{type=\"incoming\"} 30"));
        assert!(prom.contains("amadeus_udp_packets_total{type=\"outgoing\"} 1"));
        assert!(prom.contains("amadeus_udp_bytes_total{type=\"outgoing\"} 15"));
    }

    /// Prometheus text contains the per-second gauge series for both directions.
    #[test]
    fn prometheus_includes_per_second_gauges() {
        let metrics = Metrics::new();
        metrics.add_incoming_udp_packet(10);

        let prom = metrics.get_snapshot().to_prometheus_string();
        assert!(prom.contains("amadeus_udp_packets_per_second{type=\"incoming\"} "));
        assert!(prom.contains("amadeus_udp_packets_per_second{type=\"outgoing\"} "));
        assert!(prom.contains("amadeus_udp_bytes_per_second{type=\"incoming\"} "));
        assert!(prom.contains("amadeus_udp_bytes_per_second{type=\"outgoing\"} "));
    }

    /// Outgoing proto counters show up in the snapshot and in Prometheus text.
    #[test]
    fn sent_packet_counters_and_prometheus_include_counts() {
        let metrics = Metrics::new();
        for proto in ["ping", "ping", "pong"] {
            metrics.add_outgoing_proto(proto);
        }

        let snapshot = metrics.get_snapshot();
        assert_eq!(snapshot.outgoing_protos.get("ping"), Some(&2));
        assert_eq!(snapshot.outgoing_protos.get("pong"), Some(&1));

        let prom = snapshot.to_prometheus_string();
        assert!(prom.contains("amadeus_outgoing_protos_total{type=\"ping\"} 2"));
        assert!(prom.contains("amadeus_outgoing_protos_total{type=\"pong\"} 1"));
    }

    /// A snapshot survives a JSON round trip with its values intact.
    #[test]
    fn metrics_snapshot_serialization() {
        let metrics = Metrics::new();
        metrics.add_incoming_proto("test");
        metrics.add_outgoing_proto("test");
        metrics.add_incoming_udp_packet(100);

        let snapshot = metrics.get_snapshot();
        let json = serde_json::to_string(&snapshot).expect("Should serialize");
        let restored: MetricsSnapshot = serde_json::from_str(&json).expect("Should deserialize");

        assert_eq!(restored.incoming_protos.get("test"), Some(&1));
        assert_eq!(restored.outgoing_protos.get("test"), Some(&1));
        assert_eq!(restored.udp.incoming_packets, 1);
        assert_eq!(restored.udp.incoming_bytes, 100);
        assert_eq!(restored.tasks, 0);
    }

    /// Prometheus text can be produced from a detached snapshot.
    #[test]
    fn prometheus_generation_from_snapshot() {
        let metrics = Metrics::new();
        metrics.add_incoming_proto("test_proto");
        metrics.add_incoming_udp_packet(50);

        let prometheus = metrics.get_snapshot().to_prometheus_string();

        assert!(prometheus.contains("amadeus_incoming_protos_total{type=\"test_proto\"} 1"));
        assert!(prometheus.contains("amadeus_udp_packets_total{type=\"incoming\"} 1"));
        assert!(prometheus.contains("amadeus_udp_bytes_total{type=\"incoming\"} 50"));
    }

    /// The task gauge follows `inc_tasks` / `dec_tasks`.
    #[test]
    fn tasks_are_tracked_correctly() {
        let metrics = Metrics::new();
        assert_eq!(metrics.get_snapshot().tasks, 0);

        for _ in 0..3 {
            metrics.inc_tasks();
        }
        assert_eq!(metrics.get_snapshot().tasks, 3);

        metrics.dec_tasks();
        assert_eq!(metrics.get_snapshot().tasks, 2);
    }

    /// Prometheus text carries the active-task gauge.
    #[test]
    fn prometheus_includes_tasks_gauge() {
        let metrics = Metrics::new();
        metrics.inc_tasks();
        metrics.inc_tasks();

        let prometheus = metrics.get_snapshot().to_prometheus_string();

        assert!(prometheus.contains("amadeus_tasks_active 2"));
    }
}