use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};

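/// Collects runtime metrics: message and error counters, a connection gauge,
/// caller-defined custom gauges, and per-operation performance samples.
///
/// Counters are relaxed atomics so recording stays cheap on hot paths; the
/// custom-metric map and the sample buffer live behind async `RwLock`s.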
#[derive(Debug)]
pub struct MetricsCollector {
    /// When the collector was created; used to derive uptime.
    start_time: Instant,
    /// Total messages sent.
    messages_sent: AtomicU64,
    /// Total messages received.
    messages_received: AtomicU64,
    /// Total broadcast messages sent.
    broadcasts_sent: AtomicU64,
    /// Total errors recorded.
    errors: AtomicU64,
    /// Accumulated processing time, in nanoseconds.
    total_processing_time: AtomicU64,
    /// Current number of active connections.
    connections: AtomicU64,
    /// Arbitrary named gauges set by callers.
    custom_metrics: Arc<RwLock<HashMap<String, f64>>>,
    /// Recent per-operation performance samples (capped at 1000).
    samples: Arc<RwLock<Vec<PerformanceSample>>>,
}

impl MetricsCollector {
    pub fn new() -> Self {
        Self {
            start_time: Instant::now(),
            messages_sent: AtomicU64::new(0),
            messages_received: AtomicU64::new(0),
            broadcasts_sent: AtomicU64::new(0),
            errors: AtomicU64::new(0),
            total_processing_time: AtomicU64::new(0),
            connections: AtomicU64::new(0),
            custom_metrics: Arc::new(RwLock::new(HashMap::new())),
            samples: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Increments the count of messages sent.
    pub fn record_message_sent(&self) {
        self.messages_sent.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the count of messages received.
    pub fn record_message_received(&self) {
        self.messages_received.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the count of broadcasts sent.
    pub fn record_broadcast_sent(&self) {
        self.broadcasts_sent.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the error counter.
    pub fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `duration` to the accumulated processing time (stored as nanoseconds).
    pub fn record_processing_time(&self, duration: Duration) {
        self.total_processing_time
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
    }

    /// Resets the message and error counters at server startup.
    pub fn record_startup(&self) {
        self.messages_sent.store(0, Ordering::Relaxed);
        self.messages_received.store(0, Ordering::Relaxed);
        self.broadcasts_sent.store(0, Ordering::Relaxed);
        self.errors.store(0, Ordering::Relaxed);
        self.total_processing_time.store(0, Ordering::Relaxed);
    }

    /// Hook for shutdown-time bookkeeping; currently a no-op.
    pub fn record_shutdown(&self) {}

    /// Increments the active-connection gauge.
    pub fn add_connection(&self) {
        self.connections.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the active-connection gauge.
    pub fn remove_connection(&self) {
        self.connections.fetch_sub(1, Ordering::Relaxed);
    }

    /// Sets or replaces a named custom gauge value.
    pub async fn set_custom_metric(&self, name: String, value: f64) {
        let mut metrics = self.custom_metrics.write().await;
        metrics.insert(name, value);
    }

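    /// Returns a snapshot of all counters together with derived metrics
    /// (messages per second, error rate, average processing time in ms).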
    pub fn get_metrics(&self) -> HashMap<String, f64> {
        let uptime = self.start_time.elapsed().as_secs_f64();
        let messages_sent = self.messages_sent.load(Ordering::Relaxed) as f64;
        let messages_received = self.messages_received.load(Ordering::Relaxed) as f64;
        let broadcasts_sent = self.broadcasts_sent.load(Ordering::Relaxed) as f64;
        let errors = self.errors.load(Ordering::Relaxed) as f64;
        // Stored in nanoseconds; convert to seconds for reporting.
        let total_processing_time =
            self.total_processing_time.load(Ordering::Relaxed) as f64 / 1_000_000_000.0;
        let connections = self.connections.load(Ordering::Relaxed) as f64;

        let mut metrics = HashMap::new();
        metrics.insert("uptime_seconds".to_string(), uptime);
        metrics.insert("messages_sent".to_string(), messages_sent);
        metrics.insert("messages_received".to_string(), messages_received);
        metrics.insert("broadcasts_sent".to_string(), broadcasts_sent);
        metrics.insert("errors".to_string(), errors);
        metrics.insert("total_processing_time_seconds".to_string(), total_processing_time);
        metrics.insert("connections".to_string(), connections);

        if uptime > 0.0 {
            metrics.insert(
                "messages_per_second".to_string(),
                (messages_sent + messages_received) / uptime,
            );
            // The +1.0 keeps the error rate finite when no messages have been counted.
            metrics.insert(
                "error_rate".to_string(),
                errors / (messages_sent + messages_received + 1.0),
            );
        }

        if messages_sent + messages_received > 0.0 {
            metrics.insert(
                "avg_processing_time_ms".to_string(),
                (total_processing_time * 1000.0) / (messages_sent + messages_received),
            );
        }

        metrics
    }

    pub fn get_uptime(&self) -> f64 {
        self.start_time.elapsed().as_secs_f64()
    }

    pub fn get_messages_sent(&self) -> u64 {
        self.messages_sent.load(Ordering::Relaxed)
    }

    pub fn get_messages_received(&self) -> u64 {
        self.messages_received.load(Ordering::Relaxed)
    }

    pub fn get_errors(&self) -> u64 {
        self.errors.load(Ordering::Relaxed)
    }

    pub fn get_connections(&self) -> u64 {
        self.connections.load(Ordering::Relaxed)
    }

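    /// Records a timed operation outcome. The sample buffer keeps only the
    /// 1000 most recent entries.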
    pub async fn record_sample(&self, operation: String, duration: Duration, success: bool) {
        let sample = PerformanceSample {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            operation,
            // Keep fractional milliseconds instead of truncating to whole ms.
            duration_ms: duration.as_secs_f64() * 1000.0,
            success,
        };

        let mut samples = self.samples.write().await;
        samples.push(sample);

        // Drop the oldest samples so the buffer never exceeds 1000 entries.
        if samples.len() > 1000 {
            let drain_count = samples.len() - 1000;
            samples.drain(0..drain_count);
        }
    }

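    /// Aggregates recorded samples into summary statistics: success rate,
    /// average and p95 latency over successful samples, plus rate metrics
    /// taken from `get_metrics`.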
    pub async fn get_performance_stats(&self) -> PerformanceStats {
        let samples = self.samples.read().await;
        let metrics = self.get_metrics();

        if samples.is_empty() {
            return PerformanceStats::default();
        }

        let successful_samples: Vec<_> = samples.iter().filter(|s| s.success).collect();
        let failed_samples: Vec<_> = samples.iter().filter(|s| !s.success).collect();

        let total_samples = samples.len() as f64;
        let success_rate = successful_samples.len() as f64 / total_samples;

        // Latency statistics are computed over successful samples only.
        let durations: Vec<f64> = successful_samples.iter().map(|s| s.duration_ms).collect();
        let avg_duration = if !durations.is_empty() {
            durations.iter().sum::<f64>() / durations.len() as f64
        } else {
            0.0
        };

        let mut sorted_durations = durations.clone();
        // total_cmp gives a total order on f64, so sorting cannot panic on NaN.
        sorted_durations.sort_by(|a, b| a.total_cmp(b));

        let p95_duration = if !sorted_durations.is_empty() {
            let index = (sorted_durations.len() as f64 * 0.95) as usize;
            sorted_durations
                .get(index.min(sorted_durations.len() - 1))
                .copied()
                .unwrap_or(0.0)
        } else {
            0.0
        };

        PerformanceStats {
            total_samples: total_samples as u64,
            successful_samples: successful_samples.len() as u64,
            failed_samples: failed_samples.len() as u64,
            success_rate,
            avg_duration_ms: avg_duration,
            p95_duration_ms: p95_duration,
            messages_per_second: metrics.get("messages_per_second").copied().unwrap_or(0.0),
            error_rate: metrics.get("error_rate").copied().unwrap_or(0.0),
            uptime_seconds: metrics.get("uptime_seconds").copied().unwrap_or(0.0),
        }
    }

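    /// Clears every counter, custom metric, and recorded sample.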
    pub async fn reset(&self) {
        self.messages_sent.store(0, Ordering::Relaxed);
        self.messages_received.store(0, Ordering::Relaxed);
        self.broadcasts_sent.store(0, Ordering::Relaxed);
        self.errors.store(0, Ordering::Relaxed);
        self.total_processing_time.store(0, Ordering::Relaxed);
        self.connections.store(0, Ordering::Relaxed);

        let mut custom_metrics = self.custom_metrics.write().await;
        custom_metrics.clear();

        let mut samples = self.samples.write().await;
        samples.clear();
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

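/// A single timed operation recorded by `MetricsCollector::record_sample`.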
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceSample {
    /// Unix timestamp (seconds) when the sample was recorded.
    pub timestamp: u64,
    /// Name of the measured operation.
    pub operation: String,
    /// Duration of the operation in milliseconds.
    pub duration_ms: f64,
    /// Whether the operation succeeded.
    pub success: bool,
}

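/// Summary statistics derived from the recorded samples and counters.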
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStats {
    pub total_samples: u64,
    pub successful_samples: u64,
    pub failed_samples: u64,
    pub success_rate: f64,
    pub avg_duration_ms: f64,
    pub p95_duration_ms: f64,
    pub messages_per_second: f64,
    pub error_rate: f64,
    pub uptime_seconds: f64,
}

impl Default for PerformanceStats {
    fn default() -> Self {
        Self {
            total_samples: 0,
            successful_samples: 0,
            failed_samples: 0,
            success_rate: 0.0,
            avg_duration_ms: 0.0,
            p95_duration_ms: 0.0,
            messages_per_second: 0.0,
            error_rate: 0.0,
            uptime_seconds: 0.0,
        }
    }
}

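/// Formats collected metrics for external consumers: JSON, Prometheus text
/// exposition format, and CSV.
///
/// A minimal usage sketch (the `collector` value and any HTTP wiring are
/// assumed, not part of this module):
///
/// ```ignore
/// let body = MetricsExporter::to_prometheus(&collector.get_metrics());
/// // serve `body` from a /metrics endpoint
/// ```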
pub struct MetricsExporter;

impl MetricsExporter {
    /// Serializes the stats as pretty-printed JSON.
    pub fn to_json(stats: &PerformanceStats) -> serde_json::Result<String> {
        serde_json::to_string_pretty(stats)
    }

    /// Renders the metrics map in Prometheus text format, prefixing every
    /// metric name with `odin_`.
    pub fn to_prometheus(metrics: &HashMap<String, f64>) -> String {
        let mut output = String::new();

        for (name, value) in metrics {
            let sanitized_name = name.replace('.', "_").replace(' ', "_");
            output.push_str(&format!(
                "# TYPE odin_{} gauge\nodin_{} {}\n",
                sanitized_name, sanitized_name, value
            ));
        }

        output
    }

    /// Renders samples as CSV. Note: operation names are not escaped, so a
    /// name containing a comma would break the column layout.
    pub fn to_csv(samples: &[PerformanceSample]) -> String {
        let mut output = String::from("timestamp,operation,duration_ms,success\n");

        for sample in samples {
            output.push_str(&format!(
                "{},{},{},{}\n",
                sample.timestamp, sample.operation, sample.duration_ms, sample.success
            ));
        }

        output
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration as TokioDuration};

    #[tokio::test]
    async fn test_metrics_collection() {
        let collector = MetricsCollector::new();

        collector.record_message_sent();
        collector.record_message_received();
        collector.record_error();

        let metrics = collector.get_metrics();
        assert_eq!(metrics.get("messages_sent"), Some(&1.0));
        assert_eq!(metrics.get("messages_received"), Some(&1.0));
        assert_eq!(metrics.get("errors"), Some(&1.0));
    }

    #[tokio::test]
    async fn test_custom_metrics() {
        let collector = MetricsCollector::new();

        collector.set_custom_metric("custom_metric".to_string(), 42.0).await;

        // Read the value back directly; tests in this module can access the private map.
        let metrics = collector.custom_metrics.read().await;
        assert_eq!(metrics.get("custom_metric"), Some(&42.0));
    }

    #[tokio::test]
    async fn test_performance_samples() {
        let collector = MetricsCollector::new();

        collector.record_sample(
            "test_operation".to_string(),
            Duration::from_millis(100),
            true,
        ).await;

        collector.record_sample(
            "test_operation".to_string(),
            Duration::from_millis(200),
            false,
        ).await;

        let stats = collector.get_performance_stats().await;
        assert_eq!(stats.total_samples, 2);
        assert_eq!(stats.successful_samples, 1);
        assert_eq!(stats.failed_samples, 1);
        assert_eq!(stats.success_rate, 0.5);
    }

    #[tokio::test]
    async fn test_metrics_reset() {
        let collector = MetricsCollector::new();

        collector.record_message_sent();
        collector.record_message_received();

        let metrics_before = collector.get_metrics();
        assert_eq!(metrics_before.get("messages_sent"), Some(&1.0));

        collector.reset().await;

        let metrics_after = collector.get_metrics();
        assert_eq!(metrics_after.get("messages_sent"), Some(&0.0));
    }

    #[test]
    fn test_metrics_exporter() {
        let mut metrics = HashMap::new();
        metrics.insert("messages_sent".to_string(), 100.0);
        metrics.insert("uptime_seconds".to_string(), 3600.0);

        let prometheus = MetricsExporter::to_prometheus(&metrics);
        assert!(prometheus.contains("odin_messages_sent"));
        assert!(prometheus.contains("odin_uptime_seconds"));

        let samples = vec![
            PerformanceSample {
                timestamp: 1234567890,
                operation: "test".to_string(),
                duration_ms: 100.0,
                success: true,
            },
        ];

        let csv = MetricsExporter::to_csv(&samples);
        assert!(csv.contains("timestamp,operation,duration_ms,success"));
        assert!(csv.contains("1234567890,test,100,true"));
    }

    #[tokio::test]
    async fn test_derived_metrics() {
        let collector = MetricsCollector::new();

        for _ in 0..10 {
            collector.record_message_sent();
        }

        for _ in 0..5 {
            collector.record_message_received();
        }

        collector.record_error();

        sleep(TokioDuration::from_millis(10)).await;

        let metrics = collector.get_metrics();

        assert!(metrics.contains_key("messages_per_second"));
        assert!(metrics.contains_key("error_rate"));
        assert!(metrics.get("uptime_seconds").unwrap() > &0.0);

        let error_rate = metrics.get("error_rate").unwrap();
        assert!(*error_rate > 0.0 && *error_rate < 1.0);
    }
}