1use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::time::Instant;
5use tokio::sync::mpsc;
6use tracing::{debug, info, warn};
7
8use crate::brp_messages::EntityData;
9use crate::error::{Error, Result};
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13#[non_exhaustive]
14pub enum AnomalyType {
15 PhysicsViolation,
17 PotentialMemoryLeak,
19 StateInconsistency,
21 PerformanceSpike,
23 EntityCountSpike,
25 RapidValueChange,
27}
28
29impl AnomalyType {
30 #[must_use]
32 pub fn description(&self) -> &'static str {
33 match self {
34 Self::PhysicsViolation => "Entity violating physics constraints",
35 Self::PotentialMemoryLeak => "Entity potentially consuming resources without purpose",
36 Self::StateInconsistency => "Entity has contradictory component values",
37 Self::PerformanceSpike => "System performance degradation detected",
38 Self::EntityCountSpike => "Abnormal increase in entity count",
39 Self::RapidValueChange => "Component value changing too rapidly",
40 }
41 }
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct Anomaly {
47 pub anomaly_type: AnomalyType,
48 pub entity_id: Option<u64>,
49 pub component: Option<String>,
50 pub severity: f32, pub description: String,
52 pub detected_at: chrono::DateTime<chrono::Utc>,
53 pub metadata: HashMap<String, serde_json::Value>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct AnomalyConfig {
59 pub window_size: usize,
61 pub z_score_threshold: f32,
63 pub iqr_multiplier: f32,
65 pub min_samples: usize,
67 pub performance_threshold: f32,
69 pub entity_growth_threshold: f32,
71 pub whitelist: Vec<AnomalyPattern>,
73}
74
75impl Default for AnomalyConfig {
76 fn default() -> Self {
77 Self {
78 window_size: 100,
79 z_score_threshold: 3.0,
80 iqr_multiplier: 1.5,
81 min_samples: 10,
82 performance_threshold: 2.0,
83 entity_growth_threshold: 10.0,
84 whitelist: Vec::new(),
85 }
86 }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct AnomalyPattern {
92 pub anomaly_type: AnomalyType,
93 pub entity_pattern: Option<String>, pub threshold_override: Option<f32>,
95}
96
/// A single numeric observation stored in a detector's history buffer.
#[derive(Debug, Clone)]
struct DataPoint {
    /// The observed value.
    value: f32,
    /// Monotonic instant at which the sample was recorded. It is stored by
    /// the detectors but not read back anywhere in the visible code.
    timestamp: Instant,
}
103
/// Bounded FIFO buffer: once `capacity` elements are stored, every push
/// evicts the oldest element first.
pub struct RingBuffer<T> {
    data: VecDeque<T>,
    capacity: usize,
}

impl<T> RingBuffer<T> {
    /// Creates an empty buffer that holds at most `capacity` elements.
    ///
    /// A requested capacity of zero is raised to one, so the buffer can
    /// always retain the most recent value.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        let capacity = if capacity == 0 { 1 } else { capacity };
        let data = VecDeque::with_capacity(capacity);
        Self { data, capacity }
    }

    /// Appends `value`, discarding the oldest element when the buffer is
    /// already full.
    pub fn push(&mut self, value: T) {
        let is_full = self.data.len() >= self.capacity;
        if is_full {
            self.data.pop_front();
        }
        self.data.push_back(value);
    }

    /// Iterates over the stored elements from oldest to newest.
    pub fn values(&self) -> impl Iterator<Item = &T> {
        self.data.iter()
    }

    /// Number of elements currently stored.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` when nothing is stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Removes every stored element; the capacity limit is unchanged.
    pub fn clear(&mut self) {
        self.data.clear();
    }
}
151
/// Namespace for the stateless statistical helpers used by the detectors.
pub struct Statistics;

impl Statistics {
    /// Arithmetic mean of `values`; `0.0` for an empty slice.
    #[must_use]
    pub fn mean(values: &[f32]) -> f32 {
        if values.is_empty() {
            return 0.0;
        }
        values.iter().sum::<f32>() / values.len() as f32
    }

    /// Sample standard deviation of `values` (divides by `n - 1`).
    ///
    /// Returns `0.0` when fewer than two values are given, or when the
    /// computed variance is not a finite, non-negative number.
    #[must_use]
    pub fn std_dev(values: &[f32]) -> f32 {
        if values.len() < 2 {
            return 0.0;
        }
        let mean = Self::mean(values);
        let variance =
            values.iter().map(|x| (x - mean).powi(2)).sum::<f32>() / (values.len() - 1) as f32;
        if variance.is_finite() && variance >= 0.0 {
            variance.sqrt()
        } else {
            0.0
        }
    }

    /// Number of standard deviations `value` lies from `mean`.
    ///
    /// Returns `0.0` when `std_dev` is zero or when any input is not finite,
    /// so callers never receive an infinite or NaN score.
    #[must_use]
    pub fn z_score(value: f32, mean: f32, std_dev: f32) -> f32 {
        if std_dev == 0.0 || !std_dev.is_finite() || !value.is_finite() || !mean.is_finite() {
            return 0.0;
        }
        (value - mean) / std_dev
    }

    /// Returns `(q1, median, q3)` of `values`, or `(0.0, 0.0, 0.0)` when the
    /// slice is empty.
    ///
    /// The quartiles are the elements at indices `len / 4`, `len / 2` and
    /// `3 * len / 4` of a sorted copy; no interpolation is performed.
    #[must_use]
    pub fn quartiles(values: &[f32]) -> (f32, f32, f32) {
        if values.is_empty() {
            return (0.0, 0.0, 0.0);
        }

        let mut sorted = values.to_vec();
        // `total_cmp` is a genuine total order even when NaN is present, which
        // the sort contract requires; a `partial_cmp` fallback to `Equal` is
        // not one. Values that compare equal under it are bit-identical, so
        // an unstable sort yields the same result.
        sorted.sort_unstable_by(|a, b| a.total_cmp(b));

        let len = sorted.len();
        // For len >= 1 every index below is strictly less than len.
        (sorted[len / 4], sorted[len / 2], sorted[(3 * len) / 4])
    }

    /// Tests whether `value` lies outside the IQR fences of `values`, i.e.
    /// below `q1 - multiplier * iqr` or above `q3 + multiplier * iqr`.
    ///
    /// An empty sample provides no reference distribution, so nothing is
    /// reported as an outlier against it.
    #[must_use]
    pub fn is_outlier_iqr(value: f32, values: &[f32], multiplier: f32) -> bool {
        if values.is_empty() {
            return false;
        }
        let (q1, _, q3) = Self::quartiles(values);
        let iqr = q3 - q1;
        let lower_bound = q1 - multiplier * iqr;
        let upper_bound = q3 + multiplier * iqr;
        value < lower_bound || value > upper_bound
    }
}
218
219pub trait AnomalyDetector: Send + Sync {
221 fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>>;
223
224 fn name(&self) -> &str;
226
227 fn configure(&mut self, config: &AnomalyConfig);
229}
230
231pub struct PhysicsDetector {
233 velocity_history: HashMap<u64, RingBuffer<DataPoint>>,
234 config: AnomalyConfig,
235}
236
237impl PhysicsDetector {
238 #[must_use]
240 pub fn new(config: AnomalyConfig) -> Self {
241 Self {
242 velocity_history: HashMap::new(),
243 config,
244 }
245 }
246
247 fn extract_velocity_magnitude(&self, entity: &EntityData) -> Option<f32> {
248 entity
249 .components
250 .get("Velocity")
251 .and_then(|v| v.get("linear"))
252 .and_then(|linear| {
253 let x = linear.get("x")?.as_f64()? as f32;
254 let y = linear.get("y")?.as_f64()? as f32;
255 let z = linear.get("z").and_then(|z| z.as_f64()).unwrap_or(0.0) as f32;
256 Some((x * x + y * y + z * z).sqrt())
257 })
258 }
259}
260
261impl AnomalyDetector for PhysicsDetector {
262 fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
263 let mut anomalies = Vec::new();
264 let now = Instant::now();
265
266 for entity in entities {
267 if let Some(velocity_mag) = self.extract_velocity_magnitude(entity) {
268 let history = self
269 .velocity_history
270 .entry(entity.id)
271 .or_insert_with(|| RingBuffer::new(self.config.window_size));
272
273 history.push(DataPoint {
274 value: velocity_mag,
275 timestamp: now,
276 });
277
278 if history.len() >= self.config.min_samples {
280 let values: Vec<f32> = history.values().map(|dp| dp.value).collect();
281 let mean = Statistics::mean(&values);
282 let std_dev = Statistics::std_dev(&values);
283 let z_score = Statistics::z_score(velocity_mag, mean, std_dev);
284
285 if z_score.abs() > self.config.z_score_threshold {
286 let severity = (z_score.abs() / self.config.z_score_threshold).min(1.0);
287
288 let metadata = [
289 ("velocity_magnitude", serde_json::json!(velocity_mag)),
290 ("mean_velocity", serde_json::json!(mean)),
291 ("z_score", serde_json::json!(z_score)),
292 ]
293 .into_iter()
294 .map(|(k, v)| (k.to_string(), v))
295 .collect();
296
297 anomalies.push(Anomaly {
298 anomaly_type: AnomalyType::PhysicsViolation,
299 entity_id: Some(entity.id),
300 component: Some("Velocity".to_string()),
301 severity,
302 description: format!(
303 "Entity {} velocity magnitude {:.2} is {:.2} standard deviations from mean {:.2}",
304 entity.id, velocity_mag, z_score, mean
305 ),
306 detected_at: chrono::Utc::now(),
307 metadata,
308 });
309 }
310 }
311 }
312 }
313
314 Ok(anomalies)
315 }
316
317 fn name(&self) -> &str {
318 "PhysicsDetector"
319 }
320
321 fn configure(&mut self, config: &AnomalyConfig) {
322 self.config = config.clone();
323 }
324}
325
326pub struct PerformanceDetector {
328 frame_times: RingBuffer<DataPoint>,
329 entity_counts: RingBuffer<DataPoint>,
330 config: AnomalyConfig,
331 last_entity_count: Option<usize>,
332}
333
334impl PerformanceDetector {
335 #[must_use]
337 pub fn new(config: AnomalyConfig) -> Self {
338 Self {
339 frame_times: RingBuffer::new(config.window_size),
340 entity_counts: RingBuffer::new(config.window_size),
341 config,
342 last_entity_count: None,
343 }
344 }
345}
346
347impl AnomalyDetector for PerformanceDetector {
348 fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
349 let mut anomalies = Vec::new();
350 let now = Instant::now();
351
352 let current_count = entities.len();
354 self.entity_counts.push(DataPoint {
355 value: current_count as f32,
356 timestamp: now,
357 });
358
359 if self.entity_counts.len() >= self.config.min_samples {
360 let values: Vec<f32> = self.entity_counts.values().map(|dp| dp.value).collect();
361
362 if let (Some(first), Some(last)) = (values.first(), values.last()) {
364 let growth_rate = (last - first) / self.config.window_size as f32;
365
366 if growth_rate > self.config.entity_growth_threshold {
367 let severity = (growth_rate / self.config.entity_growth_threshold).min(1.0);
368
369 let metadata = [
370 ("growth_rate", serde_json::json!(growth_rate)),
371 ("entity_count", serde_json::json!(current_count)),
372 ]
373 .into_iter()
374 .map(|(k, v)| (k.to_string(), v))
375 .collect();
376
377 anomalies.push(Anomaly {
378 anomaly_type: AnomalyType::EntityCountSpike,
379 entity_id: None,
380 component: None,
381 severity,
382 description: format!(
383 "Entity count growing at {:.2} entities per sample (threshold: {:.2})",
384 growth_rate, self.config.entity_growth_threshold
385 ),
386 detected_at: chrono::Utc::now(),
387 metadata,
388 });
389 }
390 }
391 }
392
393 Ok(anomalies)
394 }
395
396 fn name(&self) -> &str {
397 "PerformanceDetector"
398 }
399
400 fn configure(&mut self, config: &AnomalyConfig) {
401 self.config = config.clone();
402 }
403}
404
405pub struct ConsistencyDetector {
407 config: AnomalyConfig,
408}
409
410impl ConsistencyDetector {
411 #[must_use]
413 pub fn new(config: AnomalyConfig) -> Self {
414 Self { config }
415 }
416
417 fn check_health_alive_consistency(&self, entity: &EntityData) -> Option<Anomaly> {
418 let health = entity
419 .components
420 .get("Health")
421 .and_then(|h| h.get("current"))
422 .and_then(|c| c.as_f64())? as f32;
423
424 let is_alive = entity
425 .components
426 .get("Alive")
427 .and_then(|a| a.as_bool())
428 .unwrap_or(true);
429
430 if health <= 0.0 && is_alive {
431 let metadata = [
432 ("health", serde_json::json!(health)),
433 ("alive", serde_json::json!(is_alive)),
434 ]
435 .into_iter()
436 .map(|(k, v)| (k.to_string(), v))
437 .collect();
438
439 return Some(Anomaly {
440 anomaly_type: AnomalyType::StateInconsistency,
441 entity_id: Some(entity.id),
442 component: Some("Health/Alive".to_string()),
443 severity: 0.9, description: format!(
445 "Entity {} has health {:.1} but is marked as alive",
446 entity.id, health
447 ),
448 detected_at: chrono::Utc::now(),
449 metadata,
450 });
451 }
452
453 None
454 }
455}
456
457impl AnomalyDetector for ConsistencyDetector {
458 fn detect(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
459 let mut anomalies = Vec::new();
460
461 for entity in entities {
462 if let Some(anomaly) = self.check_health_alive_consistency(entity) {
463 anomalies.push(anomaly);
464 }
465 }
466
467 Ok(anomalies)
468 }
469
470 fn name(&self) -> &str {
471 "ConsistencyDetector"
472 }
473
474 fn configure(&mut self, config: &AnomalyConfig) {
475 self.config = config.clone();
476 }
477}
478
479pub struct AnomalyDetectionSystem {
481 detectors: Vec<Box<dyn AnomalyDetector>>,
482 config: AnomalyConfig,
483 monitoring_channel: Option<mpsc::Receiver<Vec<EntityData>>>,
484 anomaly_sender: Option<mpsc::Sender<Vec<Anomaly>>>,
485}
486
487impl AnomalyDetectionSystem {
488 #[must_use]
490 pub fn new(config: AnomalyConfig) -> Self {
491 let mut detectors: Vec<Box<dyn AnomalyDetector>> = Vec::new();
492
493 detectors.push(Box::new(PhysicsDetector::new(config.clone())));
494 detectors.push(Box::new(PerformanceDetector::new(config.clone())));
495 detectors.push(Box::new(ConsistencyDetector::new(config.clone())));
496
497 Self {
498 detectors,
499 config,
500 monitoring_channel: None,
501 anomaly_sender: None,
502 }
503 }
504
505 pub fn setup_channels(
507 &mut self,
508 ) -> (mpsc::Sender<Vec<EntityData>>, mpsc::Receiver<Vec<Anomaly>>) {
509 let (entity_sender, entity_receiver) = mpsc::channel::<Vec<EntityData>>(100);
510 let (anomaly_sender, anomaly_receiver) = mpsc::channel::<Vec<Anomaly>>(100);
511
512 self.monitoring_channel = Some(entity_receiver);
513 self.anomaly_sender = Some(anomaly_sender);
514
515 (entity_sender, anomaly_receiver)
516 }
517
518 pub fn detect_anomalies(&mut self, entities: &[EntityData]) -> Result<Vec<Anomaly>> {
520 let mut all_anomalies = Vec::new();
521
522 for detector in &mut self.detectors {
523 match detector.detect(entities) {
524 Ok(mut anomalies) => {
525 debug!("{} detected {} anomalies", detector.name(), anomalies.len());
526 all_anomalies.append(&mut anomalies);
527 }
528 Err(e) => {
529 warn!("Detector {} failed: {}", detector.name(), e);
530 }
531 }
532 }
533
534 all_anomalies = self.filter_whitelisted(all_anomalies);
536
537 all_anomalies.sort_by(|a, b| {
539 b.severity
540 .partial_cmp(&a.severity)
541 .unwrap_or(std::cmp::Ordering::Equal)
542 });
543
544 info!("Detected {} anomalies total", all_anomalies.len());
545 Ok(all_anomalies)
546 }
547
548 pub async fn start_monitoring(mut self) -> Result<()> {
550 let mut receiver = self
551 .monitoring_channel
552 .take()
553 .ok_or_else(|| Error::Brp("Monitoring channel not set up".to_string()))?;
554
555 let anomaly_sender = self
556 .anomaly_sender
557 .take()
558 .ok_or_else(|| Error::Brp("Anomaly sender not set up".to_string()))?;
559
560 info!("Starting anomaly detection monitoring loop");
561
562 while let Some(entities) = receiver.recv().await {
563 match self.detect_anomalies(&entities) {
564 Ok(anomalies) => {
565 if !anomalies.is_empty() {
566 if let Err(e) = anomaly_sender.send(anomalies).await {
567 warn!("Failed to send anomalies: {}", e);
568 break;
569 }
570 }
571 }
572 Err(e) => {
573 warn!("Anomaly detection failed: {}", e);
574 }
575 }
576 }
577
578 info!("Anomaly detection monitoring loop ended");
579 Ok(())
580 }
581
582 fn filter_whitelisted(&self, anomalies: Vec<Anomaly>) -> Vec<Anomaly> {
583 anomalies
584 .into_iter()
585 .filter(|anomaly| !self.is_whitelisted(anomaly))
586 .collect()
587 }
588
589 fn is_whitelisted(&self, anomaly: &Anomaly) -> bool {
590 self.config
593 .whitelist
594 .iter()
595 .any(|pattern| pattern.anomaly_type == anomaly.anomaly_type)
596 }
597
598 pub fn update_config(&mut self, config: AnomalyConfig) {
600 for detector in &mut self.detectors {
601 detector.configure(&config);
602 }
603 self.config = config;
604 }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Entity whose only component is a `Velocity` moving along the x axis.
    fn entity_with_velocity(id: u64, x: f64) -> EntityData {
        let velocity = json!({
            "linear": {"x": x, "y": 0.0, "z": 0.0}
        });
        EntityData {
            id,
            components: [("Velocity".to_string(), velocity)].into_iter().collect(),
        }
    }

    /// Entity with the given `Health.current` and an `Alive` flag of `true`.
    fn living_entity_with_health(id: u64, health: f64) -> EntityData {
        EntityData {
            id,
            components: [
                ("Health".to_string(), json!({"current": health})),
                ("Alive".to_string(), json!(true)),
            ]
            .into_iter()
            .collect(),
        }
    }

    /// `count` entities with ids `0..count` and no components.
    fn bare_entities(count: u64) -> Vec<EntityData> {
        (0..count)
            .map(|id| EntityData {
                id,
                components: HashMap::new(),
            })
            .collect()
    }

    #[test]
    fn test_ring_buffer() {
        let mut buffer = RingBuffer::new(3);
        assert!(buffer.is_empty());

        for item in [1, 2, 3] {
            buffer.push(item);
        }
        assert_eq!(buffer.len(), 3);

        // A fourth push must evict the oldest element.
        buffer.push(4);
        let values: Vec<_> = buffer.values().cloned().collect();
        assert_eq!(values, vec![2, 3, 4]);
    }

    #[test]
    fn test_statistics() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        assert_eq!(Statistics::mean(&values), 3.0);

        let std_dev = Statistics::std_dev(&values);
        assert!((std_dev - 1.58).abs() < 0.01);

        let z_score = Statistics::z_score(6.0, 3.0, std_dev);
        assert!((z_score - 1.9).abs() < 0.1);
    }

    #[test]
    fn test_quartiles() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
        let (q1, q2, q3) = Statistics::quartiles(&values);

        assert!(q1 <= q2);
        assert!(q2 <= q3);
        assert!(q1 >= 1.0 && q1 <= 3.0);
        assert!(q2 >= 3.0 && q2 <= 5.0);
        assert!(q3 >= 5.0 && q3 <= 8.0);
    }

    #[test]
    fn test_physics_detector() {
        let mut detector = PhysicsDetector::new(AnomalyConfig::default());
        let steady = entity_with_velocity(1, 100.0);

        // A single sample is below `min_samples`, so nothing is reported.
        let anomalies = detector.detect(&[steady.clone()]).unwrap();
        assert!(anomalies.is_empty());

        // Build up a stable baseline.
        for _ in 0..15 {
            let _ = detector.detect(&[steady.clone()]);
        }

        let extreme_entity = entity_with_velocity(1, 1000.0);
        let anomalies = detector.detect(&[extreme_entity]).unwrap();
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::PhysicsViolation);
    }

    #[test]
    fn test_consistency_detector() {
        let mut detector = ConsistencyDetector::new(AnomalyConfig::default());
        let entity = living_entity_with_health(1, -10.0);

        let anomalies = detector.detect(&[entity]).unwrap();
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::StateInconsistency);
        assert!(anomalies[0].severity > 0.8);
    }

    #[test]
    fn test_performance_detector() {
        let config = AnomalyConfig {
            entity_growth_threshold: 0.5,
            min_samples: 5,
            window_size: 10,
            ..Default::default()
        };
        let mut detector = PerformanceDetector::new(config);

        // Gradual growth: one more entity on every call.
        for count in 1..=10 {
            let _anomalies = detector.detect(&bare_entities(count)).unwrap();
        }

        // Sudden jump to 50 entities.
        let anomalies = detector.detect(&bare_entities(50)).unwrap();
        assert!(
            !anomalies.is_empty(),
            "Expected entity count spike anomaly but none found"
        );
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::EntityCountSpike);
    }

    #[test]
    fn test_anomaly_detection_system() {
        let mut system = AnomalyDetectionSystem::new(AnomalyConfig::default());

        let entities = vec![
            living_entity_with_health(1, -5.0),
            entity_with_velocity(2, 10.0),
        ];

        let anomalies = system.detect_anomalies(&entities).unwrap();

        assert!(!anomalies.is_empty());
        assert!(anomalies
            .iter()
            .any(|a| a.anomaly_type == AnomalyType::StateInconsistency));
    }
}