use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, instrument};
use crate::analyzers::{AnalyzerError, AnalyzerResult};
const EPSILON: f64 = 1e-10;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricPoint {
pub value: f64,
pub timestamp: DateTime<Utc>,
pub metadata: HashMap<String, String>,
}
impl MetricPoint {
pub fn new(value: f64) -> Self {
Self {
value,
timestamp: Utc::now(),
metadata: HashMap::new(),
}
}
pub fn with_timestamp(value: f64, timestamp: DateTime<Utc>) -> Self {
Self {
value,
timestamp,
metadata: HashMap::new(),
}
}
pub fn with_metadata(mut self, key: String, value: String) -> Self {
self.metadata.insert(key, value);
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyResult {
pub is_anomaly: bool,
pub confidence: f64,
pub expected_range: Option<(f64, f64)>,
pub actual_value: f64,
pub explanation: String,
pub details: HashMap<String, String>,
}
impl AnomalyResult {
pub fn no_anomaly(actual_value: f64, explanation: String) -> Self {
Self {
is_anomaly: false,
confidence: 0.0,
expected_range: None,
actual_value,
explanation,
details: HashMap::new(),
}
}
pub fn anomaly_detected(actual_value: f64, confidence: f64, explanation: String) -> Self {
Self {
is_anomaly: true,
confidence,
expected_range: None,
actual_value,
explanation,
details: HashMap::new(),
}
}
pub fn insufficient_history() -> Self {
Self {
is_anomaly: false,
confidence: 0.0,
expected_range: None,
actual_value: 0.0,
explanation: "Insufficient historical data for anomaly detection".to_string(),
details: HashMap::new(),
}
}
pub fn with_expected_range(mut self, min: f64, max: f64) -> Self {
self.expected_range = Some((min, max));
self
}
pub fn with_detail(mut self, key: String, value: String) -> Self {
self.details.insert(key, value);
self
}
}
#[async_trait]
pub trait AnomalyDetectionStrategy: Send + Sync {
async fn detect(
&self,
history: &[MetricPoint],
current: MetricPoint,
) -> AnalyzerResult<AnomalyResult>;
fn name(&self) -> &str;
fn description(&self) -> &str;
}
#[derive(Debug, Clone)]
pub struct RelativeRateOfChangeStrategy {
pub max_rate_increase: Option<f64>,
pub max_rate_decrease: Option<f64>,
pub min_history_points: usize,
}
impl RelativeRateOfChangeStrategy {
pub fn new(max_rate: f64) -> AnalyzerResult<Self> {
Self::validate_rate(max_rate, "max_rate")?;
Ok(Self {
max_rate_increase: Some(max_rate),
max_rate_decrease: Some(max_rate),
min_history_points: 1,
})
}
pub fn with_asymmetric_thresholds(
max_increase: Option<f64>,
max_decrease: Option<f64>,
) -> AnalyzerResult<Self> {
if let Some(rate) = max_increase {
Self::validate_rate(rate, "max_increase")?;
}
if let Some(rate) = max_decrease {
Self::validate_rate(rate, "max_decrease")?;
}
Ok(Self {
max_rate_increase: max_increase,
max_rate_decrease: max_decrease,
min_history_points: 1,
})
}
pub fn with_min_history(mut self, min_points: usize) -> Self {
self.min_history_points = min_points;
self
}
fn validate_rate(rate: f64, name: &str) -> AnalyzerResult<()> {
if !rate.is_finite() || rate < 0.0 {
return Err(AnalyzerError::Custom(format!(
"{name} must be finite and non-negative, got: {rate}"
)));
}
Ok(())
}
}
#[async_trait]
impl AnomalyDetectionStrategy for RelativeRateOfChangeStrategy {
#[instrument(skip(self, history))]
async fn detect(
&self,
history: &[MetricPoint],
current: MetricPoint,
) -> AnalyzerResult<AnomalyResult> {
if history.len() < self.min_history_points {
debug!(
history_size = history.len(),
required = self.min_history_points,
"Insufficient history for rate of change detection"
);
return Ok(AnomalyResult::insufficient_history());
}
let previous = history
.last()
.ok_or_else(|| AnalyzerError::Custom("No previous value in history".to_string()))?;
if previous.value.abs() < EPSILON {
if current.value.abs() < EPSILON {
return Ok(AnomalyResult::no_anomaly(
current.value,
"No change detected (both values are near zero)".to_string(),
));
}
let explanation = format!(
"Cannot calculate rate of change from near-zero baseline (previous: {}, current: {})",
previous.value, current.value
);
if self.max_rate_increase.is_some() || self.max_rate_decrease.is_some() {
return Ok(AnomalyResult::anomaly_detected(
current.value,
1.0, explanation,
)
.with_detail("previous_value".to_string(), previous.value.to_string())
.with_detail("rate_of_change".to_string(), "infinite".to_string()));
} else {
return Ok(AnomalyResult::no_anomaly(current.value, explanation));
}
}
let rate_of_change = (current.value - previous.value) / previous.value.abs();
debug!(
previous = previous.value,
current = current.value,
rate_of_change = rate_of_change,
"Calculated rate of change"
);
let mut is_anomaly = false;
let mut confidence = 0.0;
let mut explanation = String::new();
if rate_of_change > 0.0 {
if let Some(max_increase) = self.max_rate_increase {
if rate_of_change > max_increase {
is_anomaly = true;
confidence = (rate_of_change / max_increase).min(1.0);
explanation = format!(
"Rate of increase ({:.1}%) exceeds threshold ({:.1}%)",
rate_of_change * 100.0,
max_increase * 100.0
);
}
}
} else if rate_of_change < 0.0 {
let abs_rate = rate_of_change.abs();
if let Some(max_decrease) = self.max_rate_decrease {
if abs_rate > max_decrease {
is_anomaly = true;
confidence = (abs_rate / max_decrease).min(1.0);
explanation = format!(
"Rate of decrease ({:.1}%) exceeds threshold ({:.1}%)",
abs_rate * 100.0,
max_decrease * 100.0
);
}
}
}
let mut result = if is_anomaly {
AnomalyResult::anomaly_detected(current.value, confidence, explanation)
} else {
let explanation = format!(
"Rate of change ({:.1}%) is within acceptable limits",
rate_of_change * 100.0
);
AnomalyResult::no_anomaly(current.value, explanation)
};
result = result
.with_detail("previous_value".to_string(), previous.value.to_string())
.with_detail("rate_of_change".to_string(), format!("{rate_of_change:.4}"))
.with_detail(
"rate_of_change_percentage".to_string(),
format!("{:.1}%", rate_of_change * 100.0),
);
if let (Some(max_inc), Some(max_dec)) = (self.max_rate_increase, self.max_rate_decrease) {
let min_expected = previous.value * (1.0 - max_dec);
let max_expected = previous.value * (1.0 + max_inc);
result = result.with_expected_range(min_expected, max_expected);
}
Ok(result)
}
fn name(&self) -> &str {
"RelativeRateOfChange"
}
fn description(&self) -> &str {
"Detects anomalies based on relative rate of change between consecutive values"
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
#[tokio::test]
async fn test_relative_rate_of_change_normal() {
let strategy = RelativeRateOfChangeStrategy::new(0.1).unwrap();
let now = Utc::now();
let history = vec![
MetricPoint::with_timestamp(100.0, now - Duration::hours(2)),
MetricPoint::with_timestamp(105.0, now - Duration::hours(1)),
];
let current = MetricPoint::with_timestamp(110.25, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(!result.is_anomaly);
assert_eq!(result.confidence, 0.0);
assert!(result.explanation.contains("within acceptable limits"));
}
#[tokio::test]
async fn test_relative_rate_of_change_anomaly_increase() {
let strategy = RelativeRateOfChangeStrategy::new(0.1).unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(100.0, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(120.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(result.is_anomaly);
assert!(result.confidence > 0.9); assert!(result.explanation.contains("exceeds threshold"));
if let Some((min, max)) = result.expected_range {
assert!((min - 90.0).abs() < 0.001);
assert!((max - 110.0).abs() < 0.001);
} else {
panic!("Expected range should be Some");
}
}
#[tokio::test]
async fn test_relative_rate_of_change_anomaly_decrease() {
let strategy = RelativeRateOfChangeStrategy::new(0.1).unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(100.0, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(85.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(result.is_anomaly);
assert!(result.confidence > 0.9); assert!(result.explanation.contains("decrease"));
assert!(result.explanation.contains("exceeds threshold"));
}
#[tokio::test]
async fn test_asymmetric_thresholds() {
let strategy = RelativeRateOfChangeStrategy::with_asymmetric_thresholds(
Some(0.2), Some(0.05), )
.unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(100.0, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(115.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(!result.is_anomaly);
let current = MetricPoint::with_timestamp(93.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(result.is_anomaly);
assert!(result.explanation.contains("decrease"));
}
#[tokio::test]
async fn test_zero_baseline_edge_case() {
let strategy = RelativeRateOfChangeStrategy::new(0.1).unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(0.0, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(10.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(result.is_anomaly);
assert_eq!(result.confidence, 1.0);
assert!(result.explanation.contains("zero baseline"));
assert!(result
.details
.get("rate_of_change")
.unwrap()
.contains("infinite"));
}
#[tokio::test]
async fn test_insufficient_history() {
let strategy = RelativeRateOfChangeStrategy::new(0.1)
.unwrap()
.with_min_history(3);
let history = vec![MetricPoint::new(100.0), MetricPoint::new(105.0)];
let current = MetricPoint::new(110.0);
let result = strategy.detect(&history, current).await.unwrap();
assert!(!result.is_anomaly);
assert!(result.explanation.contains("Insufficient"));
}
#[tokio::test]
async fn test_no_limit_on_increases() {
let strategy = RelativeRateOfChangeStrategy::with_asymmetric_thresholds(
None, Some(0.1), )
.unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(100.0, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(600.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(!result.is_anomaly);
let current = MetricPoint::with_timestamp(85.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(result.is_anomaly);
}
#[tokio::test]
async fn test_invalid_rate_validation() {
let result = RelativeRateOfChangeStrategy::new(f64::NAN);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("must be finite"));
let result = RelativeRateOfChangeStrategy::new(f64::INFINITY);
assert!(result.is_err());
let result = RelativeRateOfChangeStrategy::new(-0.1);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("non-negative"));
let result =
RelativeRateOfChangeStrategy::with_asymmetric_thresholds(Some(f64::NAN), Some(0.1));
assert!(result.is_err());
let result =
RelativeRateOfChangeStrategy::with_asymmetric_thresholds(Some(0.1), Some(-0.05));
assert!(result.is_err());
}
#[tokio::test]
async fn test_near_zero_baseline() {
let strategy = RelativeRateOfChangeStrategy::new(0.1).unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(1e-11, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(10.0, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(result.is_anomaly);
assert_eq!(result.confidence, 1.0);
assert!(result.explanation.contains("near-zero baseline"));
assert!(result
.details
.get("rate_of_change")
.unwrap()
.contains("infinite"));
}
#[tokio::test]
async fn test_very_small_changes() {
let strategy = RelativeRateOfChangeStrategy::new(0.1).unwrap();
let now = Utc::now();
let history = vec![MetricPoint::with_timestamp(1e-6, now - Duration::hours(1))];
let current = MetricPoint::with_timestamp(1.05e-6, now);
let result = strategy.detect(&history, current).await.unwrap();
assert!(!result.is_anomaly);
assert!(result.explanation.contains("within acceptable limits"));
}
}