1use super::{
14 AuthenticationStatistics, ConnectionStatistics, ErrorStatistics, RateLimitingStatistics,
15 RelayStatistics, SessionStatistics,
16};
17use crate::endpoint::RelayStats;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
20use std::sync::{Arc, Mutex};
21use std::time::Instant;
22
23#[derive(Debug)]
25pub struct RelayStatisticsCollector {
26 queue_stats: Arc<Mutex<RelayStats>>,
28
29 error_counts: Arc<Mutex<HashMap<String, u64>>>,
31
32 auth_stats: Arc<Mutex<AuthenticationStatistics>>,
34
35 rate_limit_stats: Arc<Mutex<RateLimitingStatistics>>,
37
38 start_time: Instant,
40
41 last_snapshot: Arc<Mutex<RelayStatistics>>,
43
44 active_sessions: AtomicU32,
46
47 total_sessions: AtomicU64,
49
50 active_connections: AtomicU32,
52
53 total_bytes_sent: AtomicU64,
55
56 total_bytes_received: AtomicU64,
58}
59
60impl Clone for RelayStatisticsCollector {
61 fn clone(&self) -> Self {
62 Self {
63 queue_stats: Arc::clone(&self.queue_stats),
64 error_counts: Arc::clone(&self.error_counts),
65 auth_stats: Arc::clone(&self.auth_stats),
66 rate_limit_stats: Arc::clone(&self.rate_limit_stats),
67 start_time: self.start_time,
68 last_snapshot: Arc::clone(&self.last_snapshot),
69 active_sessions: AtomicU32::new(self.active_sessions.load(Ordering::Relaxed)),
70 total_sessions: AtomicU64::new(self.total_sessions.load(Ordering::Relaxed)),
71 active_connections: AtomicU32::new(self.active_connections.load(Ordering::Relaxed)),
72 total_bytes_sent: AtomicU64::new(self.total_bytes_sent.load(Ordering::Relaxed)),
73 total_bytes_received: AtomicU64::new(self.total_bytes_received.load(Ordering::Relaxed)),
74 }
75 }
76}
77
78impl RelayStatisticsCollector {
79 pub fn new() -> Self {
81 Self {
82 queue_stats: Arc::new(Mutex::new(RelayStats::default())),
83 error_counts: Arc::new(Mutex::new(HashMap::new())),
84 auth_stats: Arc::new(Mutex::new(AuthenticationStatistics::default())),
85 rate_limit_stats: Arc::new(Mutex::new(RateLimitingStatistics::default())),
86 start_time: Instant::now(),
87 last_snapshot: Arc::new(Mutex::new(RelayStatistics::default())),
88 active_sessions: AtomicU32::new(0),
89 total_sessions: AtomicU64::new(0),
90 active_connections: AtomicU32::new(0),
91 total_bytes_sent: AtomicU64::new(0),
92 total_bytes_received: AtomicU64::new(0),
93 }
94 }
95
96 pub fn update_session_count(&self, active: u32, total: u64) {
98 self.active_sessions.store(active, Ordering::Relaxed);
99 self.total_sessions.store(total, Ordering::Relaxed);
100 }
101
102 pub fn update_connection_count(&self, active: u32) {
104 self.active_connections.store(active, Ordering::Relaxed);
105 }
106
107 pub fn add_bytes_transferred(&self, sent: u64, received: u64) {
109 self.total_bytes_sent.fetch_add(sent, Ordering::Relaxed);
110 self.total_bytes_received
111 .fetch_add(received, Ordering::Relaxed);
112 }
113
114 #[allow(clippy::unwrap_used)]
116 pub fn update_queue_stats(&self, stats: &RelayStats) {
117 let mut queue_stats = self.queue_stats.lock().unwrap();
118 *queue_stats = stats.clone();
119 }
120
121 #[allow(clippy::unwrap_used)]
123 pub fn record_auth_attempt(&self, success: bool, error: Option<&str>) {
124 let mut auth_stats = self.auth_stats.lock().unwrap();
125 auth_stats.total_auth_attempts += 1;
126
127 if success {
128 auth_stats.successful_auths += 1;
129 } else {
130 auth_stats.failed_auths += 1;
131
132 if let Some(error_msg) = error {
133 if error_msg.contains("replay") {
134 auth_stats.replay_attacks_blocked += 1;
135 } else if error_msg.contains("signature") {
136 auth_stats.invalid_signatures += 1;
137 } else if error_msg.contains("unknown") || error_msg.contains("trusted") {
138 auth_stats.unknown_peer_keys += 1;
139 }
140 }
141 }
142
143 let elapsed = self.start_time.elapsed().as_secs_f64();
145 if elapsed > 0.0 {
146 auth_stats.auth_rate = auth_stats.total_auth_attempts as f64 / elapsed;
147 }
148 }
149
150 #[allow(clippy::unwrap_used)]
152 pub fn record_rate_limit(&self, allowed: bool) {
153 let mut rate_stats = self.rate_limit_stats.lock().unwrap();
154 rate_stats.total_requests += 1;
155
156 if allowed {
157 rate_stats.requests_allowed += 1;
158 } else {
159 rate_stats.requests_blocked += 1;
160 }
161
162 if rate_stats.total_requests > 0 {
164 rate_stats.efficiency_percentage =
165 (rate_stats.requests_allowed as f64 / rate_stats.total_requests as f64) * 100.0;
166 }
167 }
168
169 #[allow(clippy::unwrap_used)]
171 pub fn record_error(&self, error_type: &str) {
172 let mut error_counts = self.error_counts.lock().unwrap();
173 *error_counts.entry(error_type.to_string()).or_insert(0) += 1;
174 }
175
176 #[allow(clippy::unwrap_used)]
178 pub fn collect_statistics(&self) -> RelayStatistics {
179 let session_stats = self.collect_session_statistics();
180 let connection_stats = self.collect_connection_statistics();
181 let auth_stats = self.auth_stats.lock().unwrap().clone();
182 let rate_limit_stats = self.rate_limit_stats.lock().unwrap().clone();
183 let error_stats = self.collect_error_statistics();
184
185 let stats = RelayStatistics {
186 session_stats,
187 connection_stats,
188 auth_stats,
189 rate_limit_stats,
190 error_stats,
191 };
192
193 {
195 let mut last_snapshot = self.last_snapshot.lock().unwrap();
196 *last_snapshot = stats.clone();
197 }
198
199 stats
200 }
201
202 #[allow(clippy::unwrap_used)]
204 pub fn get_last_snapshot(&self) -> RelayStatistics {
205 self.last_snapshot.lock().unwrap().clone()
206 }
207
208 fn collect_session_statistics(&self) -> SessionStatistics {
210 let active_sessions = self.active_sessions.load(Ordering::Relaxed);
211 let total_sessions = self.total_sessions.load(Ordering::Relaxed);
212 let total_bytes_sent = self.total_bytes_sent.load(Ordering::Relaxed);
213 let total_bytes_received = self.total_bytes_received.load(Ordering::Relaxed);
214
215 let mut stats = SessionStatistics::default();
216 stats.active_sessions = active_sessions;
217 stats.total_sessions_created = total_sessions;
218 stats.total_bytes_forwarded = total_bytes_sent + total_bytes_received;
219
220 let elapsed = self.start_time.elapsed().as_secs_f64();
222 if total_sessions > 0 && elapsed > 0.0 {
223 stats.avg_session_duration = elapsed / total_sessions as f64;
224 }
225
226 stats
227 }
228
229 fn collect_connection_statistics(&self) -> ConnectionStatistics {
231 let active_connections = self.active_connections.load(Ordering::Relaxed);
232 let total_bytes_sent = self.total_bytes_sent.load(Ordering::Relaxed);
233 let total_bytes_received = self.total_bytes_received.load(Ordering::Relaxed);
234
235 let mut stats = ConnectionStatistics::default();
236 stats.active_connections = active_connections;
237 stats.total_bytes_sent = total_bytes_sent;
238 stats.total_bytes_received = total_bytes_received;
239
240 let elapsed = self.start_time.elapsed().as_secs_f64();
242 if elapsed > 0.0 {
243 let total_bytes = total_bytes_sent + total_bytes_received;
244 stats.avg_bandwidth_usage = total_bytes as f64 / elapsed;
245 }
246
247 stats.peak_concurrent_connections = active_connections;
249
250 stats
251 }
252
253 #[allow(clippy::unwrap_used)]
255 fn collect_error_statistics(&self) -> ErrorStatistics {
256 let error_counts = self.error_counts.lock().unwrap();
257 let queue_stats = self.queue_stats.lock().unwrap();
258
259 let mut error_stats = ErrorStatistics::default();
260 error_stats.error_breakdown = error_counts.clone();
261
262 for (error_type, count) in error_counts.iter() {
264 if error_type.contains("protocol") || error_type.contains("frame") {
265 error_stats.protocol_errors += count;
266 } else if error_type.contains("resource") || error_type.contains("exhausted") {
267 error_stats.resource_exhausted += count;
268 } else if error_type.contains("session") {
269 error_stats.session_errors += count;
270 } else if error_type.contains("auth") {
271 error_stats.auth_failures += count;
272 } else if error_type.contains("network") || error_type.contains("connection") {
273 error_stats.network_errors += count;
274 } else {
275 error_stats.internal_errors += count;
276 }
277 }
278
279 error_stats.resource_exhausted += queue_stats.requests_dropped;
281 error_stats.protocol_errors += queue_stats.requests_failed;
282
283 let total_errors = error_stats.protocol_errors
285 + error_stats.resource_exhausted
286 + error_stats.session_errors
287 + error_stats.auth_failures
288 + error_stats.network_errors
289 + error_stats.internal_errors;
290
291 let elapsed = self.start_time.elapsed().as_secs_f64();
292 if elapsed > 0.0 {
293 error_stats.error_rate = total_errors as f64 / elapsed;
294 }
295
296 error_stats
297 }
298
299 #[allow(clippy::unwrap_used)]
301 pub fn reset(&self) {
302 {
303 let mut queue_stats = self.queue_stats.lock().unwrap();
304 *queue_stats = RelayStats::default();
305 }
306 {
307 let mut error_counts = self.error_counts.lock().unwrap();
308 error_counts.clear();
309 }
310 {
311 let mut auth_stats = self.auth_stats.lock().unwrap();
312 *auth_stats = AuthenticationStatistics::default();
313 }
314 {
315 let mut rate_limit_stats = self.rate_limit_stats.lock().unwrap();
316 *rate_limit_stats = RateLimitingStatistics::default();
317 }
318
319 self.active_sessions.store(0, Ordering::Relaxed);
320 self.total_sessions.store(0, Ordering::Relaxed);
321 self.active_connections.store(0, Ordering::Relaxed);
322 self.total_bytes_sent.store(0, Ordering::Relaxed);
323 self.total_bytes_received.store(0, Ordering::Relaxed);
324 }
325}
326
327impl Default for RelayStatisticsCollector {
328 fn default() -> Self {
329 Self::new()
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336
337 #[test]
338 fn test_statistics_collector_creation() {
339 let collector = RelayStatisticsCollector::new();
340 let stats = collector.collect_statistics();
341
342 assert_eq!(stats.session_stats.active_sessions, 0);
344 assert_eq!(stats.connection_stats.total_connections, 0);
345 assert_eq!(stats.auth_stats.total_auth_attempts, 0);
346 assert!(stats.is_healthy());
347 }
348
349 #[test]
350 fn test_auth_tracking() {
351 let collector = RelayStatisticsCollector::new();
352
353 collector.record_auth_attempt(true, None);
355 collector.record_auth_attempt(false, Some("signature verification failed"));
356 collector.record_auth_attempt(false, Some("replay attack detected"));
357
358 let stats = collector.collect_statistics();
359 assert_eq!(stats.auth_stats.total_auth_attempts, 3);
360 assert_eq!(stats.auth_stats.successful_auths, 1);
361 assert_eq!(stats.auth_stats.failed_auths, 2);
362 assert_eq!(stats.auth_stats.invalid_signatures, 1);
363 assert_eq!(stats.auth_stats.replay_attacks_blocked, 1);
364 }
365
366 #[test]
367 fn test_rate_limiting_tracking() {
368 let collector = RelayStatisticsCollector::new();
369
370 collector.record_rate_limit(true);
372 collector.record_rate_limit(true);
373 collector.record_rate_limit(false);
374 collector.record_rate_limit(true);
375
376 let stats = collector.collect_statistics();
377 assert_eq!(stats.rate_limit_stats.total_requests, 4);
378 assert_eq!(stats.rate_limit_stats.requests_allowed, 3);
379 assert_eq!(stats.rate_limit_stats.requests_blocked, 1);
380 assert_eq!(stats.rate_limit_stats.efficiency_percentage, 75.0);
381 }
382
383 #[test]
384 fn test_error_tracking() {
385 let collector = RelayStatisticsCollector::new();
386
387 collector.record_error("protocol_error");
389 collector.record_error("resource_exhausted");
390 collector.record_error("session_timeout");
391 collector.record_error("auth_failed");
392
393 let stats = collector.collect_statistics();
394 assert_eq!(stats.error_stats.protocol_errors, 1);
395 assert_eq!(stats.error_stats.resource_exhausted, 1);
396 assert_eq!(stats.error_stats.session_errors, 1);
397 assert_eq!(stats.error_stats.auth_failures, 1);
398 assert_eq!(stats.error_stats.error_breakdown.len(), 4);
399 }
400
401 #[test]
402 fn test_session_count_updates() {
403 let collector = RelayStatisticsCollector::new();
404
405 collector.update_session_count(5, 100);
407
408 let stats = collector.collect_statistics();
409 assert_eq!(stats.session_stats.active_sessions, 5);
410 assert_eq!(stats.session_stats.total_sessions_created, 100);
411 }
412
413 #[test]
414 fn test_bytes_transferred() {
415 let collector = RelayStatisticsCollector::new();
416
417 collector.add_bytes_transferred(1000, 2000);
419 collector.add_bytes_transferred(500, 500);
420
421 let stats = collector.collect_statistics();
422 assert_eq!(stats.connection_stats.total_bytes_sent, 1500);
423 assert_eq!(stats.connection_stats.total_bytes_received, 2500);
424 assert_eq!(stats.session_stats.total_bytes_forwarded, 4000);
425 }
426
427 #[test]
428 fn test_success_rate_calculation() {
429 let collector = RelayStatisticsCollector::new();
430
431 collector.record_auth_attempt(true, None);
433 collector.record_auth_attempt(true, None);
434 collector.record_auth_attempt(true, None);
435 collector.record_auth_attempt(true, None);
436
437 collector.record_rate_limit(true);
440 collector.record_rate_limit(true);
441
442 collector.record_auth_attempt(false, None);
444 collector.record_error("protocol_error");
445
446 let stats = collector.collect_statistics();
447
448 let success_rate = stats.success_rate();
450 assert!(success_rate > 0.5);
451 assert!(success_rate < 1.0);
452 }
453
454 #[test]
455 fn test_reset_functionality() {
456 let collector = RelayStatisticsCollector::new();
457
458 collector.record_auth_attempt(true, None);
460 collector.record_error("test_error");
461 collector.record_rate_limit(false);
462 collector.update_session_count(10, 50);
463 collector.add_bytes_transferred(1000, 2000);
464
465 let stats_before = collector.collect_statistics();
467 assert!(stats_before.auth_stats.total_auth_attempts > 0);
468 assert_eq!(stats_before.session_stats.active_sessions, 10);
469
470 collector.reset();
472 let stats_after = collector.collect_statistics();
473 assert_eq!(stats_after.auth_stats.total_auth_attempts, 0);
474 assert_eq!(stats_after.rate_limit_stats.total_requests, 0);
475 assert_eq!(stats_after.session_stats.active_sessions, 0);
476 assert_eq!(stats_after.connection_stats.total_bytes_sent, 0);
477 }
478}