use std::collections::{HashMap, VecDeque};
use std::time::{Duration, SystemTime};
/// Tunable thresholds and limits for [`AnomalyDetector`].
#[derive(Debug, Clone)]
pub struct DetectionConfig {
    /// Absolute z-score above which a sample is reported as a statistical outlier.
    pub z_score_threshold: f64,
    /// Minimum number of same-metric samples a peer needs before detection runs.
    pub min_samples: usize,
    /// Maximum number of samples kept per peer, counting all metric types together.
    pub max_samples: usize,
    /// Length of the rate window in seconds.
    ///
    /// NOTE(review): not read by `AnomalyDetector`'s detection logic, whose rate test averages
    /// a fixed number of recent samples instead — confirm whether this is meant to be used.
    pub rate_window_secs: u64,
    /// Ratio to the recent same-metric average above which a sample is a rate anomaly.
    pub max_rate_increase: f64,
    /// Age in seconds after which recorded samples are discarded.
    pub retention_secs: u64,
}

impl Default for DetectionConfig {
    /// 3-sigma outliers, 30 to 1000 samples per peer, 5x rate spikes, a 5-minute rate window
    /// and one hour of sample retention.
    fn default() -> Self {
        DetectionConfig {
            min_samples: 30,
            max_samples: 1_000,
            z_score_threshold: 3.0,
            max_rate_increase: 5.0,
            rate_window_secs: 5 * 60,
            retention_secs: 60 * 60,
        }
    }
}
/// One observed measurement of a peer's behavior.
#[derive(Clone, Debug)]
pub struct BehaviorSample {
    /// The measured value; it is compared only against history with the same `metric_type`.
    pub value: f64,
    /// When the sample was taken; the detector uses it for retention-based expiry.
    pub timestamp: SystemTime,
    /// Name of the metric this sample belongs to (e.g. `"bandwidth"`).
    pub metric_type: String,
}
/// Category of a detected [`Anomaly`].
///
/// The variant's `Debug` name is used as the key in `AnomalyStats::anomalies_by_type`.
/// `AnomalyDetector` only emits `StatisticalOutlier` and `RateAnomaly`; the other variants are
/// declared but not currently produced by it.
// `Copy` and `Hash` are derived so this fieldless enum can be passed by value without clones
// and used as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AnomalyType {
    /// The sample's z-score against the peer's history exceeded the configured threshold.
    StatisticalOutlier,
    /// The sample exceeded the configured multiple of the recent average.
    RateAnomaly,
    /// NOTE(review): not produced by `AnomalyDetector`; intended semantics unconfirmed.
    PatternAnomaly,
    /// NOTE(review): not produced by `AnomalyDetector`; intended semantics unconfirmed.
    RangeAnomaly,
}
/// A single anomaly reported by the detector for one peer.
#[derive(Clone, Debug)]
pub struct Anomaly {
    /// Peer the anomalous sample was checked against.
    pub peer_id: String,
    /// Which detection test fired.
    pub anomaly_type: AnomalyType,
    /// Severity score; the detector computes it per test and caps it at 1.0.
    pub severity: f64,
    /// Human-readable explanation including the sample value and the statistic that fired.
    pub description: String,
    /// The value of the sample that was flagged.
    pub sample_value: f64,
    /// `(low, high)` bounds the detector considered normal for this sample.
    pub expected_range: (f64, f64),
    /// System time at which the anomaly was detected.
    pub detected_at: SystemTime,
}
/// Rolling per-peer state maintained by `AnomalyDetector`.
#[derive(Debug)]
struct PeerHistory {
    /// Recorded samples of every metric type, in insertion order (front = earliest inserted).
    samples: VecDeque<BehaviorSample>,
    /// Number of anomalies recorded against this peer while this entry has existed.
    anomaly_count: u64,
    /// System time at which the most recent anomaly was recorded, if any.
    last_anomaly: Option<SystemTime>,
}

