use super::{Collector, MetricValue, Metrics};
use anyhow::Result;
use std::collections::{HashMap, VecDeque};
use std::time::Instant;
/// One latency sample that was flagged as anomalous.
///
/// Built with [`AnomalyRecord::new`] plus the optional `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct AnomalyRecord {
    /// Name of the syscall that produced the sample.
    pub syscall: String,
    /// Observed duration (microseconds, per the `_us` suffix — confirm against producers).
    pub duration_us: u64,
    /// Signed z-score of the sample. When produced by the collector, a negative value means
    /// the sample was shorter than that syscall's running mean.
    pub z_score: f32,
    /// Optional secondary score set via [`AnomalyRecord::with_if_score`]
    /// (presumably an isolation-forest score — confirm with the producer).
    pub if_score: Option<f32>,
    /// `Instant::now()` captured when the record was constructed.
    pub timestamp: Instant,
    /// Source file associated with the sample, if known.
    pub source_file: Option<String>,
    /// Source line associated with the sample, if known.
    pub source_line: Option<u32>,
    /// Process id; `0` unless set via [`AnomalyRecord::with_pid`].
    pub pid: i32,
}

impl AnomalyRecord {
    /// Absolute z-score strictly above which a record is considered high severity.
    const HIGH_SEVERITY_Z: f32 = 5.0;

    /// Creates a record with no secondary score, no source location and `pid = 0`.
    pub fn new(syscall: &str, duration_us: u64, z_score: f32) -> Self {
        Self {
            syscall: syscall.to_string(),
            duration_us,
            z_score,
            if_score: None,
            timestamp: Instant::now(),
            source_file: None,
            source_line: None,
            pid: 0,
        }
    }

    /// Attaches a secondary anomaly score.
    pub fn with_if_score(mut self, score: f32) -> Self {
        self.if_score = Some(score);
        self
    }

    /// Attaches a source location (`file`, `line`).
    pub fn with_source(mut self, file: &str, line: u32) -> Self {
        self.source_file = Some(file.to_string());
        self.source_line = Some(line);
        self
    }

    /// Sets the process id.
    pub fn with_pid(mut self, pid: i32) -> Self {
        self.pid = pid;
        self
    }

    /// Returns `true` when `|z_score|` exceeds 5.0.
    ///
    /// The magnitude is used so that unusually fast samples rank the same as unusually slow
    /// ones. This matches the `|z| > threshold` rule the collector uses to flag anomalies.
    pub fn is_high_severity(&self) -> bool {
        self.z_score.abs() > Self::HIGH_SEVERITY_Z
    }
}
/// Streaming count / mean / variance accumulator using Welford's update.
///
/// Samples are folded in one at a time with [`OnlineStats::update`]; none are stored.
#[derive(Debug, Clone, Default)]
pub struct OnlineStats {
    /// Number of samples folded in so far.
    count: u64,
    /// Running mean of all samples.
    mean: f64,
    /// Running sum of squared deviations from the mean (Welford's `M2`).
    m2: f64,
}

impl OnlineStats {
    /// Below this many samples `z_score` reports 0.0 rather than trusting a tiny history.
    const MIN_SAMPLES: u64 = 10;

    /// Returns an empty accumulator.
    pub fn new() -> Self {
        Self::default()
    }

    /// Folds `value` into the running statistics.
    pub fn update(&mut self, value: f64) {
        self.count += 1;
        let before = value - self.mean;
        self.mean += before / self.count as f64;
        let after = value - self.mean;
        self.m2 += before * after;
    }

    /// Mean of the samples seen so far (0.0 when empty).
    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Sample variance (divides by `n - 1`); 0.0 until at least two samples are present.
    pub fn variance(&self) -> f64 {
        match self.count {
            0 | 1 => 0.0,
            n => self.m2 / (n - 1) as f64,
        }
    }

