use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use crate::brp_messages::EntityData;
use crate::error::{Error, Result};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum AnomalyType {
PhysicsViolation,
PotentialMemoryLeak,
StateInconsistency,
PerformanceSpike,
EntityCountSpike,
RapidValueChange,
}
impl AnomalyType {
#[must_use]
pub fn description(&self) -> &'static str {
match self {
Self::PhysicsViolation => "Entity violating physics constraints",
Self::PotentialMemoryLeak => "Entity potentially consuming resources without purpose",
Self::StateInconsistency => "Entity has contradictory component values",
Self::PerformanceSpike => "System performance degradation detected",
Self::EntityCountSpike => "Abnormal increase in entity count",
Self::RapidValueChange => "Component value changing too rapidly",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Anomaly {
pub anomaly_type: AnomalyType,
pub entity_id: Option<u64>,
pub component: Option<String>,
pub severity: f32, pub description: String,
pub detected_at: chrono::DateTime<chrono::Utc>,
pub metadata: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyConfig {
pub window_size: usize,
pub z_score_threshold: f32,
pub iqr_multiplier: f32,
pub min_samples: usize,
pub performance_threshold: f32,
pub entity_growth_threshold: f32,
pub whitelist: Vec<AnomalyPattern>,
}
impl Default for AnomalyConfig {
fn default() -> Self {
Self {
window_size: 100,
z_score_threshold: 3.0,
iqr_multiplier: 1.5,
min_samples: 10,
performance_threshold: 2.0,
entity_growth_threshold: 10.0,
whitelist: Vec::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyPattern {
pub anomaly_type: AnomalyType,
pub entity_pattern: Option<String>, pub threshold_override: Option<f32>,
}
#[derive(Debug, Clone)]
struct DataPoint {
value: f32,
timestamp: Instant,
}
pub struct RingBuffer<T> {
data: VecDeque<T>,
capacity: usize,
}
impl<T> RingBuffer<T> {
#[must_use]
pub fn new(capacity: usize) -> Self {
let capacity = capacity.max(1); Self {
data: VecDeque::with_capacity(capacity),
capacity,
}
}
pub fn push(&mut self, value: T) {
if self.data.len() >= self.capacity {
self.data.pop_front();
}
self.data.push_back(value);
}
pub fn values(&self) -> impl Iterator<Item = &T> {
self.data.iter()
}
#[must_use]
pub fn len(&self) -> usize {
self.data.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn clear(&mut self) {
self.data.clear();
}
}
pub struct Statistics;
impl Statistics {
#[must_use]
pub fn mean(values: &[f32]) -> f32 {
if values.is_empty() {
return 0.0;
}
values.iter().sum::<f32>() / values.len() as f32
}
#[must_use]
pub fn std_dev(values: &[f32]) -> f32 {
if values.len() < 2 {
return 0.0;
}
let mean = Self::mean(values);
let variance =
values.iter().map(|x| (x - mean).powi(2)).sum::<f32>() / (values.len() - 1) as f32;
if variance.is_finite() && variance >= 0.0 {
variance.sqrt()
} else {
0.0
}
}
#[must_use]
pub fn z_score(value: f32, mean: f32, std_dev: f32) -> f32 {
if std_dev == 0.0 || !std_dev.is_finite() || !value.is_finite() || !mean.is_finite() {
return 0.0;
}
(value - mean) / std_dev
}
#[must_use]
pub fn quartiles(values: &[f32]) -> (f32, f32, f32) {
if values.is_empty() {
return (0.0, 0.0, 0.0);
}
let mut sorted = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let len = sorted.len();
let q1_idx = len / 4;
let q2_idx = len / 2;
let q3_idx = (3 * len) / 4;
(sorted[q1_idx], sorted[q2_idx], sorted[q3_idx.min(len - 1)])
}
#[must_use]
pub fn is_outlier_iqr(value: f32, values: &[f32], multiplier: f32) -> bool {
let (q1, _, q3) = Self::quartiles(values);
let iqr = q3 - q1;
let lower_bound = q1 - multiplier * iqr;
let upper_bound = q3 + multiplier * iqr;
value < lower_bound || value > upper_bound
}
}
pub trait AnomalyDetector: Send + Sync {
fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>>;
fn name(&self) -> &str;
fn configure(&mut self, config: &AnomalyConfig);
}
pub struct PhysicsDetector {
velocity_history: HashMap<u64, RingBuffer<DataPoint>>,
config: AnomalyConfig,
}
impl PhysicsDetector {
#[must_use]
pub fn new(config: AnomalyConfig) -> Self {
Self {
velocity_history: HashMap::new(),
config,
}
}
fn extract_velocity_magnitude(&self, entity: &EntityData) -> Option<f32> {
entity
.components
.get("Velocity")
.and_then(|v| v.get("linear"))
.and_then(|linear| {
let x = linear.get("x")?.as_f64()? as f32;
let y = linear.get("y")?.as_f64()? as f32;
let z = linear.get("z").and_then(|z| z.as_f64()).unwrap_or(0.0) as f32;
Some((x * x + y * y + z * z).sqrt())
})
}
}
impl AnomalyDetector for PhysicsDetector {
fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
let mut anomalies = Vec::new();
let now = Instant::now();
for entity in entities {
if let Some(velocity_mag) = self.extract_velocity_magnitude(entity) {
let history = self
.velocity_history
.entry(entity.id)
.or_insert_with(|| RingBuffer::new(self.config.window_size));
history.push(DataPoint {
value: velocity_mag,
timestamp: now,
});
if history.len() >= self.config.min_samples {
let values: Vec<f32> = history.values().map(|dp| dp.value).collect();
let mean = Statistics::mean(&values);
let std_dev = Statistics::std_dev(&values);
let z_score = Statistics::z_score(velocity_mag, mean, std_dev);
if z_score.abs() > self.config.z_score_threshold {
let severity = (z_score.abs() / self.config.z_score_threshold).min(1.0);
let metadata = [
("velocity_magnitude", serde_json::json!(velocity_mag)),
("mean_velocity", serde_json::json!(mean)),
("z_score", serde_json::json!(z_score)),
]
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect();
anomalies.push(Anomaly {
anomaly_type: AnomalyType::PhysicsViolation,
entity_id: Some(entity.id),
component: Some("Velocity".to_string()),
severity,
description: format!(
"Entity {} velocity magnitude {:.2} is {:.2} standard deviations from mean {:.2}",
entity.id, velocity_mag, z_score, mean
),
detected_at: chrono::Utc::now(),
metadata,
});
}
}
}
}
Ok(anomalies)
}
fn name(&self) -> &str {
"PhysicsDetector"
}
fn configure(&mut self, config: &AnomalyConfig) {
self.config = config.clone();
}
}
pub struct PerformanceDetector {
frame_times: RingBuffer<DataPoint>,
entity_counts: RingBuffer<DataPoint>,
config: AnomalyConfig,
last_entity_count: Option<usize>,
}
impl PerformanceDetector {
#[must_use]
pub fn new(config: AnomalyConfig) -> Self {
Self {
frame_times: RingBuffer::new(config.window_size),
entity_counts: RingBuffer::new(config.window_size),
config,
last_entity_count: None,
}
}
}
impl AnomalyDetector for PerformanceDetector {
fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
let mut anomalies = Vec::new();
let now = Instant::now();
let current_count = entities.len();
self.entity_counts.push(DataPoint {
value: current_count as f32,
timestamp: now,
});
if self.entity_counts.len() >= self.config.min_samples {
let values: Vec<f32> = self.entity_counts.values().map(|dp| dp.value).collect();
if let (Some(first), Some(last)) = (values.first(), values.last()) {
let growth_rate = (last - first) / self.config.window_size as f32;
if growth_rate > self.config.entity_growth_threshold {
let severity = (growth_rate / self.config.entity_growth_threshold).min(1.0);
let metadata = [
("growth_rate", serde_json::json!(growth_rate)),
("entity_count", serde_json::json!(current_count)),
]
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect();
anomalies.push(Anomaly {
anomaly_type: AnomalyType::EntityCountSpike,
entity_id: None,
component: None,
severity,
description: format!(
"Entity count growing at {:.2} entities per sample (threshold: {:.2})",
growth_rate, self.config.entity_growth_threshold
),
detected_at: chrono::Utc::now(),
metadata,
});
}
}
}
Ok(anomalies)
}
fn name(&self) -> &str {
"PerformanceDetector"
}
fn configure(&mut self, config: &AnomalyConfig) {
self.config = config.clone();
}
}
pub struct ConsistencyDetector {
config: AnomalyConfig,
}
impl ConsistencyDetector {
#[must_use]
pub fn new(config: AnomalyConfig) -> Self {
Self { config }
}
fn check_health_alive_consistency(&self, entity: &EntityData) -> Option<Anomaly> {
let health = entity
.components
.get("Health")
.and_then(|h| h.get("current"))
.and_then(|c| c.as_f64())? as f32;
let is_alive = entity
.components
.get("Alive")
.and_then(|a| a.as_bool())
.unwrap_or(true);
if health <= 0.0 && is_alive {
let metadata = [
("health", serde_json::json!(health)),
("alive", serde_json::json!(is_alive)),
]
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect();
return Some(Anomaly {
anomaly_type: AnomalyType::StateInconsistency,
entity_id: Some(entity.id),
component: Some("Health/Alive".to_string()),
severity: 0.9, description: format!(
"Entity {} has health {:.1} but is marked as alive",
entity.id, health
),
detected_at: chrono::Utc::now(),
metadata,
});
}
None
}
}
impl AnomalyDetector for ConsistencyDetector {
fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
let mut anomalies = Vec::new();
for entity in entities {
if let Some(anomaly) = self.check_health_alive_consistency(entity) {
anomalies.push(anomaly);
}
}
Ok(anomalies)
}
fn name(&self) -> &str {
"ConsistencyDetector"
}
fn configure(&mut self, config: &AnomalyConfig) {
self.config = config.clone();
}
}
pub struct AnomalyDetectionSystem {
detectors: Vec<Box<dyn AnomalyDetector>>,
config: AnomalyConfig,
monitoring_channel: Option<mpsc::Receiver<Vec<EntityData>>>,
anomaly_sender: Option<mpsc::Sender<Vec<Anomaly>>>,
}
impl AnomalyDetectionSystem {
#[must_use]
pub fn new(config: AnomalyConfig) -> Self {
let mut detectors: Vec<Box<dyn AnomalyDetector>> = Vec::new();
detectors.push(Box::new(PhysicsDetector::new(config.clone())));
detectors.push(Box::new(PerformanceDetector::new(config.clone())));
detectors.push(Box::new(ConsistencyDetector::new(config.clone())));
Self {
detectors,
config,
monitoring_channel: None,
anomaly_sender: None,
}
}
pub fn setup_channels(
&mut self,
) -> (mpsc::Sender<Vec<EntityData>>, mpsc::Receiver<Vec<Anomaly>>) {
let (entity_sender, entity_receiver) = mpsc::channel::<Vec<EntityData>>(100);
let (anomaly_sender, anomaly_receiver) = mpsc::channel::<Vec<Anomaly>>(100);
self.monitoring_channel = Some(entity_receiver);
self.anomaly_sender = Some(anomaly_sender);
(entity_sender, anomaly_receiver)
}
pub fn detect_anomalies(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
let mut all_anomalies = Vec::new();
for detector in &mut self.detectors {
match detector.detect(entities) {
Ok(mut anomalies) => {
debug!("{} detected {} anomalies", detector.name(), anomalies.len());
all_anomalies.append(&mut anomalies);
}
Err(e) => {
warn!("Detector {} failed: {}", detector.name(), e);
}
}
}
all_anomalies = self.filter_whitelisted(all_anomalies);
all_anomalies.sort_by(|a, b| {
b.severity
.partial_cmp(&a.severity)
.unwrap_or(std::cmp::Ordering::Equal)
});
info!("Detected {} anomalies total", all_anomalies.len());
Ok(all_anomalies)
}
pub async fn start_monitoring(mut self) -> Result<()> {
let mut receiver = self
.monitoring_channel
.take()
.ok_or_else(|| Error::Brp("Monitoring channel not set up".to_string()))?;
let anomaly_sender = self
.anomaly_sender
.take()
.ok_or_else(|| Error::Brp("Anomaly sender not set up".to_string()))?;
info!("Starting anomaly detection monitoring loop");
while let Some(entities) = receiver.recv().await {
match self.detect_anomalies(&entities) {
Ok(anomalies) => {
if !anomalies.is_empty() {
if let Err(e) = anomaly_sender.send(anomalies).await {
warn!("Failed to send anomalies: {}", e);
break;
}
}
}
Err(e) => {
warn!("Anomaly detection failed: {}", e);
}
}
}
info!("Anomaly detection monitoring loop ended");
Ok(())
}
fn filter_whitelisted(&self, anomalies: Vec<Anomaly>) -> Vec<Anomaly> {
anomalies
.into_iter()
.filter(|anomaly| !self.is_whitelisted(anomaly))
.collect()
}
fn is_whitelisted(&self, anomaly: &Anomaly) -> bool {
self.config
.whitelist
.iter()
.any(|pattern| pattern.anomaly_type == anomaly.anomaly_type)
}
pub fn update_config(&mut self, config: AnomalyConfig) {
for detector in &mut self.detectors {
detector.configure(&config);
}
self.config = config;
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_ring_buffer() {
let mut buffer = RingBuffer::new(3);
assert!(buffer.is_empty());
buffer.push(1);
buffer.push(2);
buffer.push(3);
assert_eq!(buffer.len(), 3);
buffer.push(4);
let values: Vec<_> = buffer.values().cloned().collect();
assert_eq!(values, vec![2, 3, 4]);
}
#[test]
fn test_statistics() {
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
assert_eq!(Statistics::mean(&values), 3.0);
let std_dev = Statistics::std_dev(&values);
assert!((std_dev - 1.58).abs() < 0.01);
let z_score = Statistics::z_score(6.0, 3.0, std_dev);
assert!((z_score - 1.9).abs() < 0.1);
}
#[test]
fn test_quartiles() {
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
let (q1, q2, q3) = Statistics::quartiles(&values);
assert!(q1 <= q2);
assert!(q2 <= q3);
assert!(q1 >= 1.0 && q1 <= 3.0);
assert!(q2 >= 3.0 && q2 <= 5.0);
assert!(q3 >= 5.0 && q3 <= 8.0);
}
#[test]
fn test_physics_detector() {
let config = AnomalyConfig::default();
let mut detector = PhysicsDetector::new(config);
let entity = EntityData {
id: 1,
components: [(
"Velocity".to_string(),
json!({
"linear": {"x": 100.0, "y": 0.0, "z": 0.0}
}),
)]
.into_iter()
.collect(),
};
let anomalies = detector.detect(&[entity.clone()]).unwrap();
assert!(anomalies.is_empty());
for _ in 0..15 {
let _ = detector.detect(&[entity.clone()]);
}
let extreme_entity = EntityData {
id: 1,
components: [(
"Velocity".to_string(),
json!({
"linear": {"x": 1000.0, "y": 0.0, "z": 0.0}
}),
)]
.into_iter()
.collect(),
};
let anomalies = detector.detect(&[extreme_entity]).unwrap();
assert!(!anomalies.is_empty());
assert_eq!(anomalies[0].anomaly_type, AnomalyType::PhysicsViolation);
}
#[test]
fn test_consistency_detector() {
let config = AnomalyConfig::default();
let mut detector = ConsistencyDetector::new(config);
let entity = EntityData {
id: 1,
components: [
("Health".to_string(), json!({"current": -10.0})),
("Alive".to_string(), json!(true)),
]
.into_iter()
.collect(),
};
let anomalies = detector.detect(&[entity]).unwrap();
assert!(!anomalies.is_empty());
assert_eq!(anomalies[0].anomaly_type, AnomalyType::StateInconsistency);
assert!(anomalies[0].severity > 0.8);
}
#[test]
fn test_performance_detector() {
let config = AnomalyConfig {
entity_growth_threshold: 0.5, min_samples: 5, window_size: 10, ..Default::default()
};
let mut detector = PerformanceDetector::new(config);
for i in 1..=10 {
let entities: Vec<EntityData> = (0..i)
.map(|id| EntityData {
id: id as u64,
components: HashMap::new(),
})
.collect();
let _anomalies = detector.detect(&entities).unwrap();
}
let entities: Vec<EntityData> = (0..50)
.map(|id| EntityData {
id: id as u64,
components: HashMap::new(),
})
.collect();
let anomalies = detector.detect(&entities).unwrap();
assert!(
!anomalies.is_empty(),
"Expected entity count spike anomaly but none found"
);
assert_eq!(anomalies[0].anomaly_type, AnomalyType::EntityCountSpike);
}
#[test]
fn test_anomaly_detection_system() {
let config = AnomalyConfig::default();
let mut system = AnomalyDetectionSystem::new(config);
let entities = vec![
EntityData {
id: 1,
components: [
("Health".to_string(), json!({"current": -5.0})),
("Alive".to_string(), json!(true)),
]
.into_iter()
.collect(),
},
EntityData {
id: 2,
components: [(
"Velocity".to_string(),
json!({
"linear": {"x": 10.0, "y": 0.0, "z": 0.0}
}),
)]
.into_iter()
.collect(),
},
];
let anomalies = system.detect_anomalies(&entities).unwrap();
assert!(!anomalies.is_empty());
assert!(anomalies
.iter()
.any(|a| a.anomaly_type == AnomalyType::StateInconsistency));
}
}