impl PeerHistory {
    /// Returns an empty history: no samples, zero anomalies and no last-anomaly time.
    fn new() -> Self {
        let samples = VecDeque::new();
        PeerHistory {
            last_anomaly: None,
            anomaly_count: 0,
            samples,
        }
    }
}
/// Per-peer statistical anomaly detector.
///
/// Samples are recorded per peer with [`AnomalyDetector::record_sample`]. A candidate sample is
/// checked with [`AnomalyDetector::is_anomalous`], which compares it against that peer's history
/// for the same `metric_type`. It applies a z-score test first and then a rate-increase test
/// against the most recent values. Detected anomalies are appended to a bounded in-memory log
/// that the query methods read from.
pub struct AnomalyDetector {
    /// Thresholds and limits used by detection and sample retention.
    config: DetectionConfig,
    /// Sample history and anomaly counters, keyed by peer id.
    peer_history: HashMap<String, PeerHistory>,
    /// Detected anomalies in detection order (oldest first); trimmed by `record_anomaly`.
    detected_anomalies: Vec<Anomaly>,
}

impl AnomalyDetector {
    /// When the anomaly log grows beyond this many entries, the oldest entries are evicted.
    const MAX_STORED_ANOMALIES: usize = 10_000;
    /// Number of oldest log entries dropped each time `MAX_STORED_ANOMALIES` is exceeded.
    const ANOMALY_EVICTION_BATCH: usize = 1_000;
    /// Number of most-recent same-metric values averaged to form the rate-test baseline.
    const RATE_BASELINE_LEN: usize = 10;

    /// Creates a detector with the given configuration and no recorded history.
    #[must_use]
    #[inline]
    pub fn new(config: DetectionConfig) -> Self {
        Self {
            config,
            peer_history: HashMap::new(),
            detected_anomalies: Vec::new(),
        }
    }

    /// Appends `sample` to the history of `peer_id`, creating the history if needed.
    ///
    /// Afterwards that peer's history is pruned in two steps:
    /// 1. Samples older than `retention_secs`, or timestamped later than the current system
    ///    time, are dropped.
    /// 2. The earliest-inserted samples are evicted until at most `max_samples` remain.
    ///
    /// The cap counts all metric types together. Other peers' histories are not touched.
    pub fn record_sample(&mut self, peer_id: &str, sample: BehaviorSample) {
        let retention = Duration::from_secs(self.config.retention_secs);
        let max_samples = self.config.max_samples;
        let history = self
            .peer_history
            .entry(peer_id.to_string())
            .or_insert_with(PeerHistory::new);
        history.samples.push_back(sample);
        // Expire stale samples *before* enforcing the cap. Otherwise the cap can evict a
        // still-valid sample while expired ones (e.g. inserted out of timestamp order) still
        // occupy slots, leaving the peer with fewer than `max_samples` usable samples.
        Self::retain_unexpired(&mut history.samples, retention);
        while history.samples.len() > max_samples {
            history.samples.pop_front();
        }
    }

    /// Checks `sample` against `peer_id`'s history and logs an anomaly if one is detected.
    ///
    /// The sample itself is NOT added to the history; use [`AnomalyDetector::record_sample`]
    /// for that. Returns `false` when the peer is unknown or has fewer than `min_samples`
    /// usable samples for the sample's metric.
    #[must_use]
    pub fn is_anomalous(&mut self, peer_id: &str, sample: &BehaviorSample) -> bool {
        if let Some(anomaly) = self.detect_anomaly(peer_id, sample) {
            self.record_anomaly(anomaly);
            true
        } else {
            false
        }
    }

    /// Runs the z-score test and, if it does not fire, the rate test for `sample`.
    fn detect_anomaly(&self, peer_id: &str, sample: &BehaviorSample) -> Option<Anomaly> {
        let history = self.peer_history.get(peer_id)?;
        // Cheap early exit: the per-metric subset can never be larger than the whole history.
        if history.samples.len() < self.config.min_samples {
            return None;
        }
        // Non-finite values are excluded. A single NaN or infinity would otherwise make the
        // mean and variance NaN/inf, and every comparison below would be false. That silently
        // disables detection for this metric until the bad value leaves the history.
        let relevant_samples: Vec<f64> = history
            .samples
            .iter()
            .filter(|s| s.metric_type == sample.metric_type)
            .map(|s| s.value)
            .filter(|v| v.is_finite())
            .collect();
        if relevant_samples.len() < self.config.min_samples {
            return None;
        }
        self.detect_statistical_outlier(peer_id, sample, &relevant_samples)
            .or_else(|| self.detect_rate_anomaly(peer_id, sample, &relevant_samples))
    }

