use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use tracing::{debug, info, instrument, warn};
use crate::analyzers::{AnalyzerContext, AnalyzerError, AnalyzerResult, MetricValue};
/// A detected anomaly for a single metric value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Anomaly {
/// Name of the metric the anomaly was found on.
pub metric_name: String,
/// The value that triggered detection.
pub current_value: MetricValue,
/// The value the detector expected (e.g. the previous value or the mean), if any.
pub expected_value: Option<MetricValue>,
/// Name of the detector that produced this anomaly (see `AnomalyDetector::name`).
pub detection_strategy: String,
/// Detector-specific confidence score; some strategies scale past 1.0
/// (rate / threshold), while others clamp to 1.0 — see each detector.
pub confidence: f64,
/// Human-readable explanation of why the value is anomalous.
pub description: String,
/// When this anomaly record was created (set in `Anomaly::new`).
pub detected_at: DateTime<Utc>,
/// Extra detector-specific details (e.g. computed statistics).
pub metadata: HashMap<String, String>,
}
impl Anomaly {
pub fn new(
metric_name: String,
current_value: MetricValue,
detection_strategy: String,
confidence: f64,
description: String,
) -> Self {
Self {
metric_name,
current_value,
expected_value: None,
detection_strategy,
confidence,
description,
detected_at: Utc::now(),
metadata: HashMap::new(),
}
}
pub fn with_expected_value(mut self, value: MetricValue) -> Self {
self.expected_value = Some(value);
self
}
pub fn with_metadata(mut self, key: String, value: String) -> Self {
self.metadata.insert(key, value);
self
}
}
/// A single stored observation of a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDataPoint {
/// The recorded value.
pub value: MetricValue,
/// When the value was observed.
pub timestamp: DateTime<Utc>,
/// Free-form labels attached to this observation.
pub metadata: HashMap<String, String>,
}
/// A detection strategy that inspects a metric's current value against its history.
#[async_trait]
pub trait AnomalyDetector: Send + Sync {
/// Returns `Ok(Some(anomaly))` when `current_value` is anomalous relative to
/// `history`, `Ok(None)` otherwise (including when there is insufficient or
/// non-comparable history).
async fn detect(
&self,
metric_name: &str,
current_value: &MetricValue,
history: &[MetricDataPoint],
) -> AnalyzerResult<Option<Anomaly>>;
/// Short strategy name, recorded on emitted anomalies as `detection_strategy`.
fn name(&self) -> &str;
/// Human-readable description of the strategy.
fn description(&self) -> &str;
}
/// Storage backend for metric time series.
#[async_trait]
pub trait MetricsRepository: Send + Sync {
/// Persists a single value for `metric_name` at `timestamp`.
async fn store_metric(
&self,
metric_name: &str,
value: MetricValue,
timestamp: DateTime<Utc>,
) -> AnalyzerResult<()>;
/// Returns stored points for `metric_name`, optionally bounded by
/// `since`/`until` and truncated to `limit` entries.
async fn get_metric_history(
&self,
metric_name: &str,
since: Option<DateTime<Utc>>,
until: Option<DateTime<Utc>>,
limit: Option<usize>,
) -> AnalyzerResult<Vec<MetricDataPoint>>;
/// Stores every metric from `context` under one shared timestamp.
/// Default implementation iterates `context.all_metrics()` and delegates to
/// `store_metric`, stopping at the first error.
async fn store_context(&self, context: &AnalyzerContext) -> AnalyzerResult<()> {
let timestamp = Utc::now();
for (metric_name, value) in context.all_metrics() {
self.store_metric(metric_name, value.clone(), timestamp)
.await?;
}
Ok(())
}
}
/// Retention limits for `InMemoryMetricsRepository`.
#[derive(Debug, Clone)]
pub struct InMemoryMetricsConfig {
/// Maximum data points kept per metric (oldest are trimmed first).
pub max_points_per_metric: usize,
/// Maximum number of distinct metrics stored.
pub max_metrics: usize,
/// Maximum age of a data point, in seconds, before it is purged.
pub max_age_seconds: i64,
}
impl Default for InMemoryMetricsConfig {
fn default() -> Self {
Self {
max_points_per_metric: 10_000,
max_metrics: 1_000,
max_age_seconds: 30 * 24 * 60 * 60, }
}
}
/// In-memory, async-safe metrics store with bounded retention.
///
/// Cloning is cheap: clones share the same underlying map via `Arc`.
#[derive(Clone)]
pub struct InMemoryMetricsRepository {
// Metric name -> data points, kept sorted oldest -> newest by store_metric.
data: Arc<tokio::sync::RwLock<HashMap<String, Vec<MetricDataPoint>>>>,
config: InMemoryMetricsConfig,
}
impl InMemoryMetricsRepository {
pub fn new() -> Self {
Self::with_config(InMemoryMetricsConfig::default())
}
pub fn with_config(config: InMemoryMetricsConfig) -> Self {
Self {
data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
config,
}
}
pub async fn memory_stats(&self) -> MemoryStats {
let data = self.data.read().await;
let mut total_points = 0;
let mut oldest_timestamp = None;
let mut newest_timestamp = None;
for history in data.values() {
total_points += history.len();
for point in history {
match oldest_timestamp {
None => oldest_timestamp = Some(point.timestamp),
Some(oldest) if point.timestamp < oldest => {
oldest_timestamp = Some(point.timestamp)
}
_ => {}
}
match newest_timestamp {
None => newest_timestamp = Some(point.timestamp),
Some(newest) if point.timestamp > newest => {
newest_timestamp = Some(point.timestamp)
}
_ => {}
}
}
}
MemoryStats {
total_metrics: data.len(),
total_data_points: total_points,
oldest_data_point: oldest_timestamp,
newest_data_point: newest_timestamp,
estimated_memory_bytes: Self::estimate_memory_usage(&data),
}
}
fn estimate_memory_usage(data: &HashMap<String, Vec<MetricDataPoint>>) -> usize {
let mut size = std::mem::size_of::<HashMap<String, Vec<MetricDataPoint>>>();
for (key, values) in data {
size += std::mem::size_of::<String>() + key.len();
size += std::mem::size_of::<Vec<MetricDataPoint>>();
size += values.len() * std::mem::size_of::<MetricDataPoint>();
for point in values {
for (k, v) in &point.metadata {
size += std::mem::size_of::<String>() * 2 + k.len() + v.len();
}
}
}
size
}
async fn cleanup_if_needed(&self) {
let mut data = self.data.write().await;
if data.len() > self.config.max_metrics {
warn!(
current_metrics = data.len(),
max_metrics = self.config.max_metrics,
"Metrics limit exceeded, removing oldest metrics"
);
let mut metrics_by_latest: Vec<_> = data
.iter()
.map(|(name, points)| {
let latest = points.iter().map(|p| p.timestamp).max().unwrap_or_default();
(name.clone(), latest)
})
.collect();
metrics_by_latest.sort_by(|a, b| b.1.cmp(&a.1));
let to_remove = metrics_by_latest.len() - self.config.max_metrics;
for (metric_name, _) in metrics_by_latest.iter().skip(self.config.max_metrics) {
data.remove(metric_name);
}
info!(
removed_metrics = to_remove,
remaining_metrics = data.len(),
"Cleaned up old metrics"
);
}
let cutoff_time = Utc::now() - Duration::seconds(self.config.max_age_seconds);
let mut total_points_removed = 0;
for (metric_name, points) in data.iter_mut() {
let original_len = points.len();
points.retain(|p| p.timestamp >= cutoff_time);
if points.len() > self.config.max_points_per_metric {
points.sort_by_key(|p| p.timestamp);
let to_keep = points.len() - self.config.max_points_per_metric;
points.drain(0..to_keep);
}
let removed = original_len - points.len();
if removed > 0 {
total_points_removed += removed;
debug!(
metric = metric_name,
removed_points = removed,
remaining_points = points.len(),
"Cleaned up old data points"
);
}
}
if total_points_removed > 0 {
info!(
total_removed = total_points_removed,
"Completed data point cleanup"
);
}
}
}
/// Snapshot of an `InMemoryMetricsRepository`'s storage footprint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
/// Number of distinct metrics currently stored.
pub total_metrics: usize,
/// Total data points across all metrics.
pub total_data_points: usize,
/// Timestamp of the oldest stored point, if any data exists.
pub oldest_data_point: Option<DateTime<Utc>>,
/// Timestamp of the newest stored point, if any data exists.
pub newest_data_point: Option<DateTime<Utc>>,
/// Rough lower-bound estimate of heap usage in bytes.
pub estimated_memory_bytes: usize,
}
impl Default for InMemoryMetricsRepository {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl MetricsRepository for InMemoryMetricsRepository {
    /// Stores one data point, enforcing the metric-count limit, then runs cleanup.
    ///
    /// # Errors
    /// Returns `AnalyzerError::Custom` when the value would create a new metric
    /// beyond `max_metrics`.
    async fn store_metric(
        &self,
        metric_name: &str,
        value: MetricValue,
        timestamp: DateTime<Utc>,
    ) -> AnalyzerResult<()> {
        {
            // Check and insert under ONE write lock. The previous implementation
            // checked the limit under a read lock and inserted under a separate
            // write lock, so two concurrent writers could both pass the check
            // and push the map past `max_metrics` (TOCTOU race).
            let mut data = self.data.write().await;
            if data.len() >= self.config.max_metrics && !data.contains_key(metric_name) {
                return Err(AnalyzerError::Custom(format!(
                    "Maximum metrics limit ({}) exceeded",
                    self.config.max_metrics
                )));
            }
            let entry = data.entry(metric_name.to_string()).or_default();
            if entry.len() >= self.config.max_points_per_metric {
                // Cleanup below will trim the excess; warn so operators can see it.
                warn!(
                    metric = metric_name,
                    current_points = entry.len(),
                    max_points = self.config.max_points_per_metric,
                    "Metric approaching memory limit"
                );
            }
            entry.push(MetricDataPoint {
                value,
                timestamp,
                metadata: HashMap::new(),
            });
            // Keep each history sorted oldest -> newest.
            entry.sort_by_key(|dp| dp.timestamp);
        }
        self.cleanup_if_needed().await;
        Ok(())
    }

