1use serde::{Deserialize, Serialize};
39use std::collections::HashMap;
40use std::time::{Duration, Instant};
41
/// Returns the current monotonic time.
///
/// This is a named wrapper around [`Instant::now`] so that it can be referenced by
/// path from the `#[serde(default = "instant_now")]` attributes used on the skipped
/// `Instant` fields in this file.
fn instant_now() -> Instant {
    Instant::now()
}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TrafficAnalyzerConfig {
50 pub window_size: Duration,
52 pub history_size: usize,
54 pub anomaly_threshold: f64,
56 pub min_samples: usize,
58 pub enable_peer_profiling: bool,
60 pub enable_protocol_tracking: bool,
62}
63
64impl Default for TrafficAnalyzerConfig {
65 fn default() -> Self {
66 Self {
67 window_size: Duration::from_secs(60),
68 history_size: 100,
69 anomaly_threshold: 3.0,
70 min_samples: 10,
71 enable_peer_profiling: true,
72 enable_protocol_tracking: true,
73 }
74 }
75}
76
77impl TrafficAnalyzerConfig {
78 pub fn short_term() -> Self {
80 Self {
81 window_size: Duration::from_secs(60),
82 history_size: 60,
83 ..Default::default()
84 }
85 }
86
87 pub fn long_term() -> Self {
89 Self {
90 window_size: Duration::from_secs(3600),
91 history_size: 24,
92 ..Default::default()
93 }
94 }
95
96 pub fn realtime() -> Self {
98 Self {
99 window_size: Duration::from_secs(5),
100 history_size: 720, anomaly_threshold: 2.5,
102 min_samples: 5,
103 ..Default::default()
104 }
105 }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110pub enum TrafficEvent {
111 ConnectionEstablished {
113 peer_id: String,
114 #[serde(skip, default = "instant_now")]
115 timestamp: Instant,
116 },
117 ConnectionClosed {
119 peer_id: String,
120 duration: Duration,
121 bytes_transferred: u64,
122 },
123 Query {
125 peer_id: String,
126 latency: Duration,
127 success: bool,
128 },
129 BandwidthSample {
131 bytes_sent: u64,
132 bytes_received: u64,
133 duration: Duration,
134 },
135 ProtocolUsage {
137 protocol: String,
138 message_count: u64,
139 },
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct TrafficPattern {
145 pub pattern_type: PatternType,
147 pub description: String,
149 pub confidence: f64,
151 #[serde(skip, default = "instant_now")]
153 pub start_time: Instant,
154 pub duration: Duration,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
160pub enum PatternType {
161 Steady,
163 Increasing,
165 Decreasing,
167 Bursty,
169 Periodic,
171 Anomalous,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct TrafficAnomaly {
178 pub anomaly_type: AnomalyType,
180 pub description: String,
182 pub severity: f64,
184 #[serde(skip, default = "instant_now")]
186 pub timestamp: Instant,
187 pub peer_id: Option<String>,
189}
190
191#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
193pub enum AnomalyType {
194 BandwidthSpike,
196 ConnectionAnomaly,
198 QueryFailureSpike,
200 LatencySpike,
202 SuspiciousPeer,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct PeerProfile {
209 pub peer_id: String,
211 pub total_connections: usize,
213 pub total_queries: u64,
215 pub successful_queries: u64,
217 pub average_latency: Duration,
219 pub total_bytes: u64,
221 #[serde(skip, default = "instant_now")]
223 pub first_seen: Instant,
224 #[serde(skip, default = "instant_now")]
226 pub last_seen: Instant,
227 pub behavior_score: f64,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct TrafficAnalysis {
234 #[serde(skip, default = "instant_now")]
236 pub timestamp: Instant,
237 pub total_bandwidth: u64,
239 pub total_connections: usize,
241 pub total_queries: u64,
243 pub query_success_rate: f64,
245 pub average_latency: Duration,
247 pub patterns: Vec<TrafficPattern>,
249 pub anomalies: Vec<TrafficAnomaly>,
251 pub peer_profiles: HashMap<String, PeerProfile>,
253 pub protocol_distribution: HashMap<String, u64>,
255 pub bandwidth_trend: TrendDirection,
257 pub connection_trend: TrendDirection,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
263pub enum TrendDirection {
264 Increasing,
266 Decreasing,
268 Steady,
270 Unknown,
272}
273
274pub struct TrafficAnalyzer {
276 config: TrafficAnalyzerConfig,
277 events: Vec<TrafficEvent>,
278 peer_data: HashMap<String, PeerData>,
279 bandwidth_history: Vec<BandwidthSample>,
280 connection_history: Vec<usize>,
281 query_history: Vec<QuerySample>,
282 start_time: Instant,
283}
284
/// Raw per-peer counters kept by the analyzer.
#[derive(Debug, Clone)]
struct PeerData {
    /// Number of connections recorded for the peer.
    connections: usize,
    /// Number of queries recorded for the peer.
    queries: u64,
    /// Number of those queries that succeeded.
    successful_queries: u64,
    /// Latency of every recorded query, in insertion order.
    latencies: Vec<Duration>,
    /// Bytes accumulated over the peer's recorded connections.
    bytes_transferred: u64,
    /// When the peer was first recorded.
    first_seen: Instant,
    /// When the peer's latest connection was recorded.
    last_seen: Instant,
}
295
/// One bandwidth measurement kept in the analyzer's history.
#[derive(Debug, Clone)]
struct BandwidthSample {
    /// When the measurement was recorded.
    timestamp: Instant,
    /// Bytes sent plus bytes received.
    bytes: u64,
}
301
/// One query outcome kept in the analyzer's history.
#[derive(Debug, Clone)]
// `timestamp` and `latency` are stored but never read by the code in this file.
#[allow(dead_code)]
struct QuerySample {
    /// When the query was recorded.
    timestamp: Instant,
    /// 1.0 for a successful query, 0.0 for a failed one.
    success_rate: f64,
    /// Latency reported for the query.
    latency: Duration,
}
309
310impl TrafficAnalyzer {
311 pub fn new(config: TrafficAnalyzerConfig) -> Self {
313 Self {
314 config,
315 events: Vec::new(),
316 peer_data: HashMap::new(),
317 bandwidth_history: Vec::new(),
318 connection_history: Vec::new(),
319 query_history: Vec::new(),
320 start_time: Instant::now(),
321 }
322 }
323
324 pub fn record_connection(&mut self, peer_id: String, bytes: u64) {
326 let timestamp = Instant::now();
327 self.events.push(TrafficEvent::ConnectionEstablished {
328 peer_id: peer_id.clone(),
329 timestamp,
330 });
331
332 let data = self.peer_data.entry(peer_id).or_insert(PeerData {
333 connections: 0,
334 queries: 0,
335 successful_queries: 0,
336 latencies: Vec::new(),
337 bytes_transferred: 0,
338 first_seen: timestamp,
339 last_seen: timestamp,
340 });
341
342 data.connections += 1;
343 data.bytes_transferred += bytes;
344 data.last_seen = timestamp;
345
346 self.connection_history.push(self.peer_data.len());
347 }
348
349 pub fn record_query(&mut self, peer_id: String, latency: Duration, success: bool) {
351 self.events.push(TrafficEvent::Query {
352 peer_id: peer_id.clone(),
353 latency,
354 success,
355 });
356
357 if let Some(data) = self.peer_data.get_mut(&peer_id) {
358 data.queries += 1;
359 if success {
360 data.successful_queries += 1;
361 }
362 data.latencies.push(latency);
363 }
364
365 let success_rate = if success { 1.0 } else { 0.0 };
366 self.query_history.push(QuerySample {
367 timestamp: Instant::now(),
368 success_rate,
369 latency,
370 });
371 }
372
373 pub fn record_bandwidth(&mut self, bytes_sent: u64, bytes_received: u64) {
375 let total = bytes_sent + bytes_received;
376 self.bandwidth_history.push(BandwidthSample {
377 timestamp: Instant::now(),
378 bytes: total,
379 });
380
381 self.events.push(TrafficEvent::BandwidthSample {
382 bytes_sent,
383 bytes_received,
384 duration: Duration::from_secs(1),
385 });
386 }
387
388 pub fn analyze(&self) -> Result<TrafficAnalysis, TrafficAnalyzerError> {
390 let timestamp = Instant::now();
391
392 let total_bandwidth: u64 = self.bandwidth_history.iter().map(|s| s.bytes).sum();
394
395 let total_connections = self.connection_history.last().copied().unwrap_or(0);
397 let total_queries: u64 = self.peer_data.values().map(|p| p.queries).sum();
398 let successful_queries: u64 = self.peer_data.values().map(|p| p.successful_queries).sum();
399
400 let query_success_rate = if total_queries > 0 {
401 (successful_queries as f64 / total_queries as f64) * 100.0
402 } else {
403 0.0
404 };
405
406 let all_latencies: Vec<Duration> = self
408 .peer_data
409 .values()
410 .flat_map(|p| p.latencies.iter().copied())
411 .collect();
412
413 let average_latency = if !all_latencies.is_empty() {
414 let sum: Duration = all_latencies.iter().sum();
415 sum / all_latencies.len() as u32
416 } else {
417 Duration::ZERO
418 };
419
420 let patterns = self.detect_patterns();
422
423 let anomalies = self.detect_anomalies();
425
426 let peer_profiles = self.build_peer_profiles();
428
429 let bandwidth_trend = self.calculate_trend(
431 &self
432 .bandwidth_history
433 .iter()
434 .map(|s| s.bytes as f64)
435 .collect::<Vec<_>>(),
436 );
437 let connection_trend = self.calculate_trend(
438 &self
439 .connection_history
440 .iter()
441 .map(|&c| c as f64)
442 .collect::<Vec<_>>(),
443 );
444
445 Ok(TrafficAnalysis {
446 timestamp,
447 total_bandwidth,
448 total_connections,
449 total_queries,
450 query_success_rate,
451 average_latency,
452 patterns,
453 anomalies,
454 peer_profiles,
455 protocol_distribution: HashMap::new(),
456 bandwidth_trend,
457 connection_trend,
458 })
459 }
460
461 fn detect_patterns(&self) -> Vec<TrafficPattern> {
463 let mut patterns = Vec::new();
464
465 if self.bandwidth_history.len() >= self.config.min_samples {
467 let values: Vec<f64> = self
468 .bandwidth_history
469 .iter()
470 .map(|s| s.bytes as f64)
471 .collect();
472 let (mean, stddev) = Self::calculate_statistics(&values);
473
474 if stddev > mean * 0.5 {
475 patterns.push(TrafficPattern {
476 pattern_type: PatternType::Bursty,
477 description: "Traffic shows bursty pattern with high variance".to_string(),
478 confidence: 0.8,
479 start_time: self.start_time,
480 duration: Instant::now().duration_since(self.start_time),
481 });
482 }
483 }
484
485 patterns
486 }
487
488 fn detect_anomalies(&self) -> Vec<TrafficAnomaly> {
490 let mut anomalies = Vec::new();
491
492 if self.bandwidth_history.len() >= self.config.min_samples {
494 let values: Vec<f64> = self
495 .bandwidth_history
496 .iter()
497 .map(|s| s.bytes as f64)
498 .collect();
499 let (mean, stddev) = Self::calculate_statistics(&values);
500
501 for sample in &self.bandwidth_history {
502 let z_score = (sample.bytes as f64 - mean).abs() / stddev.max(1.0);
503 if z_score > self.config.anomaly_threshold {
504 anomalies.push(TrafficAnomaly {
505 anomaly_type: AnomalyType::BandwidthSpike,
506 description: format!(
507 "Bandwidth spike detected: {} bytes ({:.1} σ)",
508 sample.bytes, z_score
509 ),
510 severity: (z_score / self.config.anomaly_threshold).min(1.0),
511 timestamp: sample.timestamp,
512 peer_id: None,
513 });
514 }
515 }
516 }
517
518 if self.query_history.len() >= self.config.min_samples {
520 let success_rates: Vec<f64> =
521 self.query_history.iter().map(|q| q.success_rate).collect();
522 let recent_rate = success_rates.iter().rev().take(10).sum::<f64>() / 10.0;
523
524 if recent_rate < 0.5 {
525 anomalies.push(TrafficAnomaly {
526 anomaly_type: AnomalyType::QueryFailureSpike,
527 description: format!(
528 "High query failure rate: {:.1}%",
529 (1.0 - recent_rate) * 100.0
530 ),
531 severity: 1.0 - recent_rate,
532 timestamp: Instant::now(),
533 peer_id: None,
534 });
535 }
536 }
537
538 anomalies
539 }
540
541 fn build_peer_profiles(&self) -> HashMap<String, PeerProfile> {
543 self.peer_data
544 .iter()
545 .map(|(peer_id, data)| {
546 let average_latency = if !data.latencies.is_empty() {
547 let sum: Duration = data.latencies.iter().sum();
548 sum / data.latencies.len() as u32
549 } else {
550 Duration::ZERO
551 };
552
553 let behavior_score = if data.queries > 0 {
554 (data.successful_queries as f64 / data.queries as f64) * 100.0 / 100.0
555 } else {
556 1.0
557 };
558
559 let profile = PeerProfile {
560 peer_id: peer_id.clone(),
561 total_connections: data.connections,
562 total_queries: data.queries,
563 successful_queries: data.successful_queries,
564 average_latency,
565 total_bytes: data.bytes_transferred,
566 first_seen: data.first_seen,
567 last_seen: data.last_seen,
568 behavior_score,
569 };
570
571 (peer_id.clone(), profile)
572 })
573 .collect()
574 }
575
576 fn calculate_trend(&self, values: &[f64]) -> TrendDirection {
578 if values.len() < self.config.min_samples {
579 return TrendDirection::Unknown;
580 }
581
582 let n = values.len() as f64;
584 let x_mean = (0..values.len()).map(|i| i as f64).sum::<f64>() / n;
585 let y_mean = values.iter().sum::<f64>() / n;
586
587 let mut numerator = 0.0;
588 let mut denominator = 0.0;
589
590 for (i, &y) in values.iter().enumerate() {
591 let x = i as f64;
592 numerator += (x - x_mean) * (y - y_mean);
593 denominator += (x - x_mean) * (x - x_mean);
594 }
595
596 let slope = if denominator != 0.0 {
597 numerator / denominator
598 } else {
599 0.0
600 };
601
602 if slope > y_mean * 0.01 {
603 TrendDirection::Increasing
604 } else if slope < -y_mean * 0.01 {
605 TrendDirection::Decreasing
606 } else {
607 TrendDirection::Steady
608 }
609 }
610
611 fn calculate_statistics(values: &[f64]) -> (f64, f64) {
613 if values.is_empty() {
614 return (0.0, 0.0);
615 }
616
617 let mean = values.iter().sum::<f64>() / values.len() as f64;
618 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
619 let stddev = variance.sqrt();
620
621 (mean, stddev)
622 }
623
624 pub fn get_stats(&self) -> TrafficAnalyzerStats {
626 TrafficAnalyzerStats {
627 total_events: self.events.len(),
628 total_peers: self.peer_data.len(),
629 bandwidth_samples: self.bandwidth_history.len(),
630 query_samples: self.query_history.len(),
631 uptime: Instant::now().duration_since(self.start_time),
632 }
633 }
634
635 pub fn clear(&mut self) {
637 self.events.clear();
638 self.peer_data.clear();
639 self.bandwidth_history.clear();
640 self.connection_history.clear();
641 self.query_history.clear();
642 self.start_time = Instant::now();
643 }
644}
645
646#[derive(Debug, Clone, Serialize, Deserialize)]
648pub struct TrafficAnalyzerStats {
649 pub total_events: usize,
651 pub total_peers: usize,
653 pub bandwidth_samples: usize,
655 pub query_samples: usize,
657 pub uptime: Duration,
659}
660
661#[derive(Debug, thiserror::Error)]
663pub enum TrafficAnalyzerError {
664 #[error("Insufficient data for analysis")]
665 InsufficientData,
666
667 #[error("Analysis failed: {0}")]
668 AnalysisFailed(String),
669}
670
#[cfg(test)]
mod tests {
    use super::*;

    /// Analyzer built from the default configuration.
    fn default_analyzer() -> TrafficAnalyzer {
        TrafficAnalyzer::new(TrafficAnalyzerConfig::default())
    }

    #[test]
    fn test_config_presets() {
        // Each preset paired with the window length (in seconds) it must report.
        let expected = [
            (TrafficAnalyzerConfig::short_term(), 60),
            (TrafficAnalyzerConfig::long_term(), 3600),
            (TrafficAnalyzerConfig::realtime(), 5),
        ];

        for (preset, secs) in &expected {
            assert_eq!(preset.window_size, Duration::from_secs(*secs));
        }
    }

    #[test]
    fn test_analyzer_creation() {
        let stats = default_analyzer().get_stats();

        assert_eq!(stats.total_events, 0);
        assert_eq!(stats.total_peers, 0);
    }

    #[test]
    fn test_record_connection() {
        let mut analyzer = default_analyzer();
        analyzer.record_connection("peer1".to_string(), 1024);

        assert_eq!(analyzer.get_stats().total_peers, 1);
    }

    #[test]
    fn test_record_query() {
        let mut analyzer = default_analyzer();
        analyzer.record_connection("peer1".to_string(), 0);
        analyzer.record_query("peer1".to_string(), Duration::from_millis(50), true);

        assert_eq!(analyzer.get_stats().query_samples, 1);
    }

    #[test]
    fn test_record_bandwidth() {
        let mut analyzer = default_analyzer();
        analyzer.record_bandwidth(1000, 2000);

        assert_eq!(analyzer.get_stats().bandwidth_samples, 1);
    }

    #[test]
    fn test_analyze() {
        let mut analyzer = default_analyzer();
        analyzer.record_connection("peer1".to_string(), 1024);
        analyzer.record_query("peer1".to_string(), Duration::from_millis(50), true);
        analyzer.record_bandwidth(1000, 2000);

        let analysis = analyzer.analyze().unwrap();

        assert_eq!(analysis.total_connections, 1);
        assert_eq!(analysis.total_queries, 1);
        assert_eq!(analysis.query_success_rate, 100.0);
    }

    #[test]
    fn test_peer_profile() {
        let mut analyzer = default_analyzer();
        analyzer.record_connection("peer1".to_string(), 1024);
        analyzer.record_query("peer1".to_string(), Duration::from_millis(50), true);

        let analysis = analyzer.analyze().unwrap();
        let profile = analysis.peer_profiles.get("peer1").unwrap();

        assert_eq!(profile.total_connections, 1);
        assert_eq!(profile.total_queries, 1);
        assert_eq!(profile.behavior_score, 1.0);
    }

    #[test]
    fn test_clear() {
        let mut analyzer = default_analyzer();
        analyzer.record_connection("peer1".to_string(), 1024);
        analyzer.clear();

        let stats = analyzer.get_stats();
        assert_eq!(stats.total_events, 0);
        assert_eq!(stats.total_peers, 0);
    }

    #[test]
    fn test_trend_calculation() {
        let analyzer = default_analyzer();

        let rising = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        assert_eq!(
            analyzer.calculate_trend(&rising),
            TrendDirection::Increasing
        );

        let falling = vec![10.0, 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0];
        assert_eq!(
            analyzer.calculate_trend(&falling),
            TrendDirection::Decreasing
        );

        let flat = vec![5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0];
        assert_eq!(analyzer.calculate_trend(&flat), TrendDirection::Steady);
    }

    #[test]
    fn test_statistics_calculation() {
        let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let (mean, stddev) = TrafficAnalyzer::calculate_statistics(&samples);

        assert_eq!(mean, 3.0);
        assert!((stddev - 1.414).abs() < 0.01);
    }
}