    /// Z-score test against the population mean and standard deviation of `values`.
    ///
    /// Flags `sample` when `|value - mean| / std_dev > z_score_threshold`. Never fires when
    /// `values` has zero spread, because the z-score is undefined then.
    fn detect_statistical_outlier(
        &self,
        peer_id: &str,
        sample: &BehaviorSample,
        values: &[f64],
    ) -> Option<Anomaly> {
        let count = values.len() as f64;
        let mean = values.iter().sum::<f64>() / count;
        // Population variance: divides by n, not n - 1.
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / count;
        let std_dev = variance.sqrt();
        if std_dev > 0.0 {
            let threshold = self.config.z_score_threshold;
            let z_score = (sample.value - mean).abs() / std_dev;
            if z_score > threshold {
                // Reaches 1.0 once the z-score is twice the threshold.
                let severity = (z_score / (threshold * 2.0)).min(1.0);
                return Some(Anomaly {
                    peer_id: peer_id.to_string(),
                    anomaly_type: AnomalyType::StatisticalOutlier,
                    severity,
                    description: format!(
                        "Value {:.2} deviates {:.2} standard deviations from mean {:.2}",
                        sample.value, z_score, mean
                    ),
                    sample_value: sample.value,
                    expected_range: (mean - threshold * std_dev, mean + threshold * std_dev),
                    detected_at: SystemTime::now(),
                });
            }
        }
        None
    }

    /// Rate test: flags `sample` when it exceeds `max_rate_increase` times the average of the
    /// last `RATE_BASELINE_LEN` entries of `historical_samples`.
    ///
    /// Only increases are detected. The test is skipped when there are fewer than
    /// `RATE_BASELINE_LEN` values or the baseline average is not positive.
    ///
    /// NOTE(review): `DetectionConfig::rate_window_secs` is not consulted here; the baseline is a
    /// fixed sample count rather than a time window — confirm intent.
    fn detect_rate_anomaly(
        &self,
        peer_id: &str,
        sample: &BehaviorSample,
        historical_samples: &[f64],
    ) -> Option<Anomaly> {
        let baseline_len = Self::RATE_BASELINE_LEN;
        if historical_samples.len() < baseline_len {
            return None;
        }
        let recent_avg = historical_samples
            .iter()
            .rev()
            .take(baseline_len)
            .sum::<f64>()
            / baseline_len as f64;
        // A non-positive baseline makes the ratio meaningless, so the test is skipped.
        if recent_avg > 0.0 {
            let increase_ratio = sample.value / recent_avg;
            if increase_ratio > self.config.max_rate_increase {
                // Reaches 1.0 once the ratio is twice the allowed increase.
                let severity = ((increase_ratio / self.config.max_rate_increase) - 1.0).min(1.0);
                return Some(Anomaly {
                    peer_id: peer_id.to_string(),
                    anomaly_type: AnomalyType::RateAnomaly,
                    severity,
                    description: format!(
                        "Value {:.2} is {:.2}x the recent average {:.2}",
                        sample.value, increase_ratio, recent_avg
                    ),
                    sample_value: sample.value,
                    expected_range: (0.0, recent_avg * self.config.max_rate_increase),
                    detected_at: SystemTime::now(),
                });
            }
        }
        None
    }

    /// Bumps the peer's anomaly counters and appends `anomaly` to the bounded log.
    fn record_anomaly(&mut self, anomaly: Anomaly) {
        if let Some(history) = self.peer_history.get_mut(&anomaly.peer_id) {
            history.anomaly_count += 1;
            history.last_anomaly = Some(SystemTime::now());
        }
        self.detected_anomalies.push(anomaly);
        // Evict in batches so the front-of-Vec shift happens rarely rather than on every push.
        if self.detected_anomalies.len() > Self::MAX_STORED_ANOMALIES {
            self.detected_anomalies
                .drain(..Self::ANOMALY_EVICTION_BATCH);
        }
    }