    /// Returns data points for `metric_name` within `[since, until]` (both
    /// bounds optional and inclusive). `limit` truncates from the front, i.e.
    /// it keeps the oldest matching points.
    async fn get_metric_history(
        &self,
        metric_name: &str,
        since: Option<DateTime<Utc>>,
        until: Option<DateTime<Utc>>,
        limit: Option<usize>,
    ) -> AnalyzerResult<Vec<MetricDataPoint>> {
        let data = self.data.read().await;
        let Some(history) = data.get(metric_name) else {
            return Ok(Vec::new());
        };
        let mut filtered: Vec<_> = history
            .iter()
            .filter(|dp| {
                let after_since = since.map_or(true, |s| dp.timestamp >= s);
                let before_until = until.map_or(true, |u| dp.timestamp <= u);
                after_since && before_until
            })
            .cloned()
            .collect();
        if let Some(limit) = limit {
            filtered.truncate(limit);
        }
        Ok(filtered)
    }
}
/// Detector flagging values whose relative change from the previous point
/// exceeds a threshold (e.g. 0.1 == 10%).
pub struct RelativeRateOfChangeDetector {
/// Maximum allowed |delta| / |previous| before an anomaly is reported.
pub max_rate_of_change: f64,
/// Minimum number of history points required before detection runs.
pub min_history_size: usize,
}
impl RelativeRateOfChangeDetector {
pub fn new(max_rate_of_change: f64) -> Self {
Self {
max_rate_of_change,
min_history_size: 2,
}
}
pub fn with_min_history_size(mut self, size: usize) -> Self {
self.min_history_size = size;
self
}
}
#[async_trait]
impl AnomalyDetector for RelativeRateOfChangeDetector {
    /// Compares `current_value` against the most recent history point and flags
    /// an anomaly when the relative change exceeds `max_rate_of_change`.
    ///
    /// Only `Long`/`Long` and `Double`/`Double` pairs are compared; any other
    /// combination, or a zero previous value, yields `Ok(None)`.
    async fn detect(
        &self,
        metric_name: &str,
        current_value: &MetricValue,
        history: &[MetricDataPoint],
    ) -> AnalyzerResult<Option<Anomaly>> {
        if history.len() < self.min_history_size {
            debug!(
                metric = metric_name,
                history_size = history.len(),
                required = self.min_history_size,
                "Insufficient history for rate of change detection"
            );
            return Ok(None);
        }
        // Guard instead of unwrap: `min_history_size` can be configured as 0,
        // in which case an empty history used to panic here.
        let Some(previous) = history.last() else {
            return Ok(None);
        };
        debug!(
            metric = metric_name,
            current = ?current_value,
            previous = ?previous.value,
            "Comparing values for rate of change"
        );
        // Normalize both values to f64; mixed or non-numeric variants are not
        // comparable. Computing the Long difference in f64 also avoids i64
        // overflow on extreme values (exact below 2^53).
        let (current_num, previous_num) = match (current_value, &previous.value) {
            (MetricValue::Long(c), MetricValue::Long(p)) => (*c as f64, *p as f64),
            (MetricValue::Double(c), MetricValue::Double(p)) => (*c, *p),
            _ => return Ok(None),
        };
        if previous_num == 0.0 {
            // Relative change is undefined against a zero baseline.
            return Ok(None);
        }
        // Divide by the ABSOLUTE baseline. The integer branch previously divided
        // by the signed value, so a negative previous value produced a negative
        // rate and anomalies could never fire (and confidence went negative) —
        // inconsistent with the Double branch, which already used `abs()`.
        let rate_of_change = (current_num - previous_num).abs() / previous_num.abs();
        debug!(
            metric = metric_name,
            rate_of_change = rate_of_change,
            threshold = self.max_rate_of_change,
            "Calculated rate of change"
        );
        if rate_of_change <= self.max_rate_of_change {
            return Ok(None);
        }
        let anomaly = Anomaly::new(
            metric_name.to_string(),
            current_value.clone(),
            self.name().to_string(),
            // Confidence scales with how far past the threshold we are and is
            // intentionally not clamped (callers rely on values > 1.0).
            rate_of_change / self.max_rate_of_change,
            format!(
                "Relative change of {:.1}% exceeds threshold of {:.1}%",
                rate_of_change * 100.0,
                self.max_rate_of_change * 100.0
            ),
        )
        .with_expected_value(previous.value.clone())
        .with_metadata("rate_of_change".to_string(), format!("{rate_of_change:.4}"));
        Ok(Some(anomaly))
    }

