ant_quic/relay/
statistics.rs

// Copyright 2024 Saorsa Labs Ltd.
//
// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
//
// Full details available at https://saorsalabs.com/licenses

//! Comprehensive relay statistics collection and aggregation.
//!
//! This module provides statistics collection for the MASQUE relay infrastructure.
//! It tracks authentication, rate limiting, errors, session and connection counters,
//! and relay queue statistics.

use super::{
    AuthenticationStatistics, ConnectionStatistics, ErrorStatistics, RateLimitingStatistics,
    RelayStatistics, SessionStatistics,
};
use crate::endpoint::RelayStats;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::Instant;
22
23/// Comprehensive relay statistics collector that aggregates stats from all relay components
24#[derive(Debug)]
25pub struct RelayStatisticsCollector {
26    /// Basic relay queue statistics
27    queue_stats: Arc<Mutex<RelayStats>>,
28
29    /// Error tracking
30    error_counts: Arc<Mutex<HashMap<String, u64>>>,
31
32    /// Authentication tracking
33    auth_stats: Arc<Mutex<AuthenticationStatistics>>,
34
35    /// Rate limiting tracking
36    rate_limit_stats: Arc<Mutex<RateLimitingStatistics>>,
37
38    /// Collection start time for rate calculations
39    start_time: Instant,
40
41    /// Last statistics snapshot
42    last_snapshot: Arc<Mutex<RelayStatistics>>,
43
44    /// Active sessions count (updated externally)
45    active_sessions: AtomicU32,
46
47    /// Total sessions created (updated externally)
48    total_sessions: AtomicU64,
49
50    /// Active connections count (updated externally)
51    active_connections: AtomicU32,
52
53    /// Total bytes sent (updated externally)
54    total_bytes_sent: AtomicU64,
55
56    /// Total bytes received (updated externally)
57    total_bytes_received: AtomicU64,
58}
59
60impl Clone for RelayStatisticsCollector {
61    fn clone(&self) -> Self {
62        Self {
63            queue_stats: Arc::clone(&self.queue_stats),
64            error_counts: Arc::clone(&self.error_counts),
65            auth_stats: Arc::clone(&self.auth_stats),
66            rate_limit_stats: Arc::clone(&self.rate_limit_stats),
67            start_time: self.start_time,
68            last_snapshot: Arc::clone(&self.last_snapshot),
69            active_sessions: AtomicU32::new(self.active_sessions.load(Ordering::Relaxed)),
70            total_sessions: AtomicU64::new(self.total_sessions.load(Ordering::Relaxed)),
71            active_connections: AtomicU32::new(self.active_connections.load(Ordering::Relaxed)),
72            total_bytes_sent: AtomicU64::new(self.total_bytes_sent.load(Ordering::Relaxed)),
73            total_bytes_received: AtomicU64::new(self.total_bytes_received.load(Ordering::Relaxed)),
74        }
75    }
76}
77
78impl RelayStatisticsCollector {
79    /// Create a new statistics collector
80    pub fn new() -> Self {
81        Self {
82            queue_stats: Arc::new(Mutex::new(RelayStats::default())),
83            error_counts: Arc::new(Mutex::new(HashMap::new())),
84            auth_stats: Arc::new(Mutex::new(AuthenticationStatistics::default())),
85            rate_limit_stats: Arc::new(Mutex::new(RateLimitingStatistics::default())),
86            start_time: Instant::now(),
87            last_snapshot: Arc::new(Mutex::new(RelayStatistics::default())),
88            active_sessions: AtomicU32::new(0),
89            total_sessions: AtomicU64::new(0),
90            active_connections: AtomicU32::new(0),
91            total_bytes_sent: AtomicU64::new(0),
92            total_bytes_received: AtomicU64::new(0),
93        }
94    }
95
96    /// Update session count (called by MASQUE relay server)
97    pub fn update_session_count(&self, active: u32, total: u64) {
98        self.active_sessions.store(active, Ordering::Relaxed);
99        self.total_sessions.store(total, Ordering::Relaxed);
100    }
101
102    /// Update connection count (called by MASQUE relay components)
103    pub fn update_connection_count(&self, active: u32) {
104        self.active_connections.store(active, Ordering::Relaxed);
105    }
106
107    /// Update bytes transferred (called by MASQUE relay components)
108    pub fn add_bytes_transferred(&self, sent: u64, received: u64) {
109        self.total_bytes_sent.fetch_add(sent, Ordering::Relaxed);
110        self.total_bytes_received
111            .fetch_add(received, Ordering::Relaxed);
112    }
113
114    /// Update queue statistics (called from endpoint)
115    #[allow(clippy::unwrap_used)]
116    pub fn update_queue_stats(&self, stats: &RelayStats) {
117        let mut queue_stats = self.queue_stats.lock().unwrap();
118        *queue_stats = stats.clone();
119    }
120
121    /// Record an authentication attempt
122    #[allow(clippy::unwrap_used)]
123    pub fn record_auth_attempt(&self, success: bool, error: Option<&str>) {
124        let mut auth_stats = self.auth_stats.lock().unwrap();
125        auth_stats.total_auth_attempts += 1;
126
127        if success {
128            auth_stats.successful_auths += 1;
129        } else {
130            auth_stats.failed_auths += 1;
131
132            if let Some(error_msg) = error {
133                if error_msg.contains("replay") {
134                    auth_stats.replay_attacks_blocked += 1;
135                } else if error_msg.contains("signature") {
136                    auth_stats.invalid_signatures += 1;
137                } else if error_msg.contains("unknown") || error_msg.contains("trusted") {
138                    auth_stats.unknown_peer_keys += 1;
139                }
140            }
141        }
142
143        // Update auth rate (auth attempts per second)
144        let elapsed = self.start_time.elapsed().as_secs_f64();
145        if elapsed > 0.0 {
146            auth_stats.auth_rate = auth_stats.total_auth_attempts as f64 / elapsed;
147        }
148    }
149
150    /// Record a rate limiting decision
151    #[allow(clippy::unwrap_used)]
152    pub fn record_rate_limit(&self, allowed: bool) {
153        let mut rate_stats = self.rate_limit_stats.lock().unwrap();
154        rate_stats.total_requests += 1;
155
156        if allowed {
157            rate_stats.requests_allowed += 1;
158        } else {
159            rate_stats.requests_blocked += 1;
160        }
161
162        // Update efficiency percentage
163        if rate_stats.total_requests > 0 {
164            rate_stats.efficiency_percentage =
165                (rate_stats.requests_allowed as f64 / rate_stats.total_requests as f64) * 100.0;
166        }
167    }
168
169    /// Record an error occurrence
170    #[allow(clippy::unwrap_used)]
171    pub fn record_error(&self, error_type: &str) {
172        let mut error_counts = self.error_counts.lock().unwrap();
173        *error_counts.entry(error_type.to_string()).or_insert(0) += 1;
174    }
175
176    /// Collect comprehensive statistics from all sources
177    #[allow(clippy::unwrap_used)]
178    pub fn collect_statistics(&self) -> RelayStatistics {
179        let session_stats = self.collect_session_statistics();
180        let connection_stats = self.collect_connection_statistics();
181        let auth_stats = self.auth_stats.lock().unwrap().clone();
182        let rate_limit_stats = self.rate_limit_stats.lock().unwrap().clone();
183        let error_stats = self.collect_error_statistics();
184
185        let stats = RelayStatistics {
186            session_stats,
187            connection_stats,
188            auth_stats,
189            rate_limit_stats,
190            error_stats,
191        };
192
193        // Update last snapshot
194        {
195            let mut last_snapshot = self.last_snapshot.lock().unwrap();
196            *last_snapshot = stats.clone();
197        }
198
199        stats
200    }
201
202    /// Get the last collected statistics snapshot
203    #[allow(clippy::unwrap_used)]
204    pub fn get_last_snapshot(&self) -> RelayStatistics {
205        self.last_snapshot.lock().unwrap().clone()
206    }
207
208    /// Collect session statistics from atomic counters
209    fn collect_session_statistics(&self) -> SessionStatistics {
210        let active_sessions = self.active_sessions.load(Ordering::Relaxed);
211        let total_sessions = self.total_sessions.load(Ordering::Relaxed);
212        let total_bytes_sent = self.total_bytes_sent.load(Ordering::Relaxed);
213        let total_bytes_received = self.total_bytes_received.load(Ordering::Relaxed);
214
215        let mut stats = SessionStatistics::default();
216        stats.active_sessions = active_sessions;
217        stats.total_sessions_created = total_sessions;
218        stats.total_bytes_forwarded = total_bytes_sent + total_bytes_received;
219
220        // Calculate average session duration if we have historical data
221        let elapsed = self.start_time.elapsed().as_secs_f64();
222        if total_sessions > 0 && elapsed > 0.0 {
223            stats.avg_session_duration = elapsed / total_sessions as f64;
224        }
225
226        stats
227    }
228
229    /// Collect connection statistics from atomic counters
230    fn collect_connection_statistics(&self) -> ConnectionStatistics {
231        let active_connections = self.active_connections.load(Ordering::Relaxed);
232        let total_bytes_sent = self.total_bytes_sent.load(Ordering::Relaxed);
233        let total_bytes_received = self.total_bytes_received.load(Ordering::Relaxed);
234
235        let mut stats = ConnectionStatistics::default();
236        stats.active_connections = active_connections;
237        stats.total_bytes_sent = total_bytes_sent;
238        stats.total_bytes_received = total_bytes_received;
239
240        // Calculate average bandwidth usage
241        let elapsed = self.start_time.elapsed().as_secs_f64();
242        if elapsed > 0.0 {
243            let total_bytes = total_bytes_sent + total_bytes_received;
244            stats.avg_bandwidth_usage = total_bytes as f64 / elapsed;
245        }
246
247        // Peak concurrent connections would need to be tracked over time
248        stats.peak_concurrent_connections = active_connections;
249
250        stats
251    }
252
253    /// Collect error statistics
254    #[allow(clippy::unwrap_used)]
255    fn collect_error_statistics(&self) -> ErrorStatistics {
256        let error_counts = self.error_counts.lock().unwrap();
257        let queue_stats = self.queue_stats.lock().unwrap();
258
259        let mut error_stats = ErrorStatistics::default();
260        error_stats.error_breakdown = error_counts.clone();
261
262        // Categorize errors
263        for (error_type, count) in error_counts.iter() {
264            if error_type.contains("protocol") || error_type.contains("frame") {
265                error_stats.protocol_errors += count;
266            } else if error_type.contains("resource") || error_type.contains("exhausted") {
267                error_stats.resource_exhausted += count;
268            } else if error_type.contains("session") {
269                error_stats.session_errors += count;
270            } else if error_type.contains("auth") {
271                error_stats.auth_failures += count;
272            } else if error_type.contains("network") || error_type.contains("connection") {
273                error_stats.network_errors += count;
274            } else {
275                error_stats.internal_errors += count;
276            }
277        }
278
279        // Add queue-related failures
280        error_stats.resource_exhausted += queue_stats.requests_dropped;
281        error_stats.protocol_errors += queue_stats.requests_failed;
282
283        // Calculate error rate
284        let total_errors = error_stats.protocol_errors
285            + error_stats.resource_exhausted
286            + error_stats.session_errors
287            + error_stats.auth_failures
288            + error_stats.network_errors
289            + error_stats.internal_errors;
290
291        let elapsed = self.start_time.elapsed().as_secs_f64();
292        if elapsed > 0.0 {
293            error_stats.error_rate = total_errors as f64 / elapsed;
294        }
295
296        error_stats
297    }
298
299    /// Reset all statistics (useful for testing)
300    #[allow(clippy::unwrap_used)]
301    pub fn reset(&self) {
302        {
303            let mut queue_stats = self.queue_stats.lock().unwrap();
304            *queue_stats = RelayStats::default();
305        }
306        {
307            let mut error_counts = self.error_counts.lock().unwrap();
308            error_counts.clear();
309        }
310        {
311            let mut auth_stats = self.auth_stats.lock().unwrap();
312            *auth_stats = AuthenticationStatistics::default();
313        }
314        {
315            let mut rate_limit_stats = self.rate_limit_stats.lock().unwrap();
316            *rate_limit_stats = RateLimitingStatistics::default();
317        }
318
319        self.active_sessions.store(0, Ordering::Relaxed);
320        self.total_sessions.store(0, Ordering::Relaxed);
321        self.active_connections.store(0, Ordering::Relaxed);
322        self.total_bytes_sent.store(0, Ordering::Relaxed);
323        self.total_bytes_received.store(0, Ordering::Relaxed);
324    }
325}
326
327impl Default for RelayStatisticsCollector {
328    fn default() -> Self {
329        Self::new()
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_statistics_collector_creation() {
        // A freshly built collector reports zeroed counters and a healthy state.
        let snapshot = RelayStatisticsCollector::new().collect_statistics();

        assert_eq!(snapshot.session_stats.active_sessions, 0);
        assert_eq!(snapshot.connection_stats.total_connections, 0);
        assert_eq!(snapshot.auth_stats.total_auth_attempts, 0);
        assert!(snapshot.is_healthy());
    }

    #[test]
    fn test_auth_tracking() {
        let collector = RelayStatisticsCollector::new();

        // One success, then two failures whose messages select different sub-counters.
        let attempts = [
            (true, None),
            (false, Some("signature verification failed")),
            (false, Some("replay attack detected")),
        ];
        for &(succeeded, reason) in &attempts {
            collector.record_auth_attempt(succeeded, reason);
        }

        let snapshot = collector.collect_statistics();
        let auth = &snapshot.auth_stats;
        assert_eq!(auth.total_auth_attempts, 3);
        assert_eq!(auth.successful_auths, 1);
        assert_eq!(auth.failed_auths, 2);
        assert_eq!(auth.invalid_signatures, 1);
        assert_eq!(auth.replay_attacks_blocked, 1);
    }

    #[test]
    fn test_rate_limiting_tracking() {
        let collector = RelayStatisticsCollector::new();

        // Three of four requests are allowed.
        for allowed in [true, true, false, true].iter().copied() {
            collector.record_rate_limit(allowed);
        }

        let snapshot = collector.collect_statistics();
        let limits = &snapshot.rate_limit_stats;
        assert_eq!(limits.total_requests, 4);
        assert_eq!(limits.requests_allowed, 3);
        assert_eq!(limits.requests_blocked, 1);
        assert_eq!(limits.efficiency_percentage, 75.0);
    }

    #[test]
    fn test_error_tracking() {
        let collector = RelayStatisticsCollector::new();

        // One error string per category that the collector recognises by substring.
        let error_kinds = [
            "protocol_error",
            "resource_exhausted",
            "session_timeout",
            "auth_failed",
        ];
        for kind in &error_kinds {
            collector.record_error(kind);
        }

        let snapshot = collector.collect_statistics();
        let errors = &snapshot.error_stats;
        assert_eq!(errors.protocol_errors, 1);
        assert_eq!(errors.resource_exhausted, 1);
        assert_eq!(errors.session_errors, 1);
        assert_eq!(errors.auth_failures, 1);
        assert_eq!(errors.error_breakdown.len(), 4);
    }

    #[test]
    fn test_session_count_updates() {
        let collector = RelayStatisticsCollector::new();
        collector.update_session_count(5, 100);

        let sessions = collector.collect_statistics().session_stats;
        assert_eq!(sessions.active_sessions, 5);
        assert_eq!(sessions.total_sessions_created, 100);
    }

    #[test]
    fn test_bytes_transferred() {
        let collector = RelayStatisticsCollector::new();

        // Two transfers; totals must accumulate across calls.
        for &(sent, received) in &[(1000, 2000), (500, 500)] {
            collector.add_bytes_transferred(sent, received);
        }

        let snapshot = collector.collect_statistics();
        assert_eq!(snapshot.connection_stats.total_bytes_sent, 1500);
        assert_eq!(snapshot.connection_stats.total_bytes_received, 2500);
        assert_eq!(snapshot.session_stats.total_bytes_forwarded, 4000);
    }

    #[test]
    fn test_success_rate_calculation() {
        let collector = RelayStatisticsCollector::new();

        // Successes outnumber failures, so the rate should land strictly between
        // one half and one.
        for _ in 0..4 {
            collector.record_auth_attempt(true, None);
        }

        // Allowed rate-limit decisions are mixed in as well. They are not expected to
        // contribute to `success_rate` (see `RelayStatistics::success_rate`).
        for _ in 0..2 {
            collector.record_rate_limit(true);
        }

        collector.record_auth_attempt(false, None);
        collector.record_error("protocol_error");

        let snapshot = collector.collect_statistics();
        let rate = snapshot.success_rate();
        assert!(rate > 0.5);
        assert!(rate < 1.0);
    }

    #[test]
    fn test_reset_functionality() {
        let collector = RelayStatisticsCollector::new();

        // Touch every kind of counter so the reset has something to clear.
        collector.record_auth_attempt(true, None);
        collector.record_error("test_error");
        collector.record_rate_limit(false);
        collector.update_session_count(10, 50);
        collector.add_bytes_transferred(1000, 2000);

        let before = collector.collect_statistics();
        assert!(before.auth_stats.total_auth_attempts > 0);
        assert_eq!(before.session_stats.active_sessions, 10);

        collector.reset();

        let after = collector.collect_statistics();
        assert_eq!(after.auth_stats.total_auth_attempts, 0);
        assert_eq!(after.rate_limit_stats.total_requests, 0);
        assert_eq!(after.session_stats.active_sessions, 0);
        assert_eq!(after.connection_stats.total_bytes_sent, 0);
    }
}