    /// Returns clones of all logged anomalies for `peer_id`, in detection order.
    ///
    /// Anomalies already evicted from the bounded log are not included.
    #[must_use]
    #[inline]
    pub fn get_peer_anomalies(&self, peer_id: &str) -> Vec<Anomaly> {
        self.detected_anomalies
            .iter()
            .filter(|a| a.peer_id == peer_id)
            .cloned()
            .collect()
    }

    /// Returns clones of up to `limit` logged anomalies, most recent first.
    #[must_use]
    #[inline]
    pub fn get_recent_anomalies(&self, limit: usize) -> Vec<Anomaly> {
        self.detected_anomalies
            .iter()
            .rev()
            .take(limit)
            .cloned()
            .collect()
    }

    /// Returns clones of all logged anomalies of `anomaly_type`, in detection order.
    #[must_use]
    #[inline]
    pub fn get_anomalies_by_type(&self, anomaly_type: AnomalyType) -> Vec<Anomaly> {
        self.detected_anomalies
            .iter()
            .filter(|a| a.anomaly_type == anomaly_type)
            .cloned()
            .collect()
    }

    /// Returns clones of logged anomalies with `severity >= min_severity`, in detection order.
    #[must_use]
    #[inline]
    pub fn get_severe_anomalies(&self, min_severity: f64) -> Vec<Anomaly> {
        self.detected_anomalies
            .iter()
            .filter(|a| a.severity >= min_severity)
            .cloned()
            .collect()
    }

    /// Number of anomalies recorded for `peer_id` since its history was created; 0 if unknown.
    ///
    /// The count is not reduced by log eviction or [`AnomalyDetector::clear_anomalies`].
    #[must_use]
    #[inline]
    pub fn get_anomaly_count(&self, peer_id: &str) -> u64 {
        self.peer_history
            .get(peer_id)
            .map_or(0, |h| h.anomaly_count)
    }

    /// Returns `true` if `peer_id`'s last anomaly was recorded strictly less than `within` ago.
    ///
    /// Returns `false` for unknown peers and for peers with no anomaly. It also returns `false`
    /// if the system clock now reads earlier than the recorded time.
    #[must_use]
    #[inline]
    pub fn has_recent_anomalies(&self, peer_id: &str, within: Duration) -> bool {
        self.peer_history
            .get(peer_id)
            .and_then(|h| h.last_anomaly)
            .and_then(|last| SystemTime::now().duration_since(last).ok())
            .map_or(false, |elapsed| elapsed < within)
    }

    /// Ratio of the peer's anomaly count to its currently retained sample count.
    ///
    /// The numerator is a lifetime counter, while the denominator only counts retained samples
    /// (all metric types). The ratio can therefore exceed 1.0 once samples expire or are
    /// evicted. Returns 0.0 for unknown peers or an empty history.
    #[must_use]
    #[inline]
    pub fn get_anomaly_rate(&self, peer_id: &str) -> f64 {
        match self.peer_history.get(peer_id) {
            Some(history) if !history.samples.is_empty() => {
                history.anomaly_count as f64 / history.samples.len() as f64
            }
            _ => 0.0,
        }
    }

    /// Summarises the anomaly log and the number of tracked peers.
    ///
    /// Per-type counts are keyed by the variant's `Debug` name, and every variant gets an
    /// entry, including zero counts. Average severity is 0.0 for an empty log. All figures
    /// reflect only anomalies still in the bounded log.
    #[must_use]
    #[inline]
    pub fn get_statistics(&self) -> AnomalyStats {
        let total_anomalies = self.detected_anomalies.len();
        let anomalies_by_type = [
            AnomalyType::StatisticalOutlier,
            AnomalyType::RateAnomaly,
            AnomalyType::PatternAnomaly,
            AnomalyType::RangeAnomaly,
        ]
        .iter()
        .map(|kind| {
            let count = self
                .detected_anomalies
                .iter()
                .filter(|a| a.anomaly_type == *kind)
                .count();
            (format!("{:?}", kind), count)
        })
        .collect();
        let average_severity = if total_anomalies > 0 {
            self.detected_anomalies
                .iter()
                .map(|a| a.severity)
                .sum::<f64>()
                / total_anomalies as f64
        } else {
            0.0
        };
        AnomalyStats {
            total_anomalies,
            total_peers: self.peer_history.len(),
            anomalies_by_type,
            average_severity,
        }
    }