    fn name(&self) -> &str {
        "RelativeRateOfChange"
    }

    fn description(&self) -> &str {
        "Detects anomalies when the relative rate of change exceeds a threshold"
    }
}
/// Detector flagging changes whose absolute magnitude exceeds a threshold.
pub struct AbsoluteChangeDetector {
/// Maximum allowed |current - previous| before an anomaly is reported.
pub max_absolute_change: f64,
/// Minimum number of history points required before detection runs.
pub min_history_size: usize,
}
impl AbsoluteChangeDetector {
pub fn new(max_absolute_change: f64) -> Self {
Self {
max_absolute_change,
min_history_size: 1,
}
}
pub fn with_min_history_size(mut self, size: usize) -> Self {
self.min_history_size = size;
self
}
}
#[async_trait]
impl AnomalyDetector for AbsoluteChangeDetector {
    /// Compares `current_value` against the most recent history point and flags
    /// an anomaly when |current - previous| exceeds `max_absolute_change`.
    /// Only same-variant numeric pairs are compared.
    async fn detect(
        &self,
        metric_name: &str,
        current_value: &MetricValue,
        history: &[MetricDataPoint],
    ) -> AnalyzerResult<Option<Anomaly>> {
        if history.len() < self.min_history_size {
            return Ok(None);
        }
        // Guard instead of unwrap: `min_history_size` can be configured as 0,
        // in which case an empty history used to panic here.
        let Some(previous) = history.last() else {
            return Ok(None);
        };
        match (current_value, &previous.value) {
            (MetricValue::Long(current), MetricValue::Long(previous)) => {
                // Subtract in f64: `(*current - *previous).abs()` on i64 can
                // overflow (and `i64::MIN.abs()` panics). Exact below 2^53,
                // which covers realistic metric values.
                let change = (*current as f64 - *previous as f64).abs();
                if change > self.max_absolute_change {
                    let anomaly = Anomaly::new(
                        metric_name.to_string(),
                        current_value.clone(),
                        self.name().to_string(),
                        // Unclamped: scales with how far past the threshold we are.
                        change / self.max_absolute_change,
                        format!(
                            "Absolute change of {change} exceeds threshold of {}",
                            self.max_absolute_change
                        ),
                    )
                    .with_expected_value(MetricValue::Long(*previous))
                    .with_metadata("absolute_change".to_string(), format!("{change}"));
                    return Ok(Some(anomaly));
                }
            }
            (MetricValue::Double(current), MetricValue::Double(previous)) => {
                let change = (current - previous).abs();
                if change > self.max_absolute_change {
                    let anomaly = Anomaly::new(
                        metric_name.to_string(),
                        current_value.clone(),
                        self.name().to_string(),
                        change / self.max_absolute_change,
                        format!(
                            "Absolute change of {change:.4} exceeds threshold of {:.4}",
                            self.max_absolute_change
                        ),
                    )
                    .with_expected_value(MetricValue::Double(*previous))
                    .with_metadata("absolute_change".to_string(), format!("{change:.4}"));
                    return Ok(Some(anomaly));
                }
            }
            // Mixed or non-numeric variants are not comparable.
            _ => return Ok(None),
        }
        Ok(None)
    }

