ant_quic/relay/
statistics.rs1use super::{
4 RelayStatistics, SessionStatistics, ConnectionStatistics,
5 AuthenticationStatistics, RateLimitingStatistics, ErrorStatistics,
6 SessionManager, RelayConnection,
7};
8use crate::endpoint::RelayStats;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12
13#[derive(Debug, Clone)]
15pub struct RelayStatisticsCollector {
16 queue_stats: Arc<Mutex<RelayStats>>,
18
19 session_managers: Arc<Mutex<Vec<Arc<SessionManager>>>>,
21
22 connections: Arc<Mutex<HashMap<u32, Arc<RelayConnection>>>>,
24
25 error_counts: Arc<Mutex<HashMap<String, u64>>>,
27
28 auth_stats: Arc<Mutex<AuthenticationStatistics>>,
30
31 rate_limit_stats: Arc<Mutex<RateLimitingStatistics>>,
33
34 start_time: Instant,
36
37 last_snapshot: Arc<Mutex<RelayStatistics>>,
39}
40
41impl RelayStatisticsCollector {
42 pub fn new() -> Self {
44 Self {
45 queue_stats: Arc::new(Mutex::new(RelayStats::default())),
46 session_managers: Arc::new(Mutex::new(Vec::new())),
47 connections: Arc::new(Mutex::new(HashMap::new())),
48 error_counts: Arc::new(Mutex::new(HashMap::new())),
49 auth_stats: Arc::new(Mutex::new(AuthenticationStatistics::default())),
50 rate_limit_stats: Arc::new(Mutex::new(RateLimitingStatistics::default())),
51 start_time: Instant::now(),
52 last_snapshot: Arc::new(Mutex::new(RelayStatistics::default())),
53 }
54 }
55
56 pub fn register_session_manager(&self, session_manager: Arc<SessionManager>) {
58 let mut managers = self.session_managers.lock().unwrap();
59 managers.push(session_manager);
60 }
61
62 pub fn register_connection(&self, session_id: u32, connection: Arc<RelayConnection>) {
64 let mut connections = self.connections.lock().unwrap();
65 connections.insert(session_id, connection);
66 }
67
68 pub fn unregister_connection(&self, session_id: u32) {
70 let mut connections = self.connections.lock().unwrap();
71 connections.remove(&session_id);
72 }
73
74 pub fn update_queue_stats(&self, stats: &RelayStats) {
76 let mut queue_stats = self.queue_stats.lock().unwrap();
77 *queue_stats = stats.clone();
78 }
79
80 pub fn record_auth_attempt(&self, success: bool, error: Option<&str>) {
82 let mut auth_stats = self.auth_stats.lock().unwrap();
83 auth_stats.total_auth_attempts += 1;
84
85 if success {
86 auth_stats.successful_auths += 1;
87 } else {
88 auth_stats.failed_auths += 1;
89
90 if let Some(error_msg) = error {
91 if error_msg.contains("replay") {
92 auth_stats.replay_attacks_blocked += 1;
93 } else if error_msg.contains("signature") {
94 auth_stats.invalid_signatures += 1;
95 } else if error_msg.contains("unknown") || error_msg.contains("trusted") {
96 auth_stats.unknown_peer_keys += 1;
97 }
98 }
99 }
100
101 let elapsed = self.start_time.elapsed().as_secs_f64();
103 if elapsed > 0.0 {
104 auth_stats.auth_rate = auth_stats.total_auth_attempts as f64 / elapsed;
105 }
106 }
107
108 pub fn record_rate_limit(&self, allowed: bool) {
110 let mut rate_stats = self.rate_limit_stats.lock().unwrap();
111 rate_stats.total_requests += 1;
112
113 if allowed {
114 rate_stats.requests_allowed += 1;
115 } else {
116 rate_stats.requests_blocked += 1;
117 }
118
119 if rate_stats.total_requests > 0 {
121 rate_stats.efficiency_percentage =
122 (rate_stats.requests_allowed as f64 / rate_stats.total_requests as f64) * 100.0;
123 }
124 }
125
126 pub fn record_error(&self, error_type: &str) {
128 let mut error_counts = self.error_counts.lock().unwrap();
129 *error_counts.entry(error_type.to_string()).or_insert(0) += 1;
130 }
131
132 pub fn collect_statistics(&self) -> RelayStatistics {
134 let session_stats = self.collect_session_statistics();
135 let connection_stats = self.collect_connection_statistics();
136 let auth_stats = self.auth_stats.lock().unwrap().clone();
137 let rate_limit_stats = self.rate_limit_stats.lock().unwrap().clone();
138 let error_stats = self.collect_error_statistics();
139
140 let stats = RelayStatistics {
141 session_stats,
142 connection_stats,
143 auth_stats,
144 rate_limit_stats,
145 error_stats,
146 };
147
148 {
150 let mut last_snapshot = self.last_snapshot.lock().unwrap();
151 *last_snapshot = stats.clone();
152 }
153
154 stats
155 }
156
157 pub fn get_last_snapshot(&self) -> RelayStatistics {
159 self.last_snapshot.lock().unwrap().clone()
160 }
161
162 fn collect_session_statistics(&self) -> SessionStatistics {
164 let managers = self.session_managers.lock().unwrap();
165 let mut total_stats = SessionStatistics::default();
166
167 for manager in managers.iter() {
168 let mgr_stats = manager.get_statistics();
169
170 total_stats.active_sessions += mgr_stats.active_sessions as u32;
172 total_stats.pending_sessions += mgr_stats.pending_sessions as u32;
173 total_stats.total_bytes_forwarded += mgr_stats.total_bytes_sent + mgr_stats.total_bytes_received;
174
175 if mgr_stats.total_sessions > 0 {
177 total_stats.total_sessions_created += mgr_stats.total_sessions as u64;
178 }
179 }
180
181 let elapsed = self.start_time.elapsed().as_secs_f64();
184 if total_stats.total_sessions_created > 0 && elapsed > 0.0 {
185 total_stats.avg_session_duration = elapsed / total_stats.total_sessions_created as f64;
186 }
187
188 total_stats
189 }
190
191 fn collect_connection_statistics(&self) -> ConnectionStatistics {
193 let connections = self.connections.lock().unwrap();
194 let mut total_stats = ConnectionStatistics::default();
195
196 total_stats.total_connections = connections.len() as u64;
197
198 for connection in connections.values() {
199 let conn_stats = connection.get_stats();
200
201 if conn_stats.is_active {
202 total_stats.active_connections += 1;
203 }
204
205 total_stats.total_bytes_sent += conn_stats.bytes_sent;
206 total_stats.total_bytes_received += conn_stats.bytes_received;
207 }
208
209 let elapsed = self.start_time.elapsed().as_secs_f64();
211 if elapsed > 0.0 {
212 let total_bytes = total_stats.total_bytes_sent + total_stats.total_bytes_received;
213 total_stats.avg_bandwidth_usage = total_bytes as f64 / elapsed;
214 }
215
216 total_stats.peak_concurrent_connections = total_stats.active_connections;
218
219 total_stats
220 }
221
222 fn collect_error_statistics(&self) -> ErrorStatistics {
224 let error_counts = self.error_counts.lock().unwrap();
225 let queue_stats = self.queue_stats.lock().unwrap();
226
227 let mut error_stats = ErrorStatistics::default();
228 error_stats.error_breakdown = error_counts.clone();
229
230 for (error_type, count) in error_counts.iter() {
232 if error_type.contains("protocol") || error_type.contains("frame") {
233 error_stats.protocol_errors += count;
234 } else if error_type.contains("resource") || error_type.contains("exhausted") {
235 error_stats.resource_exhausted += count;
236 } else if error_type.contains("session") {
237 error_stats.session_errors += count;
238 } else if error_type.contains("auth") {
239 error_stats.auth_failures += count;
240 } else if error_type.contains("network") || error_type.contains("connection") {
241 error_stats.network_errors += count;
242 } else {
243 error_stats.internal_errors += count;
244 }
245 }
246
247 error_stats.resource_exhausted += queue_stats.requests_dropped;
249 error_stats.protocol_errors += queue_stats.requests_failed;
250
251 let total_errors = error_stats.protocol_errors + error_stats.resource_exhausted
253 + error_stats.session_errors + error_stats.auth_failures
254 + error_stats.network_errors + error_stats.internal_errors;
255
256 let elapsed = self.start_time.elapsed().as_secs_f64();
257 if elapsed > 0.0 {
258 error_stats.error_rate = total_errors as f64 / elapsed;
259 }
260
261 error_stats
262 }
263
264 pub fn reset(&self) {
266 {
267 let mut queue_stats = self.queue_stats.lock().unwrap();
268 *queue_stats = RelayStats::default();
269 }
270 {
271 let mut error_counts = self.error_counts.lock().unwrap();
272 error_counts.clear();
273 }
274 {
275 let mut auth_stats = self.auth_stats.lock().unwrap();
276 *auth_stats = AuthenticationStatistics::default();
277 }
278 {
279 let mut rate_limit_stats = self.rate_limit_stats.lock().unwrap();
280 *rate_limit_stats = RateLimitingStatistics::default();
281 }
282 }
283}
284
285impl Default for RelayStatisticsCollector {
286 fn default() -> Self {
287 Self::new()
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294
295 #[test]
296 fn test_statistics_collector_creation() {
297 let collector = RelayStatisticsCollector::new();
298 let stats = collector.collect_statistics();
299
300 assert_eq!(stats.session_stats.active_sessions, 0);
302 assert_eq!(stats.connection_stats.total_connections, 0);
303 assert_eq!(stats.auth_stats.total_auth_attempts, 0);
304 assert!(stats.is_healthy());
305 }
306
307 #[test]
308 fn test_auth_tracking() {
309 let collector = RelayStatisticsCollector::new();
310
311 collector.record_auth_attempt(true, None);
313 collector.record_auth_attempt(false, Some("signature verification failed"));
314 collector.record_auth_attempt(false, Some("replay attack detected"));
315
316 let stats = collector.collect_statistics();
317 assert_eq!(stats.auth_stats.total_auth_attempts, 3);
318 assert_eq!(stats.auth_stats.successful_auths, 1);
319 assert_eq!(stats.auth_stats.failed_auths, 2);
320 assert_eq!(stats.auth_stats.invalid_signatures, 1);
321 assert_eq!(stats.auth_stats.replay_attacks_blocked, 1);
322 }
323
324 #[test]
325 fn test_rate_limiting_tracking() {
326 let collector = RelayStatisticsCollector::new();
327
328 collector.record_rate_limit(true);
330 collector.record_rate_limit(true);
331 collector.record_rate_limit(false);
332 collector.record_rate_limit(true);
333
334 let stats = collector.collect_statistics();
335 assert_eq!(stats.rate_limit_stats.total_requests, 4);
336 assert_eq!(stats.rate_limit_stats.requests_allowed, 3);
337 assert_eq!(stats.rate_limit_stats.requests_blocked, 1);
338 assert_eq!(stats.rate_limit_stats.efficiency_percentage, 75.0);
339 }
340
341 #[test]
342 fn test_error_tracking() {
343 let collector = RelayStatisticsCollector::new();
344
345 collector.record_error("protocol_error");
347 collector.record_error("resource_exhausted");
348 collector.record_error("session_timeout");
349 collector.record_error("auth_failed");
350
351 let stats = collector.collect_statistics();
352 assert_eq!(stats.error_stats.protocol_errors, 1);
353 assert_eq!(stats.error_stats.resource_exhausted, 1);
354 assert_eq!(stats.error_stats.session_errors, 1);
355 assert_eq!(stats.error_stats.auth_failures, 1);
356 assert_eq!(stats.error_stats.error_breakdown.len(), 4);
357 }
358
359 #[test]
360 fn test_success_rate_calculation() {
361 let collector = RelayStatisticsCollector::new();
362
363 collector.record_auth_attempt(true, None);
365 collector.record_auth_attempt(true, None);
366 collector.record_rate_limit(true);
367 collector.record_rate_limit(true);
368
369 collector.record_auth_attempt(false, None);
371 collector.record_error("protocol_error");
372
373 let stats = collector.collect_statistics();
374
375 let success_rate = stats.success_rate();
377 assert!(success_rate > 0.5);
378 assert!(success_rate < 1.0);
379 }
380
381 #[test]
382 fn test_reset_functionality() {
383 let collector = RelayStatisticsCollector::new();
384
385 collector.record_auth_attempt(true, None);
387 collector.record_error("test_error");
388 collector.record_rate_limit(false);
389
390 let stats_before = collector.collect_statistics();
392 assert!(stats_before.auth_stats.total_auth_attempts > 0);
393
394 collector.reset();
396 let stats_after = collector.collect_statistics();
397 assert_eq!(stats_after.auth_stats.total_auth_attempts, 0);
398 assert_eq!(stats_after.rate_limit_stats.total_requests, 0);
399 }
400}