    /// Drops samples older than `retention`, measured against the current system time.
    ///
    /// Samples timestamped later than "now" make `duration_since` fail and are dropped too.
    fn retain_unexpired(samples: &mut VecDeque<BehaviorSample>, retention: Duration) {
        let now = SystemTime::now();
        samples.retain(|s| {
            now.duration_since(s.timestamp)
                .map_or(false, |age| age < retention)
        });
    }

    /// Forgets `peer_id`'s history and removes its anomalies from the log.
    #[inline]
    pub fn clear_peer(&mut self, peer_id: &str) {
        self.peer_history.remove(peer_id);
        self.detected_anomalies.retain(|a| a.peer_id != peer_id);
    }

    /// Empties the anomaly log.
    ///
    /// Per-peer anomaly counters and last-anomaly times are left unchanged.
    #[inline]
    pub fn clear_anomalies(&mut self) {
        self.detected_anomalies.clear();
    }

    /// Number of peers that currently have a history entry.
    #[must_use]
    #[inline]
    pub fn peer_count(&self) -> usize {
        self.peer_history.len()
    }
}
/// Aggregate view of an `AnomalyDetector`'s anomaly log, as returned by `get_statistics`.
#[derive(Clone, Debug)]
pub struct AnomalyStats {
    /// Number of anomalies currently held in the log.
    pub total_anomalies: usize,
    /// Number of peers with a recorded history.
    pub total_peers: usize,
    /// Per-type anomaly counts, keyed by the `AnomalyType` variant name.
    pub anomalies_by_type: HashMap<String, usize>,
    /// Mean severity over the logged anomalies (0.0 when the log is empty).
    pub average_severity: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sample of `metric_type` timestamped with the current system time.
    fn create_sample(value: f64, metric_type: &str) -> BehaviorSample {
        BehaviorSample {
            metric_type: metric_type.to_string(),
            timestamp: SystemTime::now(),
            value,
        }
    }

    /// Config shared by most tests: z-score threshold 2.0 and a 10-sample minimum.
    fn sensitive_config() -> DetectionConfig {
        DetectionConfig {
            z_score_threshold: 2.0,
            min_samples: 10,
            ..Default::default()
        }
    }

    /// Feeds 50 "bandwidth" samples cycling 100.0, 101.0, ..., 104.0 to every peer in `peers`.
    fn seed_bandwidth(detector: &mut AnomalyDetector, peers: &[&str]) {
        for i in 0..50u32 {
            let value = 100.0 + f64::from(i % 5);
            for &peer in peers {
                detector.record_sample(peer, create_sample(value, "bandwidth"));
            }
        }
    }

    /// Checks a fresh "bandwidth" sample with the given value against `peer`'s history.
    fn check_bandwidth(detector: &mut AnomalyDetector, peer: &str, value: f64) -> bool {
        detector.is_anomalous(peer, &create_sample(value, "bandwidth"))
    }