    fn name(&self) -> &str {
        "AbsoluteChange"
    }

    fn description(&self) -> &str {
        "Detects anomalies when the absolute change exceeds a threshold"
    }
}
/// Detector flagging values that deviate from the historical mean by more
/// than a configured number of standard deviations.
pub struct ZScoreDetector {
/// Z-score above which a value is considered anomalous.
pub z_score_threshold: f64,
/// Minimum number of numeric history points required before detection runs.
pub min_history_size: usize,
}
impl ZScoreDetector {
pub fn new(z_score_threshold: f64) -> Self {
Self {
z_score_threshold,
min_history_size: 10,
}
}
pub fn with_min_history_size(mut self, size: usize) -> Self {
self.min_history_size = size;
self
}
}
#[async_trait]
impl AnomalyDetector for ZScoreDetector {
    /// Flags `current_value` as anomalous when its distance from the historical
    /// mean exceeds `z_score_threshold` standard deviations (population std-dev
    /// over the numeric history points).
    async fn detect(
        &self,
        metric_name: &str,
        current_value: &MetricValue,
        history: &[MetricDataPoint],
    ) -> AnalyzerResult<Option<Anomaly>> {
        if history.len() < self.min_history_size {
            return Ok(None);
        }
        // Keep only numeric samples; non-numeric variants are ignored.
        let samples: Vec<f64> = history
            .iter()
            .filter_map(|point| match &point.value {
                MetricValue::Long(v) => Some(*v as f64),
                MetricValue::Double(v) => Some(*v),
                _ => None,
            })
            .collect();
        if samples.len() < self.min_history_size {
            return Ok(None);
        }
        let count = samples.len() as f64;
        let mean = samples.iter().sum::<f64>() / count;
        // Population variance (divide by N, not N-1).
        let variance = samples.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / count;
        let std_dev = variance.sqrt();
        if std_dev == 0.0 {
            // A constant series has no spread; z-scores are undefined.
            return Ok(None);
        }
        let current_numeric = match current_value {
            MetricValue::Long(v) => *v as f64,
            MetricValue::Double(v) => *v,
            _ => return Ok(None),
        };
        let z_score = (current_numeric - mean).abs() / std_dev;
        if z_score <= self.z_score_threshold {
            return Ok(None);
        }
        let anomaly = Anomaly::new(
            metric_name.to_string(),
            current_value.clone(),
            self.name().to_string(),
            // Confidence is capped at 1.0 for this detector.
            (z_score / self.z_score_threshold).min(1.0),
            format!(
                "Value is {:.1} standard deviations from mean (threshold: {:.1})",
                z_score, self.z_score_threshold
            ),
        )
        .with_expected_value(MetricValue::Double(mean))
        .with_metadata("z_score".to_string(), format!("{z_score:.2}"))
        .with_metadata("mean".to_string(), format!("{mean:.4}"))
        .with_metadata("std_dev".to_string(), format!("{std_dev:.4}"));
        Ok(Some(anomaly))
    }

