odin_protocol/
metrics.rs

1//! Performance metrics collection and monitoring
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
7use tokio::sync::RwLock;
8use serde::{Deserialize, Serialize};
9
10/// Metrics collector for ODIN Protocol performance monitoring
11#[derive(Debug)]
12pub struct MetricsCollector {
13    /// Start time of the metrics collection
14    start_time: Instant,
15    /// Messages sent counter
16    messages_sent: AtomicU64,
17    /// Messages received counter
18    messages_received: AtomicU64,
19    /// Broadcasts sent counter
20    broadcasts_sent: AtomicU64,
21    /// Errors encountered counter
22    errors: AtomicU64,
23    /// Total processing time
24    total_processing_time: AtomicU64,
25    /// Connection count
26    connections: AtomicU64,
27    /// Custom metrics
28    custom_metrics: Arc<RwLock<HashMap<String, f64>>>,
29    /// Performance samples
30    samples: Arc<RwLock<Vec<PerformanceSample>>>,
31}
32
33impl MetricsCollector {
34    /// Create a new metrics collector
35    pub fn new() -> Self {
36        Self {
37            start_time: Instant::now(),
38            messages_sent: AtomicU64::new(0),
39            messages_received: AtomicU64::new(0),
40            broadcasts_sent: AtomicU64::new(0),
41            errors: AtomicU64::new(0),
42            total_processing_time: AtomicU64::new(0),
43            connections: AtomicU64::new(0),
44            custom_metrics: Arc::new(RwLock::new(HashMap::new())),
45            samples: Arc::new(RwLock::new(Vec::new())),
46        }
47    }
48    
49    /// Record a message sent
50    pub fn record_message_sent(&self) {
51        self.messages_sent.fetch_add(1, Ordering::Relaxed);
52    }
53    
54    /// Record a message received
55    pub fn record_message_received(&self) {
56        self.messages_received.fetch_add(1, Ordering::Relaxed);
57    }
58    
59    /// Record a broadcast sent
60    pub fn record_broadcast_sent(&self) {
61        self.broadcasts_sent.fetch_add(1, Ordering::Relaxed);
62    }
63    
64    /// Record an error
65    pub fn record_error(&self) {
66        self.errors.fetch_add(1, Ordering::Relaxed);
67    }
68    
69    /// Record processing time
70    pub fn record_processing_time(&self, duration: Duration) {
71        self.total_processing_time.fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
72    }
73    
74    /// Record startup
75    pub fn record_startup(&self) {
76        // Reset counters on startup
77        self.messages_sent.store(0, Ordering::Relaxed);
78        self.messages_received.store(0, Ordering::Relaxed);
79        self.broadcasts_sent.store(0, Ordering::Relaxed);
80        self.errors.store(0, Ordering::Relaxed);
81        self.total_processing_time.store(0, Ordering::Relaxed);
82    }
83    
84    /// Record shutdown
85    pub fn record_shutdown(&self) {
86        // Could log final metrics here
87    }
88    
89    /// Add or update a connection
90    pub fn add_connection(&self) {
91        self.connections.fetch_add(1, Ordering::Relaxed);
92    }
93    
94    /// Remove a connection
95    pub fn remove_connection(&self) {
96        self.connections.fetch_sub(1, Ordering::Relaxed);
97    }
98    
99    /// Set a custom metric
100    pub async fn set_custom_metric(&self, name: String, value: f64) {
101        let mut metrics = self.custom_metrics.write().await;
102        metrics.insert(name, value);
103    }
104    
105    /// Get all metrics as a map
106    pub fn get_metrics(&self) -> HashMap<String, f64> {
107        let uptime = self.start_time.elapsed().as_secs_f64();
108        let messages_sent = self.messages_sent.load(Ordering::Relaxed) as f64;
109        let messages_received = self.messages_received.load(Ordering::Relaxed) as f64;
110        let broadcasts_sent = self.broadcasts_sent.load(Ordering::Relaxed) as f64;
111        let errors = self.errors.load(Ordering::Relaxed) as f64;
112        let total_processing_time = self.total_processing_time.load(Ordering::Relaxed) as f64 / 1_000_000_000.0; // Convert to seconds
113        let connections = self.connections.load(Ordering::Relaxed) as f64;
114        
115        let mut metrics = HashMap::new();
116        metrics.insert("uptime_seconds".to_string(), uptime);
117        metrics.insert("messages_sent".to_string(), messages_sent);
118        metrics.insert("messages_received".to_string(), messages_received);
119        metrics.insert("broadcasts_sent".to_string(), broadcasts_sent);
120        metrics.insert("errors".to_string(), errors);
121        metrics.insert("total_processing_time_seconds".to_string(), total_processing_time);
122        metrics.insert("connections".to_string(), connections);
123        
124        // Calculate derived metrics
125        if uptime > 0.0 {
126            metrics.insert("messages_per_second".to_string(), (messages_sent + messages_received) / uptime);
127            metrics.insert("error_rate".to_string(), errors / (messages_sent + messages_received + 1.0));
128        }
129        
130        if messages_sent + messages_received > 0.0 {
131            metrics.insert("avg_processing_time_ms".to_string(), 
132                (total_processing_time * 1000.0) / (messages_sent + messages_received));
133        }
134        
135        metrics
136    }
137    
138    /// Get uptime in seconds
139    pub fn get_uptime(&self) -> f64 {
140        self.start_time.elapsed().as_secs_f64()
141    }
142    
143    /// Get messages sent count
144    pub fn get_messages_sent(&self) -> u64 {
145        self.messages_sent.load(Ordering::Relaxed)
146    }
147    
148    /// Get messages received count
149    pub fn get_messages_received(&self) -> u64 {
150        self.messages_received.load(Ordering::Relaxed)
151    }
152    
153    /// Get error count
154    pub fn get_errors(&self) -> u64 {
155        self.errors.load(Ordering::Relaxed)
156    }
157    
158    /// Get connection count
159    pub fn get_connections(&self) -> u64 {
160        self.connections.load(Ordering::Relaxed)
161    }
162    
163    /// Record a performance sample
164    pub async fn record_sample(&self, operation: String, duration: Duration, success: bool) {
165        let sample = PerformanceSample {
166            timestamp: SystemTime::now()
167                .duration_since(UNIX_EPOCH)
168                .unwrap_or_default()
169                .as_secs(),
170            operation,
171            duration_ms: duration.as_millis() as f64,
172            success,
173        };
174        
175        let mut samples = self.samples.write().await;
176        samples.push(sample);
177        
178        // Keep only the last 1000 samples to prevent memory growth
179        if samples.len() > 1000 {
180            let drain_count = samples.len() - 1000;
181            samples.drain(0..drain_count);
182        }
183    }
184    
185    /// Get performance statistics
186    pub async fn get_performance_stats(&self) -> PerformanceStats {
187        let samples = self.samples.read().await;
188        let metrics = self.get_metrics();
189        
190        if samples.is_empty() {
191            return PerformanceStats::default();
192        }
193        
194        let successful_samples: Vec<_> = samples.iter().filter(|s| s.success).collect();
195        let failed_samples: Vec<_> = samples.iter().filter(|s| !s.success).collect();
196        
197        let total_samples = samples.len() as f64;
198        let success_rate = successful_samples.len() as f64 / total_samples;
199        
200        let durations: Vec<f64> = successful_samples.iter().map(|s| s.duration_ms).collect();
201        let avg_duration = if !durations.is_empty() {
202            durations.iter().sum::<f64>() / durations.len() as f64
203        } else {
204            0.0
205        };
206        
207        let mut sorted_durations = durations.clone();
208        sorted_durations.sort_by(|a, b| a.partial_cmp(b).unwrap());
209        
210        let p95_duration = if !sorted_durations.is_empty() {
211            let index = (sorted_durations.len() as f64 * 0.95) as usize;
212            sorted_durations.get(index.min(sorted_durations.len() - 1)).copied().unwrap_or(0.0)
213        } else {
214            0.0
215        };
216        
217        PerformanceStats {
218            total_samples: total_samples as u64,
219            successful_samples: successful_samples.len() as u64,
220            failed_samples: failed_samples.len() as u64,
221            success_rate,
222            avg_duration_ms: avg_duration,
223            p95_duration_ms: p95_duration,
224            messages_per_second: metrics.get("messages_per_second").copied().unwrap_or(0.0),
225            error_rate: metrics.get("error_rate").copied().unwrap_or(0.0),
226            uptime_seconds: metrics.get("uptime_seconds").copied().unwrap_or(0.0),
227        }
228    }
229    
230    /// Reset all metrics
231    pub async fn reset(&self) {
232        self.messages_sent.store(0, Ordering::Relaxed);
233        self.messages_received.store(0, Ordering::Relaxed);
234        self.broadcasts_sent.store(0, Ordering::Relaxed);
235        self.errors.store(0, Ordering::Relaxed);
236        self.total_processing_time.store(0, Ordering::Relaxed);
237        self.connections.store(0, Ordering::Relaxed);
238        
239        let mut custom_metrics = self.custom_metrics.write().await;
240        custom_metrics.clear();
241        
242        let mut samples = self.samples.write().await;
243        samples.clear();
244    }
245}
246
247impl Default for MetricsCollector {
248    fn default() -> Self {
249        Self::new()
250    }
251}
252
253/// Performance sample for detailed analysis
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct PerformanceSample {
256    /// Timestamp of the sample
257    pub timestamp: u64,
258    /// Operation name
259    pub operation: String,
260    /// Duration in milliseconds
261    pub duration_ms: f64,
262    /// Whether the operation was successful
263    pub success: bool,
264}
265
266/// Aggregated performance statistics
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct PerformanceStats {
269    /// Total number of samples
270    pub total_samples: u64,
271    /// Number of successful samples
272    pub successful_samples: u64,
273    /// Number of failed samples
274    pub failed_samples: u64,
275    /// Success rate (0.0 to 1.0)
276    pub success_rate: f64,
277    /// Average duration in milliseconds
278    pub avg_duration_ms: f64,
279    /// 95th percentile duration in milliseconds
280    pub p95_duration_ms: f64,
281    /// Messages per second throughput
282    pub messages_per_second: f64,
283    /// Error rate (0.0 to 1.0)
284    pub error_rate: f64,
285    /// Uptime in seconds
286    pub uptime_seconds: f64,
287}
288
289impl Default for PerformanceStats {
290    fn default() -> Self {
291        Self {
292            total_samples: 0,
293            successful_samples: 0,
294            failed_samples: 0,
295            success_rate: 0.0,
296            avg_duration_ms: 0.0,
297            p95_duration_ms: 0.0,
298            messages_per_second: 0.0,
299            error_rate: 0.0,
300            uptime_seconds: 0.0,
301        }
302    }
303}
304
305/// Metrics exporter for different formats
306pub struct MetricsExporter;
307
308impl MetricsExporter {
309    /// Export metrics as JSON
310    pub fn to_json(stats: &PerformanceStats) -> serde_json::Result<String> {
311        serde_json::to_string_pretty(stats)
312    }
313    
314    /// Export metrics as Prometheus format
315    pub fn to_prometheus(metrics: &HashMap<String, f64>) -> String {
316        let mut output = String::new();
317        
318        for (name, value) in metrics {
319            let sanitized_name = name.replace(".", "_").replace(" ", "_");
320            output.push_str(&format!(
321                "# TYPE odin_{} gauge\nodin_{} {}\n",
322                sanitized_name, sanitized_name, value
323            ));
324        }
325        
326        output
327    }
328    
329    /// Export metrics as CSV
330    pub fn to_csv(samples: &[PerformanceSample]) -> String {
331        let mut output = String::from("timestamp,operation,duration_ms,success\n");
332        
333        for sample in samples {
334            output.push_str(&format!(
335                "{},{},{},{}\n",
336                sample.timestamp, sample.operation, sample.duration_ms, sample.success
337            ));
338        }
339        
340        output
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration as TokioDuration};

    /// Each `record_*` call bumps exactly its own counter.
    #[tokio::test]
    async fn test_metrics_collection() {
        let collector = MetricsCollector::new();

        collector.record_message_sent();
        collector.record_message_received();
        collector.record_error();

        let metrics = collector.get_metrics();
        assert_eq!(metrics.get("messages_sent"), Some(&1.0));
        assert_eq!(metrics.get("messages_received"), Some(&1.0));
        assert_eq!(metrics.get("errors"), Some(&1.0));
    }

    /// Custom metrics are stored separately and deliberately kept out of
    /// `get_metrics()` to keep that interface clean.
    #[tokio::test]
    async fn test_custom_metrics() {
        let collector = MetricsCollector::new();

        collector.set_custom_metric("custom_metric".to_string(), 42.0).await;

        let metrics = collector.get_metrics();
        assert!(!metrics.contains_key("custom_metric"));
    }

    /// Sample counts and success rate cover all samples; duration statistics
    /// cover only the successful ones.
    #[tokio::test]
    async fn test_performance_samples() {
        let collector = MetricsCollector::new();

        collector
            .record_sample("test_operation".to_string(), Duration::from_millis(100), true)
            .await;
        collector
            .record_sample("test_operation".to_string(), Duration::from_millis(200), false)
            .await;

        let stats = collector.get_performance_stats().await;
        assert_eq!(stats.total_samples, 2);
        assert_eq!(stats.successful_samples, 1);
        assert_eq!(stats.failed_samples, 1);
        assert_eq!(stats.success_rate, 0.5);
        // Only the successful 100 ms sample feeds the duration statistics.
        assert_eq!(stats.avg_duration_ms, 100.0);
        assert_eq!(stats.p95_duration_ms, 100.0);
    }

    /// `reset` brings the counters back to zero.
    #[tokio::test]
    async fn test_metrics_reset() {
        let collector = MetricsCollector::new();

        collector.record_message_sent();
        collector.record_message_received();

        let metrics_before = collector.get_metrics();
        assert_eq!(metrics_before.get("messages_sent"), Some(&1.0));
        assert_eq!(metrics_before.get("messages_received"), Some(&1.0));

        collector.reset().await;

        let metrics_after = collector.get_metrics();
        assert_eq!(metrics_after.get("messages_sent"), Some(&0.0));
        assert_eq!(metrics_after.get("messages_received"), Some(&0.0));
    }

    /// Prometheus and CSV exporters emit the expected names and rows.
    #[test]
    fn test_metrics_exporter() {
        let mut metrics = HashMap::new();
        metrics.insert("messages_sent".to_string(), 100.0);
        metrics.insert("uptime_seconds".to_string(), 3600.0);

        let prometheus = MetricsExporter::to_prometheus(&metrics);
        assert!(prometheus.contains("odin_messages_sent"));
        assert!(prometheus.contains("odin_uptime_seconds"));

        let samples = vec![PerformanceSample {
            timestamp: 1234567890,
            operation: "test".to_string(),
            duration_ms: 100.0,
            success: true,
        }];

        let csv = MetricsExporter::to_csv(&samples);
        assert!(csv.contains("timestamp,operation,duration_ms,success"));
        assert!(csv.contains("1234567890,test,100,true"));
    }

    /// Derived metrics appear once there is activity and some elapsed time.
    #[tokio::test]
    async fn test_derived_metrics() {
        let collector = MetricsCollector::new();

        for _ in 0..10 {
            collector.record_message_sent();
        }
        for _ in 0..5 {
            collector.record_message_received();
        }
        collector.record_error();

        // Wait a small amount of time so the uptime is measurably positive.
        sleep(TokioDuration::from_millis(10)).await;

        let metrics = collector.get_metrics();

        assert!(metrics.contains_key("messages_per_second"));
        assert!(metrics.contains_key("error_rate"));
        assert!(metrics.get("uptime_seconds").unwrap() > &0.0);

        // 1 error / (10 sent + 5 received + 1) = 1/16, which is exact in f64.
        let error_rate = metrics.get("error_rate").unwrap();
        assert!((*error_rate - 0.0625).abs() < f64::EPSILON);
    }
}