ant_quic/relay/
statistics.rs

1//! Comprehensive relay statistics collection and aggregation.
2
3use super::{
4    RelayStatistics, SessionStatistics, ConnectionStatistics, 
5    AuthenticationStatistics, RateLimitingStatistics, ErrorStatistics,
6    SessionManager, RelayConnection,
7};
8use crate::endpoint::RelayStats;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12
13/// Comprehensive relay statistics collector that aggregates stats from all relay components
14#[derive(Debug, Clone)]
15pub struct RelayStatisticsCollector {
16    /// Basic relay queue statistics
17    queue_stats: Arc<Mutex<RelayStats>>,
18    
19    /// Session managers being tracked
20    session_managers: Arc<Mutex<Vec<Arc<SessionManager>>>>,
21    
22    /// Connection tracking
23    connections: Arc<Mutex<HashMap<u32, Arc<RelayConnection>>>>,
24    
25    /// Error tracking
26    error_counts: Arc<Mutex<HashMap<String, u64>>>,
27    
28    /// Authentication tracking
29    auth_stats: Arc<Mutex<AuthenticationStatistics>>,
30    
31    /// Rate limiting tracking
32    rate_limit_stats: Arc<Mutex<RateLimitingStatistics>>,
33    
34    /// Collection start time for rate calculations
35    start_time: Instant,
36    
37    /// Last statistics snapshot
38    last_snapshot: Arc<Mutex<RelayStatistics>>,
39}
40
41impl RelayStatisticsCollector {
42    /// Create a new statistics collector
43    pub fn new() -> Self {
44        Self {
45            queue_stats: Arc::new(Mutex::new(RelayStats::default())),
46            session_managers: Arc::new(Mutex::new(Vec::new())),
47            connections: Arc::new(Mutex::new(HashMap::new())),
48            error_counts: Arc::new(Mutex::new(HashMap::new())),
49            auth_stats: Arc::new(Mutex::new(AuthenticationStatistics::default())),
50            rate_limit_stats: Arc::new(Mutex::new(RateLimitingStatistics::default())),
51            start_time: Instant::now(),
52            last_snapshot: Arc::new(Mutex::new(RelayStatistics::default())),
53        }
54    }
55    
56    /// Register a session manager for statistics collection
57    pub fn register_session_manager(&self, session_manager: Arc<SessionManager>) {
58        let mut managers = self.session_managers.lock().unwrap();
59        managers.push(session_manager);
60    }
61    
62    /// Register a relay connection for statistics collection  
63    pub fn register_connection(&self, session_id: u32, connection: Arc<RelayConnection>) {
64        let mut connections = self.connections.lock().unwrap();
65        connections.insert(session_id, connection);
66    }
67    
68    /// Unregister a relay connection
69    pub fn unregister_connection(&self, session_id: u32) {
70        let mut connections = self.connections.lock().unwrap();
71        connections.remove(&session_id);
72    }
73    
74    /// Update queue statistics (called from endpoint)
75    pub fn update_queue_stats(&self, stats: &RelayStats) {
76        let mut queue_stats = self.queue_stats.lock().unwrap();
77        *queue_stats = stats.clone();
78    }
79    
80    /// Record an authentication attempt
81    pub fn record_auth_attempt(&self, success: bool, error: Option<&str>) {
82        let mut auth_stats = self.auth_stats.lock().unwrap();
83        auth_stats.total_auth_attempts += 1;
84        
85        if success {
86            auth_stats.successful_auths += 1;
87        } else {
88            auth_stats.failed_auths += 1;
89            
90            if let Some(error_msg) = error {
91                if error_msg.contains("replay") {
92                    auth_stats.replay_attacks_blocked += 1;
93                } else if error_msg.contains("signature") {
94                    auth_stats.invalid_signatures += 1;
95                } else if error_msg.contains("unknown") || error_msg.contains("trusted") {
96                    auth_stats.unknown_peer_keys += 1;
97                }
98            }
99        }
100        
101        // Update auth rate (auth attempts per second)
102        let elapsed = self.start_time.elapsed().as_secs_f64();
103        if elapsed > 0.0 {
104            auth_stats.auth_rate = auth_stats.total_auth_attempts as f64 / elapsed;
105        }
106    }
107    
108    /// Record a rate limiting decision
109    pub fn record_rate_limit(&self, allowed: bool) {
110        let mut rate_stats = self.rate_limit_stats.lock().unwrap();
111        rate_stats.total_requests += 1;
112        
113        if allowed {
114            rate_stats.requests_allowed += 1;
115        } else {
116            rate_stats.requests_blocked += 1;
117        }
118        
119        // Update efficiency percentage
120        if rate_stats.total_requests > 0 {
121            rate_stats.efficiency_percentage = 
122                (rate_stats.requests_allowed as f64 / rate_stats.total_requests as f64) * 100.0;
123        }
124    }
125    
126    /// Record an error occurrence
127    pub fn record_error(&self, error_type: &str) {
128        let mut error_counts = self.error_counts.lock().unwrap();
129        *error_counts.entry(error_type.to_string()).or_insert(0) += 1;
130    }
131    
132    /// Collect comprehensive statistics from all sources
133    pub fn collect_statistics(&self) -> RelayStatistics {
134        let session_stats = self.collect_session_statistics();
135        let connection_stats = self.collect_connection_statistics();
136        let auth_stats = self.auth_stats.lock().unwrap().clone();
137        let rate_limit_stats = self.rate_limit_stats.lock().unwrap().clone();
138        let error_stats = self.collect_error_statistics();
139        
140        let stats = RelayStatistics {
141            session_stats,
142            connection_stats,
143            auth_stats,
144            rate_limit_stats,
145            error_stats,
146        };
147        
148        // Update last snapshot
149        {
150            let mut last_snapshot = self.last_snapshot.lock().unwrap();
151            *last_snapshot = stats.clone();
152        }
153        
154        stats
155    }
156    
157    /// Get the last collected statistics snapshot
158    pub fn get_last_snapshot(&self) -> RelayStatistics {
159        self.last_snapshot.lock().unwrap().clone()
160    }
161    
162    /// Collect session statistics from all registered session managers
163    fn collect_session_statistics(&self) -> SessionStatistics {
164        let managers = self.session_managers.lock().unwrap();
165        let mut total_stats = SessionStatistics::default();
166        
167        for manager in managers.iter() {
168            let mgr_stats = manager.get_statistics();
169            
170            // Aggregate session counts
171            total_stats.active_sessions += mgr_stats.active_sessions as u32;
172            total_stats.pending_sessions += mgr_stats.pending_sessions as u32;
173            total_stats.total_bytes_forwarded += mgr_stats.total_bytes_sent + mgr_stats.total_bytes_received;
174            
175            // For derived stats, we take the maximum or average as appropriate
176            if mgr_stats.total_sessions > 0 {
177                total_stats.total_sessions_created += mgr_stats.total_sessions as u64;
178            }
179        }
180        
181        // Calculate average session duration if we have historical data
182        // This would need to be tracked over time in a real implementation
183        let elapsed = self.start_time.elapsed().as_secs_f64();
184        if total_stats.total_sessions_created > 0 && elapsed > 0.0 {
185            total_stats.avg_session_duration = elapsed / total_stats.total_sessions_created as f64;
186        }
187        
188        total_stats
189    }
190    
191    /// Collect connection statistics from all registered connections
192    fn collect_connection_statistics(&self) -> ConnectionStatistics {
193        let connections = self.connections.lock().unwrap();
194        let mut total_stats = ConnectionStatistics::default();
195        
196        total_stats.total_connections = connections.len() as u64;
197        
198        for connection in connections.values() {
199            let conn_stats = connection.get_stats();
200            
201            if conn_stats.is_active {
202                total_stats.active_connections += 1;
203            }
204            
205            total_stats.total_bytes_sent += conn_stats.bytes_sent;
206            total_stats.total_bytes_received += conn_stats.bytes_received;
207        }
208        
209        // Calculate average bandwidth usage
210        let elapsed = self.start_time.elapsed().as_secs_f64();
211        if elapsed > 0.0 {
212            let total_bytes = total_stats.total_bytes_sent + total_stats.total_bytes_received;
213            total_stats.avg_bandwidth_usage = total_bytes as f64 / elapsed;
214        }
215        
216        // Peak concurrent connections would need to be tracked over time
217        total_stats.peak_concurrent_connections = total_stats.active_connections;
218        
219        total_stats
220    }
221    
222    /// Collect error statistics
223    fn collect_error_statistics(&self) -> ErrorStatistics {
224        let error_counts = self.error_counts.lock().unwrap();
225        let queue_stats = self.queue_stats.lock().unwrap();
226        
227        let mut error_stats = ErrorStatistics::default();
228        error_stats.error_breakdown = error_counts.clone();
229        
230        // Categorize errors
231        for (error_type, count) in error_counts.iter() {
232            if error_type.contains("protocol") || error_type.contains("frame") {
233                error_stats.protocol_errors += count;
234            } else if error_type.contains("resource") || error_type.contains("exhausted") {
235                error_stats.resource_exhausted += count;
236            } else if error_type.contains("session") {
237                error_stats.session_errors += count;
238            } else if error_type.contains("auth") {
239                error_stats.auth_failures += count;
240            } else if error_type.contains("network") || error_type.contains("connection") {
241                error_stats.network_errors += count;
242            } else {
243                error_stats.internal_errors += count;
244            }
245        }
246        
247        // Add queue-related failures
248        error_stats.resource_exhausted += queue_stats.requests_dropped;
249        error_stats.protocol_errors += queue_stats.requests_failed;
250        
251        // Calculate error rate
252        let total_errors = error_stats.protocol_errors + error_stats.resource_exhausted 
253            + error_stats.session_errors + error_stats.auth_failures 
254            + error_stats.network_errors + error_stats.internal_errors;
255            
256        let elapsed = self.start_time.elapsed().as_secs_f64();
257        if elapsed > 0.0 {
258            error_stats.error_rate = total_errors as f64 / elapsed;
259        }
260        
261        error_stats
262    }
263    
264    /// Reset all statistics (useful for testing)
265    pub fn reset(&self) {
266        {
267            let mut queue_stats = self.queue_stats.lock().unwrap();
268            *queue_stats = RelayStats::default();
269        }
270        {
271            let mut error_counts = self.error_counts.lock().unwrap();
272            error_counts.clear();
273        }
274        {
275            let mut auth_stats = self.auth_stats.lock().unwrap();
276            *auth_stats = AuthenticationStatistics::default();
277        }
278        {
279            let mut rate_limit_stats = self.rate_limit_stats.lock().unwrap();
280            *rate_limit_stats = RateLimitingStatistics::default();
281        }
282    }
283}
284
285impl Default for RelayStatisticsCollector {
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built collector reports zeroed, healthy statistics.
    #[test]
    fn test_statistics_collector_creation() {
        let snapshot = RelayStatisticsCollector::new().collect_statistics();

        assert_eq!(snapshot.session_stats.active_sessions, 0);
        assert_eq!(snapshot.connection_stats.total_connections, 0);
        assert_eq!(snapshot.auth_stats.total_auth_attempts, 0);
        assert!(snapshot.is_healthy());
    }

    /// Authentication attempts are counted and failures classified by message.
    #[test]
    fn test_auth_tracking() {
        let collector = RelayStatisticsCollector::new();

        let attempts = [
            (true, None),
            (false, Some("signature verification failed")),
            (false, Some("replay attack detected")),
        ];
        for (success, error) in attempts {
            collector.record_auth_attempt(success, error);
        }

        let auth = collector.collect_statistics().auth_stats;
        assert_eq!(auth.total_auth_attempts, 3);
        assert_eq!(auth.successful_auths, 1);
        assert_eq!(auth.failed_auths, 2);
        assert_eq!(auth.invalid_signatures, 1);
        assert_eq!(auth.replay_attacks_blocked, 1);
    }

    /// Rate limiting decisions feed the counters and the efficiency percentage.
    #[test]
    fn test_rate_limiting_tracking() {
        let collector = RelayStatisticsCollector::new();

        for allowed in [true, true, false, true] {
            collector.record_rate_limit(allowed);
        }

        let rate_limit = collector.collect_statistics().rate_limit_stats;
        assert_eq!(rate_limit.total_requests, 4);
        assert_eq!(rate_limit.requests_allowed, 3);
        assert_eq!(rate_limit.requests_blocked, 1);
        assert_eq!(rate_limit.efficiency_percentage, 75.0);
    }

    /// Recorded error types land in the matching category and in the breakdown.
    #[test]
    fn test_error_tracking() {
        let collector = RelayStatisticsCollector::new();

        for error_type in [
            "protocol_error",
            "resource_exhausted",
            "session_timeout",
            "auth_failed",
        ] {
            collector.record_error(error_type);
        }

        let errors = collector.collect_statistics().error_stats;
        assert_eq!(errors.protocol_errors, 1);
        assert_eq!(errors.resource_exhausted, 1);
        assert_eq!(errors.session_errors, 1);
        assert_eq!(errors.auth_failures, 1);
        assert_eq!(errors.error_breakdown.len(), 4);
    }

    /// A mix of successes and failures gives a success rate strictly between 0.5 and 1.
    #[test]
    fn test_success_rate_calculation() {
        let collector = RelayStatisticsCollector::new();

        // Successful operations.
        for _ in 0..2 {
            collector.record_auth_attempt(true, None);
        }
        for _ in 0..2 {
            collector.record_rate_limit(true);
        }

        // Failures.
        collector.record_auth_attempt(false, None);
        collector.record_error("protocol_error");

        let rate = collector.collect_statistics().success_rate();
        assert!(rate > 0.5);
        assert!(rate < 1.0);
    }

    /// `reset` brings the authentication and rate limiting counters back to zero.
    #[test]
    fn test_reset_functionality() {
        let collector = RelayStatisticsCollector::new();

        // Seed every kind of counter.
        collector.record_auth_attempt(true, None);
        collector.record_error("test_error");
        collector.record_rate_limit(false);

        let before = collector.collect_statistics();
        assert!(before.auth_stats.total_auth_attempts > 0);

        collector.reset();

        let after = collector.collect_statistics();
        assert_eq!(after.auth_stats.total_auth_attempts, 0);
        assert_eq!(after.rate_limit_stats.total_requests, 0);
    }
}