    fn name(&self) -> &str {
        "ZScore"
    }

    fn description(&self) -> &str {
        "Detects anomalies using statistical Z-score analysis"
    }
}
/// Tuning knobs for `AnomalyDetectionRunner`.
#[derive(Debug, Clone)]
pub struct AnomalyDetectionConfig {
/// Anomalies with confidence below this value are discarded.
pub min_confidence: f64,
/// When true, the runner stores the context's metrics before detecting.
pub store_current_metrics: bool,
/// How far back history is fetched for each detector run.
pub default_history_window: Duration,
}
impl Default for AnomalyDetectionConfig {
fn default() -> Self {
Self {
min_confidence: 0.7,
store_current_metrics: true,
default_history_window: Duration::days(30),
}
}
}
/// Orchestrates anomaly detection: stores incoming metrics and runs every
/// detector whose pattern matches each metric name.
pub struct AnomalyDetectionRunner {
repository: Box<dyn MetricsRepository>,
// (pattern, detector) pairs; pattern is an exact name, "prefix*", or "*".
detectors: Vec<(String, Box<dyn AnomalyDetector>)>,
config: AnomalyDetectionConfig,
}
impl AnomalyDetectionRunner {
/// Returns a builder for configuring a runner.
pub fn builder() -> AnomalyDetectionRunnerBuilder {
AnomalyDetectionRunnerBuilder::default()
}
/// Stores `context`'s metrics (when `store_current_metrics` is set), then runs
/// every matching detector against each metric, returning anomalies whose
/// confidence is at least `config.min_confidence`. Detector errors are logged
/// and skipped rather than aborting the run; repository errors propagate.
#[instrument(skip(self, context))]
pub async fn detect_anomalies(
&self,
context: &AnalyzerContext,
) -> AnalyzerResult<Vec<Anomaly>> {
let mut anomalies = Vec::new();
// Snapshot a cutoff 1ms in the past BEFORE storing the current metrics:
// store_context stamps them with a later Utc::now(), so the history query
// below (bounded by `detection_time`) excludes each metric's own current
// value from its history. NOTE(review): this relies on wall-clock ordering
// between this line and store_context — confirm that is the intent.
let detection_time = Utc::now() - chrono::Duration::milliseconds(1);
if self.config.store_current_metrics {
self.repository.store_context(context).await?;
}
for (metric_name, metric_value) in context.all_metrics() {
for (pattern, detector) in &self.detectors {
if self.matches_pattern(metric_name, pattern) {
// Fetch history within the configured window, up to (and excluding)
// the just-stored current value.
let since = Utc::now() - self.config.default_history_window;
let history = self
.repository
.get_metric_history(metric_name, Some(since), Some(detection_time), None)
.await?;
debug!(
metric = metric_name,
history_size = history.len(),
current_value = ?metric_value,
"Running anomaly detection"
);
match detector.detect(metric_name, metric_value, &history).await {
Ok(Some(anomaly)) => {
// Low-confidence anomalies are silently dropped.
if anomaly.confidence >= self.config.min_confidence {
info!(
metric = metric_name,
strategy = anomaly.detection_strategy,
confidence = anomaly.confidence,
"Anomaly detected"
);
anomalies.push(anomaly);
}
}
Ok(None) => {
}
Err(e) => {
// Best-effort: a failing detector must not abort the whole run.
warn!(
metric = metric_name,
detector = detector.name(),
error = %e,
"Error during anomaly detection"
);
}
}
}
}
}
Ok(anomalies)
}
/// Returns true when `metric_name` matches `pattern`: "*" matches everything,
/// "prefix*" matches by prefix, anything else requires exact equality.
fn matches_pattern(&self, metric_name: &str, pattern: &str) -> bool {
if pattern == "*" {
return true;
}
if let Some(prefix) = pattern.strip_suffix('*') {
return metric_name.starts_with(prefix);
}
metric_name == pattern
}
}
/// Builder for `AnomalyDetectionRunner`; a repository must be supplied.
#[derive(Default)]
pub struct AnomalyDetectionRunnerBuilder {
repository: Option<Box<dyn MetricsRepository>>,
// (pattern, detector) pairs registered via `add_detector`.
detectors: Vec<(String, Box<dyn AnomalyDetector>)>,
config: AnomalyDetectionConfig,
}
impl AnomalyDetectionRunnerBuilder {
    /// Sets the repository used to store and fetch metric history (required).
    pub fn repository(mut self, repository: Box<dyn MetricsRepository>) -> Self {
        self.repository = Some(repository);
        self
    }