    /// Square root of [`OnlineStats::variance`].
    pub fn stddev(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Standard score of `value` against the current statistics.
    ///
    /// Returns 0.0 while fewer than 10 samples have been seen, or when the spread is exactly
    /// zero. A deviation from a perfectly constant history therefore scores 0.0.
    pub fn z_score(&self, value: f64) -> f64 {
        if self.count < Self::MIN_SAMPLES {
            return 0.0;
        }
        let spread = self.stddev();
        if spread == 0.0 {
            return 0.0;
        }
        (value - self.mean) / spread
    }

    /// Number of samples folded in so far.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Discards all accumulated state.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
pub struct AnomalyCollector {
stats: HashMap<String, OnlineStats>,
threshold: f32,
anomalies: VecDeque<AnomalyRecord>,
max_anomalies: usize,
total_anomalies: u64,
avg_z_score: f64,
z_score_sum: f64,
z_score_count: u64,
available: bool,
}
impl AnomalyCollector {
pub fn new() -> Self {
Self::with_threshold(3.0)
}
pub fn with_threshold(threshold: f32) -> Self {
Self {
stats: HashMap::new(),
threshold,
anomalies: VecDeque::with_capacity(100),
max_anomalies: 100,
total_anomalies: 0,
avg_z_score: 0.0,
z_score_sum: 0.0,
z_score_count: 0,
available: true,
}
}
pub fn process(
&mut self,
syscall: &str,
duration_us: u64,
source_file: Option<&str>,
source_line: Option<u32>,
pid: i32,
) -> (f64, Option<AnomalyRecord>) {
let stats = self.stats.entry(syscall.to_string()).or_default();
let z_score = stats.z_score(duration_us as f64);
stats.update(duration_us as f64);
self.z_score_sum += z_score.abs();
self.z_score_count += 1;
self.avg_z_score = self.z_score_sum / self.z_score_count as f64;
if z_score.abs() > self.threshold as f64 {
let mut record = AnomalyRecord::new(syscall, duration_us, z_score as f32).with_pid(pid);
if let (Some(file), Some(line)) = (source_file, source_line) {
record = record.with_source(file, line);
}
if self.anomalies.len() >= self.max_anomalies {
self.anomalies.pop_front();
}
self.anomalies.push_back(record.clone());
self.total_anomalies += 1;
(z_score, Some(record))
} else {
(z_score, None)
}
}
pub fn anomalies(&self) -> &VecDeque<AnomalyRecord> {
&self.anomalies
}
pub fn total_count(&self) -> u64 {
self.total_anomalies
}
pub fn avg_z_score(&self) -> f64 {
self.avg_z_score
}
pub fn threshold(&self) -> f32 {
self.threshold
}
pub fn get_stats(&self, syscall: &str) -> Option<&OnlineStats> {
self.stats.get(syscall)
}
pub fn clear_anomalies(&mut self) {
self.anomalies.clear();
}
}
impl Default for AnomalyCollector {
fn default() -> Self {
Self::new()
}
}
impl Collector for AnomalyCollector {
fn collect(&mut self) -> Result<Metrics> {
let mut values = HashMap::new();
values
.insert("anomaly.total.count".to_string(), MetricValue::Counter(self.total_anomalies));
values.insert("anomaly.avg_z_score".to_string(), MetricValue::Gauge(self.avg_z_score));
values.insert("anomaly.threshold".to_string(), MetricValue::Gauge(self.threshold as f64));
values.insert(
"anomaly.recent.count".to_string(),
MetricValue::Gauge(self.anomalies.len() as f64),
);
let high_severity = self.anomalies.iter().filter(|a| a.is_high_severity()).count();
values.insert(
"anomaly.high_severity.count".to_string(),
MetricValue::Gauge(high_severity as f64),
);
Ok(Metrics::new(values))
}
fn is_available(&self) -> bool {
self.available
}
fn name(&self) -> &'static str {
"anomaly"
}
fn reset(&mut self) {
self.stats.clear();
self.anomalies.clear();
self.total_anomalies = 0;
self.avg_z_score = 0.0;
self.z_score_sum = 0.0;
self.z_score_count = 0;
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_anomaly_record_new() {
        let record = AnomalyRecord::new("read", 1000, 4.5);
        assert_eq!(record.syscall, "read");
        assert_eq!(record.duration_us, 1000);
        assert!((record.z_score - 4.5).abs() < f32::EPSILON);
        assert!(!record.is_high_severity());
    }

    #[test]
    fn test_anomaly_record_high_severity() {
        let record = AnomalyRecord::new("write", 5000, 5.5);
        assert!(record.is_high_severity());
    }

    #[test]
    fn test_anomaly_record_with_source() {
        let record = AnomalyRecord::new("open", 100, 3.5).with_source("main.c", 42);
        assert_eq!(record.source_file, Some("main.c".to_string()));
        assert_eq!(record.source_line, Some(42));
    }

    #[test]
    fn test_online_stats_welford() {
        let mut stats = OnlineStats::new();
        stats.update(10.0);
        stats.update(20.0);
        stats.update(30.0);
        assert_eq!(stats.count(), 3);
        assert!((stats.mean() - 20.0).abs() < f64::EPSILON);
        assert!((stats.variance() - 100.0).abs() < f64::EPSILON);
        assert!((stats.stddev() - 10.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_online_stats_z_score() {
        let mut stats = OnlineStats::new();
        for i in 0..100 {
            stats.update(100.0 + (i % 20) as f64 - 10.0);
        }
        let z = stats.z_score(stats.mean() + 3.0 * stats.stddev());
        assert!((z - 3.0).abs() < 0.1);
    }

    #[test]
    fn test_online_stats_insufficient_data() {
        let mut stats = OnlineStats::new();
        stats.update(100.0);
        assert_eq!(stats.z_score(200.0), 0.0);
    }

    #[test]
    fn test_anomaly_collector_new() {
        let collector = AnomalyCollector::new();
        assert!((collector.threshold() - 3.0).abs() < f32::EPSILON);
        assert_eq!(collector.total_count(), 0);
        assert!(collector.is_available());
    }

    #[test]
    fn test_anomaly_collector_process_normal() {
        let mut collector = AnomalyCollector::new();
        for _ in 0..100 {
            collector.process("read", 100, None, None, 1234);
        }
        let (z, anomaly) = collector.process("read", 100, None, None, 1234);
        assert!(z.abs() < 1.0);
        assert!(anomaly.is_none());
    }

    #[test]
    fn test_anomaly_collector_process_anomaly() {
        let mut collector = AnomalyCollector::with_threshold(3.0);
        for i in 0..100 {
            let duration = 100 + (i % 5);
            collector.process("read", duration, None, None, 1234);
        }
        let (z, anomaly) = collector.process("read", 10000, Some("main.c"), Some(42), 1234);
        assert!(z > 3.0);
        assert!(anomaly.is_some());
        let record = anomaly.unwrap();
        assert_eq!(record.syscall, "read");
        assert_eq!(record.duration_us, 10000);
        assert_eq!(record.source_file, Some("main.c".to_string()));
        assert_eq!(record.source_line, Some(42));
        assert_eq!(record.pid, 1234);
    }

    #[test]
    fn test_anomaly_collector_ring_buffer() {
        let mut collector = AnomalyCollector::with_threshold(0.001);
        collector.max_anomalies = 10;
        // A constant warm-up has zero variance, so nothing is flagged yet.
        for _ in 0..15 {
            collector.process("read", 100, None, None, 1234);
        }
        assert_eq!(collector.total_count(), 0);
        // Strictly increasing durations sit above the running mean once the variance is
        // non-zero. This produces more anomalies than the buffer can hold.
        for i in 0..20 {
            collector.process("read", 100 + i * 50, None, None, 1234);
        }
        assert_eq!(collector.anomalies().len(), 10);
        assert!(collector.total_count() > collector.anomalies().len() as u64);
        // The newest record survives; older ones were evicted from the front.
        assert_eq!(collector.anomalies().back().map(|a| a.duration_us), Some(1050));
    }

    #[test]
    fn test_anomaly_collector_collect() {
        let mut collector = AnomalyCollector::new();
        collector.process("read", 100, None, None, 1234);
        let metrics = collector.collect().unwrap();
        assert!(metrics.values.contains_key("anomaly.total.count"));
        assert!(metrics.values.contains_key("anomaly.avg_z_score"));
    }

    #[test]
    fn test_anomaly_collector_stats_are_per_syscall() {
        let mut collector = AnomalyCollector::new();
        for _ in 0..20 {
            collector.process("read", 100, None, None, 1234);
        }
        let (z, anomaly) = collector.process("write", 10000, None, None, 1234);
        // A first-seen syscall has no history, so it cannot be scored yet.
        assert_eq!(z, 0.0);
        assert!(anomaly.is_none());
        assert_eq!(collector.get_stats("read").map(|s| s.count()), Some(20));
        assert_eq!(collector.get_stats("write").map(|s| s.count()), Some(1));
        assert!(collector.get_stats("open").is_none());
    }

    #[test]
    fn test_anomaly_collector_clear_keeps_total() {
        let mut collector = AnomalyCollector::with_threshold(2.0);
        for i in 0..20 {
            let duration = 100 + (i % 10);
            collector.process("read", duration, None, None, 1234);
        }
        collector.process("read", 10000, None, None, 1234);
        let total = collector.total_count();
        assert!(total > 0);
        collector.clear_anomalies();
        assert!(collector.anomalies().is_empty());
        assert_eq!(collector.total_count(), total);
    }

    #[test]
    fn test_anomaly_collector_reset() {
        let mut collector = AnomalyCollector::with_threshold(2.0);
        for i in 0..20 {
            let duration = 100 + (i % 10);
            collector.process("read", duration, None, None, 1234);
        }
        collector.process("read", 10000, None, None, 1234);
        assert!(
            !collector.anomalies().is_empty(),
            "Expected at least one anomaly after extreme value"
        );
        assert!(collector.total_count() > 0);
        collector.reset();
        assert!(collector.anomalies().is_empty());
        assert_eq!(collector.total_count(), 0);
        assert!(collector.stats.is_empty());
    }
}