    #[test]
    fn test_statistical_outlier_detection() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        seed_bandwidth(&mut detector, &["peer1"]);
        assert!(!check_bandwidth(&mut detector, "peer1", 102.0));
        assert!(check_bandwidth(&mut detector, "peer1", 500.0));
    }

    #[test]
    fn test_rate_anomaly_detection() {
        let mut detector = AnomalyDetector::new(DetectionConfig {
            max_rate_increase: 3.0,
            min_samples: 10,
            ..Default::default()
        });
        (0..50).for_each(|_| detector.record_sample("peer1", create_sample(100.0, "proofs")));
        assert!(detector.is_anomalous("peer1", &create_sample(400.0, "proofs")));
    }

    #[test]
    fn test_min_samples_requirement() {
        let mut detector = AnomalyDetector::new(DetectionConfig {
            min_samples: 30,
            ..Default::default()
        });
        for offset in 0..20u32 {
            let value = 100.0 + f64::from(offset);
            detector.record_sample("peer1", create_sample(value, "bandwidth"));
        }
        assert!(!check_bandwidth(&mut detector, "peer1", 1000.0));
    }

    #[test]
    fn test_anomaly_counting() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        seed_bandwidth(&mut detector, &["peer1"]);
        assert!(check_bandwidth(&mut detector, "peer1", 500.0));
        assert!(check_bandwidth(&mut detector, "peer1", 600.0));
        assert_eq!(detector.get_anomaly_count("peer1"), 2);
    }

    #[test]
    fn test_get_peer_anomalies() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        seed_bandwidth(&mut detector, &["peer1", "peer2"]);
        check_bandwidth(&mut detector, "peer1", 500.0);
        check_bandwidth(&mut detector, "peer2", 600.0);
        let found = detector.get_peer_anomalies("peer1");
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].peer_id, "peer1");
    }

    #[test]
    fn test_anomaly_types() {
        let mut detector = AnomalyDetector::new(DetectionConfig::default());
        seed_bandwidth(&mut detector, &["peer1"]);
        check_bandwidth(&mut detector, "peer1", 500.0);
        let outliers = detector.get_anomalies_by_type(AnomalyType::StatisticalOutlier);
        assert!(!outliers.is_empty());
    }

    #[test]
    fn test_severe_anomalies() {
        let mut detector = AnomalyDetector::new(DetectionConfig {
            z_score_threshold: 1.0,
            min_samples: 10,
            ..Default::default()
        });
        (0..50).for_each(|_| detector.record_sample("peer1", create_sample(100.0, "bandwidth")));
        check_bandwidth(&mut detector, "peer1", 200.0);
        check_bandwidth(&mut detector, "peer1", 1000.0);
        assert!(!detector.get_severe_anomalies(0.5).is_empty());
    }

    #[test]
    fn test_recent_anomalies() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        seed_bandwidth(&mut detector, &["peer1"]);
        check_bandwidth(&mut detector, "peer1", 500.0);
        assert!(detector.has_recent_anomalies("peer1", Duration::from_secs(60)));
        assert!(!detector.has_recent_anomalies("peer1", Duration::from_secs(0)));
    }

    #[test]
    fn test_anomaly_rate() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        seed_bandwidth(&mut detector, &["peer1"]);
        check_bandwidth(&mut detector, "peer1", 500.0);
        check_bandwidth(&mut detector, "peer1", 600.0);
        let expected = 2.0 / 50.0;
        assert!((detector.get_anomaly_rate("peer1") - expected).abs() < 0.001);
    }

    #[test]
    fn test_statistics() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        seed_bandwidth(&mut detector, &["peer1", "peer2"]);
        check_bandwidth(&mut detector, "peer1", 500.0);
        check_bandwidth(&mut detector, "peer2", 600.0);
        let stats = detector.get_statistics();
        assert_eq!(stats.total_anomalies, 2);
        assert_eq!(stats.total_peers, 2);
    }

    #[test]
    fn test_clear_peer() {
        let mut detector = AnomalyDetector::new(DetectionConfig::default());
        seed_bandwidth(&mut detector, &["peer1"]);
        check_bandwidth(&mut detector, "peer1", 500.0);
        assert_eq!(detector.peer_count(), 1);
        assert_eq!(detector.get_anomaly_count("peer1"), 1);
        detector.clear_peer("peer1");
        assert_eq!(detector.peer_count(), 0);
        assert_eq!(detector.get_anomaly_count("peer1"), 0);
    }

    #[test]
    fn test_metric_type_isolation() {
        let mut detector = AnomalyDetector::new(sensitive_config());
        for i in 0..50u32 {
            let bandwidth = 100.0 + f64::from(i % 5);
            let latency = 50.0 + f64::from(i % 3);
            detector.record_sample("peer1", create_sample(bandwidth, "bandwidth"));
            detector.record_sample("peer1", create_sample(latency, "latency"));
        }
        assert!(check_bandwidth(&mut detector, "peer1", 500.0));
        assert!(!detector.is_anomalous("peer1", &create_sample(51.0, "latency")));
    }
}