    /// Registers `detector` for metrics whose name matches `pattern`
    /// (an exact name, `"prefix*"`, or `"*"` for all metrics).
    pub fn add_detector(mut self, pattern: &str, detector: Box<dyn AnomalyDetector>) -> Self {
        self.detectors.push((pattern.to_owned(), detector));
        self
    }

    /// Replaces the default configuration.
    pub fn config(mut self, config: AnomalyDetectionConfig) -> Self {
        self.config = config;
        self
    }

    /// Builds the runner.
    ///
    /// # Errors
    /// Fails with `AnalyzerError::Custom` when no repository was provided.
    pub fn build(self) -> AnalyzerResult<AnomalyDetectionRunner> {
        match self.repository {
            Some(repository) => Ok(AnomalyDetectionRunner {
                repository,
                detectors: self.detectors,
                config: self.config,
            }),
            None => Err(AnalyzerError::Custom(
                "Metrics repository is required".to_string(),
            )),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: the `&current` arguments below were mojibake-corrupted to `¤t`
    // (HTML-entity mangling of `&curren`); restored so the tests compile.

    #[tokio::test]
    async fn test_relative_rate_of_change_detector() {
        let detector = RelativeRateOfChangeDetector::new(0.1).with_min_history_size(1);
        let history = vec![MetricDataPoint {
            value: MetricValue::Long(100),
            timestamp: Utc::now() - Duration::hours(1),
            metadata: HashMap::new(),
        }];
        // 100 -> 105 is a 5% change: under the 10% threshold, no anomaly.
        let current = MetricValue::Long(105);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_none());
        // 100 -> 120 is a 20% change: over the threshold.
        let current = MetricValue::Long(120);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_some());
        let anomaly = result.unwrap();
        assert_eq!(anomaly.detection_strategy, "RelativeRateOfChange");
        // Confidence = rate / threshold = 2.0; this detector does not clamp.
        assert!(anomaly.confidence > 1.0);
    }

    #[tokio::test]
    async fn test_z_score_detector() {
        let detector = ZScoreDetector::new(2.0);
        // History oscillates between 95 and 104.
        let mut history = Vec::new();
        for i in 0..20 {
            history.push(MetricDataPoint {
                value: MetricValue::Long(95 + (i % 10)),
                timestamp: Utc::now() - Duration::hours(i),
                metadata: HashMap::new(),
            });
        }
        // Inside the normal band: no anomaly.
        let current = MetricValue::Long(102);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_none());
        // Far outside the band: anomaly.
        let current = MetricValue::Long(150);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_some());
        let anomaly = result.unwrap();
        assert_eq!(anomaly.detection_strategy, "ZScore");
    }

    #[tokio::test]
    async fn test_in_memory_repository_memory_limits() {
        let config = InMemoryMetricsConfig {
            max_metrics: 2,
            max_points_per_metric: 3,
            max_age_seconds: 60,
        };
        let repo = InMemoryMetricsRepository::with_config(config);
        let now = Utc::now();
        repo.store_metric("metric1", MetricValue::Long(100), now)
            .await
            .unwrap();
        repo.store_metric("metric2", MetricValue::Long(200), now)
            .await
            .unwrap();
        // A third distinct metric exceeds max_metrics and must be rejected.
        let result = repo
            .store_metric("metric3", MetricValue::Long(300), now)
            .await;
        assert!(result.is_err());
        let stats = repo.memory_stats().await;
        assert_eq!(stats.total_metrics, 2);
        assert_eq!(stats.total_data_points, 2);
        // Pushing metric1 to 4 points triggers the per-metric trim to 3.
        repo.store_metric(
            "metric1",
            MetricValue::Long(101),
            now + Duration::seconds(1),
        )
        .await
        .unwrap();
        repo.store_metric(
            "metric1",
            MetricValue::Long(102),
            now + Duration::seconds(2),
        )
        .await
        .unwrap();
        repo.store_metric(
            "metric1",
            MetricValue::Long(103),
            now + Duration::seconds(3),
        )
        .await
        .unwrap();
        let history = repo
            .get_metric_history("metric1", None, None, None)
            .await
            .unwrap();
        assert!(history.len() <= 3);
        let final_stats = repo.memory_stats().await;
        assert!(final_stats.estimated_memory_bytes > 0);
    }

    #[tokio::test]
    async fn test_in_memory_repository() {
        let repo = InMemoryMetricsRepository::new();
        let now = Utc::now();
        repo.store_metric("metric1", MetricValue::Long(100), now)
            .await
            .unwrap();
        repo.store_metric("metric1", MetricValue::Long(110), now + Duration::hours(1))
            .await
            .unwrap();
        repo.store_metric("metric2", MetricValue::Double(0.95), now)
            .await
            .unwrap();
        // Full history comes back sorted oldest -> newest.
        let history = repo
            .get_metric_history("metric1", None, None, None)
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].value, MetricValue::Long(100));
        assert_eq!(history[1].value, MetricValue::Long(110));
        // `since` filters out the earlier point.
        let history = repo
            .get_metric_history("metric1", Some(now + Duration::minutes(30)), None, None)
            .await
            .unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].value, MetricValue::Long(110));
    }
}