use chrono::Utc;
use llmtrace_core::{
AnomalyDetectionConfig, AnomalyType, CacheLayer, SecurityFinding, SecuritySeverity, TenantId,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};
/// Bounded window of recent `f64` samples used to compute a rolling mean and
/// standard deviation.
///
/// The type derives `Serialize`/`Deserialize`; within this file it is
/// round-tripped through the cache as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlidingWindow {
/// Stored samples in insertion order: new values are appended at the end and
/// the element at index 0 is dropped when the window is full.
values: Vec<f64>,
/// Maximum number of samples the window is meant to hold.
capacity: usize,
}
impl SlidingWindow {
    /// Creates an empty window that retains at most `capacity` samples.
    ///
    /// A `capacity` of zero is accepted and yields a window that never stores
    /// anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            values: Vec::with_capacity(capacity),
            capacity,
        }
    }

    /// Appends `value`, evicting the oldest samples so that the window never
    /// holds more than `capacity` values afterwards.
    pub fn push(&mut self, value: f64) {
        if self.capacity == 0 {
            // Nothing can be retained. Returning early also avoids calling
            // `remove(0)` on an empty vector, which would panic.
            self.values.clear();
            return;
        }
        if self.values.len() >= self.capacity {
            // Normally this evicts exactly one element. More are dropped only
            // when the window already holds more values than its capacity
            // (e.g. it was deserialized from an inconsistent payload).
            let excess = self.values.len() + 1 - self.capacity;
            self.values.drain(..excess);
        }
        self.values.push(value);
    }

    /// Number of samples currently stored.
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` when the window holds no samples.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    /// Arithmetic mean of the stored samples, or `0.0` for an empty window.
    pub fn mean(&self) -> f64 {
        if self.values.is_empty() {
            return 0.0;
        }
        let sum: f64 = self.values.iter().sum();
        sum / self.values.len() as f64
    }

    /// Population standard deviation (the variance is divided by `n`, not
    /// `n - 1`). Returns `0.0` when fewer than two samples are stored.
    pub fn std_dev(&self) -> f64 {
        if self.values.len() < 2 {
            return 0.0;
        }
        let mean = self.mean();
        let variance: f64 = self
            .values
            .iter()
            .map(|v| {
                let diff = v - mean;
                diff * diff
            })
            .sum::<f64>()
            / self.values.len() as f64;
        variance.sqrt()
    }

    /// Signed distance of `value` from the window mean, measured in standard
    /// deviations.
    ///
    /// Returns `None` when the window holds fewer than two samples or when the
    /// standard deviation is below `f64::EPSILON`, since the ratio would be
    /// meaningless (or a division by zero) in those cases.
    pub fn sigma_distance(&self, value: f64) -> Option<f64> {
        if self.values.len() < 2 {
            return None;
        }
        let sd = self.std_dev();
        if sd < f64::EPSILON {
            return None;
        }
        Some((value - self.mean()) / sd)
    }
}
/// Time-to-live, in seconds, applied when a sliding window is written to the
/// cache (86 400 s = 24 hours).
const WINDOW_CACHE_TTL_SECS: u64 = 86_400;
/// Minimum number of samples a window must already hold before a new value is
/// tested against it.
const MIN_SAMPLES: usize = 10;
/// Per-tenant statistical anomaly detector.
///
/// Keeps one sliding window per tenant and metric in the cache and reports a
/// finding when a new value lies at least `sigma_threshold` standard
/// deviations above the window mean.
pub struct AnomalyDetector {
/// Detection settings (which metrics to check, window size, sigma threshold);
/// cloned from the configuration passed to `new`.
config: AnomalyDetectionConfig,
/// Cache used to persist sliding windows and velocity timestamps between calls.
cache: Arc<dyn CacheLayer>,
}
impl AnomalyDetector {
    /// Length, in seconds, of the trailing interval over which requests are
    /// counted for the velocity metric.
    const VELOCITY_WINDOW_SECS: i64 = 60;
    /// Cache time-to-live, in seconds, for a tenant's recent request timestamps.
    const VELOCITY_TS_TTL_SECS: u64 = 120;

    /// Builds a detector from `config`.
    ///
    /// Returns `None` when `config.enabled` is `false`, so callers can skip
    /// anomaly detection entirely.
    pub fn new(config: &AnomalyDetectionConfig, cache: Arc<dyn CacheLayer>) -> Option<Self> {
        config.enabled.then(|| Self {
            config: config.clone(),
            cache,
        })
    }

    /// Records the metrics of one request for `tenant_id` and returns a finding
    /// for every enabled metric whose value is anomalous.
    ///
    /// A metric is processed only when its `check_*` flag is set and, for cost,
    /// tokens and latency, when the corresponding argument is `Some`. Request
    /// velocity needs no argument: it is derived from call timestamps. Checks
    /// run in the order cost, tokens, latency, velocity, and findings are
    /// returned in that order.
    pub async fn record_and_check(
        &self,
        tenant_id: TenantId,
        cost_usd: Option<f64>,
        total_tokens: Option<u32>,
        latency_ms: Option<u64>,
    ) -> Vec<SecurityFinding> {
        let mut findings = Vec::new();

        if self.config.check_cost {
            if let Some(cost) = cost_usd {
                findings.extend(
                    self.check_metric(tenant_id, AnomalyType::CostSpike, cost)
                        .await,
                );
            }
        }

        if self.config.check_tokens {
            if let Some(tokens) = total_tokens {
                findings.extend(
                    self.check_metric(tenant_id, AnomalyType::TokenSpike, f64::from(tokens))
                        .await,
                );
            }
        }

        if self.config.check_latency {
            if let Some(ms) = latency_ms {
                // `u64 -> f64` loses precision only above 2^53 ms, which is
                // irrelevant for latencies.
                findings.extend(
                    self.check_metric(tenant_id, AnomalyType::LatencySpike, ms as f64)
                        .await,
                );
            }
        }

        if self.config.check_velocity {
            findings.extend(self.check_velocity(tenant_id).await);
        }

        findings
    }

    /// Scores `value` against the tenant's window for `anomaly_type`, then
    /// appends it to that window.
    async fn check_metric(
        &self,
        tenant_id: TenantId,
        anomaly_type: AnomalyType,
        value: f64,
    ) -> Option<SecurityFinding> {
        let key = cache_key(tenant_id, &anomaly_type);
        self.score_and_record(&key, anomaly_type, value, tenant_id)
            .await
    }

    /// Updates the tenant's list of recent request timestamps and scores the
    /// number of requests seen in the trailing `VELOCITY_WINDOW_SECS` seconds
    /// (including this one) against the velocity window.
    async fn check_velocity(&self, tenant_id: TenantId) -> Option<SecurityFinding> {
        let ts_key = format!("anomaly:{tenant_id}:velocity_ts");
        let window_key = cache_key(tenant_id, &AnomalyType::VelocitySpike);

        let mut timestamps = self.load_timestamps(&ts_key).await;
        let now = Utc::now().timestamp();
        timestamps.retain(|&t| now - t < Self::VELOCITY_WINDOW_SECS);
        timestamps.push(now);
        self.save_timestamps(&ts_key, &timestamps).await;

        let current_rpm = timestamps.len() as f64;
        self.score_and_record(
            &window_key,
            AnomalyType::VelocitySpike,
            current_rpm,
            tenant_id,
        )
        .await
    }

    /// Loads the window stored under `key`, scores `value` against it, then
    /// pushes `value` and writes the window back.
    ///
    /// The value is scored against the window as it was *before* the push, so
    /// a sample never dampens its own score.
    ///
    /// NOTE(review): load -> push -> save is not atomic as far as this file
    /// shows; concurrent calls for the same key may overwrite each other's
    /// samples. Confirm whether `CacheLayer` offers an atomic update.
    async fn score_and_record(
        &self,
        key: &str,
        anomaly_type: AnomalyType,
        value: f64,
        tenant_id: TenantId,
    ) -> Option<SecurityFinding> {
        let mut window = self.load_window(key).await;
        let finding = self.evaluate(&window, &anomaly_type, value, tenant_id);
        window.push(value);
        self.save_window(key, &window).await;
        finding
    }

    /// Returns a finding when `window` holds at least `MIN_SAMPLES` samples and
    /// `value` lies at least `sigma_threshold` standard deviations above the
    /// window mean; otherwise `None`.
    fn evaluate(
        &self,
        window: &SlidingWindow,
        anomaly_type: &AnomalyType,
        value: f64,
        tenant_id: TenantId,
    ) -> Option<SecurityFinding> {
        if window.len() < MIN_SAMPLES {
            return None;
        }
        let sigma = window.sigma_distance(value)?;
        let threshold = self.config.sigma_threshold;
        // `>=` is kept (rather than an inverted `<` guard) so that a NaN sigma
        // never produces a finding.
        (sigma >= threshold).then(|| {
            build_finding(
                anomaly_type,
                severity_from_sigma(sigma, threshold),
                value,
                window.mean(),
                window.std_dev(),
                sigma,
                tenant_id,
            )
        })
    }

    /// Empty window sized from the configured `window_size`.
    fn empty_window(&self) -> SlidingWindow {
        SlidingWindow::new(self.config.window_size)
    }

    /// Reads the window stored under `key`.
    ///
    /// Falls back to an empty window when the key is absent, the cache read
    /// fails, or the payload cannot be decoded; the two failure cases are
    /// logged.
    ///
    /// NOTE(review): a decoded window keeps the capacity it was created with,
    /// so a later change to `window_size` does not affect windows already in
    /// the cache. Confirm whether that is intended.
    async fn load_window(&self, key: &str) -> SlidingWindow {
        match self.cache.get(key).await {
            Ok(Some(bytes)) => serde_json::from_slice(&bytes).unwrap_or_else(|e| {
                warn!(%key, "Failed to deserialize anomaly window, starting fresh: {e}");
                self.empty_window()
            }),
            Ok(None) => self.empty_window(),
            Err(e) => {
                warn!(%key, "Failed to load anomaly window: {e}");
                self.empty_window()
            }
        }
    }

    /// Serializes `window` as JSON and stores it under `key` with a TTL of
    /// `WINDOW_CACHE_TTL_SECS`. Failures are logged and otherwise ignored
    /// (best effort).
    async fn save_window(&self, key: &str, window: &SlidingWindow) {
        match serde_json::to_vec(window) {
            Ok(bytes) => {
                let ttl = Duration::from_secs(WINDOW_CACHE_TTL_SECS);
                if let Err(e) = self.cache.set(key, &bytes, ttl).await {
                    warn!(%key, "Failed to save anomaly window: {e}");
                }
            }
            Err(e) => {
                warn!(%key, "Failed to serialize anomaly window: {e}");
            }
        }
    }

    /// Reads the request timestamps (Unix seconds) stored under `key`.
    ///
    /// Falls back to an empty list when the key is absent, the cache read
    /// fails, or the payload cannot be decoded; the two failure cases are
    /// logged.
    async fn load_timestamps(&self, key: &str) -> Vec<i64> {
        match self.cache.get(key).await {
            Ok(Some(bytes)) => serde_json::from_slice(&bytes).unwrap_or_else(|e| {
                warn!(%key, "Failed to deserialize velocity timestamps, starting fresh: {e}");
                Vec::new()
            }),
            Ok(None) => Vec::new(),
            Err(e) => {
                warn!(%key, "Failed to load velocity timestamps: {e}");
                Vec::new()
            }
        }
    }

    /// Serializes `timestamps` as JSON and stores them under `key` with a TTL
    /// of `VELOCITY_TS_TTL_SECS`. Failures are logged and otherwise ignored
    /// (best effort).
    async fn save_timestamps(&self, key: &str, timestamps: &[i64]) {
        match serde_json::to_vec(timestamps) {
            Ok(bytes) => {
                let ttl = Duration::from_secs(Self::VELOCITY_TS_TTL_SECS);
                if let Err(e) = self.cache.set(key, &bytes, ttl).await {
                    warn!(%key, "Failed to save velocity timestamps: {e}");
                }
            }
            Err(e) => {
                warn!(%key, "Failed to serialize velocity timestamps: {e}");
            }
        }
    }
}
/// Builds the cache key under which the sliding window for one tenant and one
/// anomaly type is stored: `anomaly:<tenant_id>:<anomaly_type>`, using the
/// `Display` form of both values.
fn cache_key(tenant_id: TenantId, anomaly_type: &AnomalyType) -> String {
format!("anomaly:{tenant_id}:{anomaly_type}")
}
/// Maps a sigma distance to a severity, scaled relative to the configured
/// base threshold.
///
/// * `sigma >= base_sigma * 10 / 3` -> `Critical`
/// * `sigma >= base_sigma * 5 / 3`  -> `High`
/// * anything else                  -> `Medium`
///
/// With a base threshold of 3.0 the cut-offs are 5.0 (High) and 10.0
/// (Critical).
fn severity_from_sigma(sigma: f64, base_sigma: f64) -> SecuritySeverity {
    // Multiply before dividing, as a single `a * b / c` expression, so the
    // cut-offs are computed with the exact same rounding every time.
    let critical_at = base_sigma * 10.0 / 3.0;
    let high_at = base_sigma * 5.0 / 3.0;

    match sigma {
        s if s >= critical_at => SecuritySeverity::Critical,
        s if s >= high_at => SecuritySeverity::High,
        _ => SecuritySeverity::Medium,
    }
}
/// Assembles the `SecurityFinding` reported for an anomalous metric value.
///
/// * `anomaly_type` – metric that spiked; its `Display` form is used in the
///   finding type, description and metadata.
/// * `severity` – severity stored on the finding; `High` and `Critical` set
///   `requires_alert`.
/// * `value`, `mean`, `std_dev`, `sigma` – the observed value and the window
///   statistics it was compared against; all are copied into the metadata.
/// * `tenant_id` – tenant the finding is attributed to (stored in `location`).
///
/// The confidence score is `sigma / 20`, clamped to the range `[0.5, 1.0]`.
/// A `debug!` event is emitted as a side effect.
fn build_finding(
    anomaly_type: &AnomalyType,
    severity: SecuritySeverity,
    value: f64,
    mean: f64,
    std_dev: f64,
    sigma: f64,
    tenant_id: TenantId,
) -> SecurityFinding {
    let confidence = (sigma / 20.0).clamp(0.5, 1.0);
    // Computed up front because `severity` is moved into the finding below.
    let requires_alert = matches!(
        severity,
        SecuritySeverity::High | SecuritySeverity::Critical
    );
    let description = format!(
        "Anomaly detected: {anomaly_type} — value {value:.4} is {sigma:.1}σ above mean {mean:.4} (σ={std_dev:.4})"
    );
    let finding_type = format!("anomaly_{anomaly_type}");
    let location = format!("tenant:{tenant_id}");

    // Key/value pairs exposed on the finding, all rendered as strings.
    let metadata = [
        ("anomaly_type", anomaly_type.to_string()),
        ("value", format!("{value:.6}")),
        ("mean", format!("{mean:.6}")),
        ("std_dev", format!("{std_dev:.6}")),
        ("sigma", format!("{sigma:.2}")),
    ]
    .into_iter()
    .map(|(name, rendered)| (name.to_string(), rendered))
    .collect();

    debug!(
        %tenant_id,
        %anomaly_type,
        value,
        mean,
        std_dev,
        sigma,
        %severity,
        "Anomaly detected"
    );

    SecurityFinding {
        id: uuid::Uuid::new_v4(),
        severity,
        finding_type,
        description,
        detected_at: Utc::now(),
        confidence_score: confidence,
        location: Some(location),
        metadata,
        requires_alert,
    }
}
// Unit tests for the sliding-window statistics, the severity mapping and the
// end-to-end detector behaviour against an in-memory cache.
#[cfg(test)]
mod tests {
use super::*;
use llmtrace_core::AnomalyDetectionConfig;
use llmtrace_storage::InMemoryCacheLayer;
// Fresh in-memory cache, so every test starts without stored windows.
fn make_cache() -> Arc<dyn CacheLayer> {
Arc::new(InMemoryCacheLayer::new())
}
// Configuration with detection and all four metric checks enabled, a
// 100-sample window and a 3-sigma threshold.
fn enabled_config() -> AnomalyDetectionConfig {
AnomalyDetectionConfig {
enabled: true,
window_size: 100,
sigma_threshold: 3.0,
check_cost: true,
check_tokens: true,
check_velocity: true,
check_latency: true,
}
}
// An empty window reports zero statistics and no sigma distance.
#[test]
fn test_sliding_window_empty() {
let w = SlidingWindow::new(5);
assert!(w.is_empty());
assert_eq!(w.len(), 0);
assert_eq!(w.mean(), 0.0);
assert_eq!(w.std_dev(), 0.0);
assert!(w.sigma_distance(42.0).is_none());
}
// A single sample gives a mean but no spread, so no sigma distance.
#[test]
fn test_sliding_window_single_value() {
let mut w = SlidingWindow::new(5);
w.push(10.0);
assert_eq!(w.len(), 1);
assert_eq!(w.mean(), 10.0);
assert_eq!(w.std_dev(), 0.0);
// Fewer than two samples: sigma_distance returns None.
assert!(w.sigma_distance(20.0).is_none()); }
// Data set with mean 5 and population standard deviation 2.
#[test]
fn test_sliding_window_mean_and_stddev() {
let mut w = SlidingWindow::new(10);
for v in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
w.push(v);
}
assert_eq!(w.len(), 8);
let mean = w.mean();
assert!((mean - 5.0).abs() < 1e-10);
let sd = w.std_dev();
assert!((sd - 2.0).abs() < 1e-10);
}
// Same data set: (11 - 5) / 2 = 3 sigma.
#[test]
fn test_sliding_window_sigma_distance() {
let mut w = SlidingWindow::new(10);
for v in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
w.push(v);
}
let sigma = w.sigma_distance(11.0).unwrap();
assert!((sigma - 3.0).abs() < 1e-10);
}
// Pushing a fourth value into a 3-slot window evicts the oldest one.
#[test]
fn test_sliding_window_capacity_eviction() {
let mut w = SlidingWindow::new(3);
w.push(1.0);
w.push(2.0);
w.push(3.0);
assert_eq!(w.len(), 3);
w.push(4.0);
assert_eq!(w.len(), 3);
let mean = w.mean();
// Remaining values are 2, 3, 4, so the mean is 3.
assert!((mean - 3.0).abs() < 1e-10); }
// Identical samples have zero spread, so no sigma distance is reported.
#[test]
fn test_sliding_window_zero_stddev() {
let mut w = SlidingWindow::new(5);
w.push(5.0);
w.push(5.0);
w.push(5.0);
assert_eq!(w.std_dev(), 0.0);
assert!(w.sigma_distance(10.0).is_none());
}
// With a base threshold of 3.0, values below 5 sigma map to Medium.
#[test]
fn test_severity_from_sigma_medium() {
assert_eq!(severity_from_sigma(3.0, 3.0), SecuritySeverity::Medium);
assert_eq!(severity_from_sigma(4.0, 3.0), SecuritySeverity::Medium);
}
// With a base threshold of 3.0, values from 5 up to (not including) 10 sigma map to High.
#[test]
fn test_severity_from_sigma_high() {
assert_eq!(severity_from_sigma(5.0, 3.0), SecuritySeverity::High);
assert_eq!(severity_from_sigma(7.0, 3.0), SecuritySeverity::High);
}
// With a base threshold of 3.0, values of 10 sigma or more map to Critical.
#[test]
fn test_severity_from_sigma_critical() {
assert_eq!(severity_from_sigma(10.0, 3.0), SecuritySeverity::Critical);
assert_eq!(severity_from_sigma(15.0, 3.0), SecuritySeverity::Critical);
}
// Expects the default configuration to have detection disabled, in which
// case no detector is constructed.
#[test]
fn test_disabled_config_returns_none() {
let config = AnomalyDetectionConfig::default(); assert!(AnomalyDetector::new(&config, make_cache()).is_none());
}
// An enabled configuration yields a detector.
#[test]
fn test_enabled_config_returns_detector() {
assert!(AnomalyDetector::new(&enabled_config(), make_cache()).is_some());
}
// Nine calls never give any window the MIN_SAMPLES (10) entries required
// before a value is scored, so nothing can be reported.
#[tokio::test]
async fn test_no_anomaly_with_few_samples() {
let cache = make_cache();
let detector = AnomalyDetector::new(&enabled_config(), cache).unwrap();
let tid = TenantId::new();
for i in 0..9 {
let findings = detector
.record_and_check(tid, Some(i as f64), Some(100), Some(50))
.await;
assert!(findings.is_empty(), "Expected no findings with <10 samples");
}
}
// Constant cost, token and latency values must never be flagged. Velocity
// checking is also enabled here, so the request count that ramps up during
// the loop must stay below the threshold as well.
#[tokio::test]
async fn test_no_anomaly_with_stable_data() {
let cache = make_cache();
let detector = AnomalyDetector::new(&enabled_config(), cache).unwrap();
let tid = TenantId::new();
for _ in 0..20 {
let findings = detector
.record_and_check(tid, Some(0.05), Some(500), Some(100))
.await;
assert!(
findings.is_empty(),
"Stable data should not trigger anomaly"
);
}
}
// Cost only: 20 baseline samples around 0.05, then a 50.0 outlier.
#[tokio::test]
async fn test_cost_spike_detected() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_tokens: false,
check_velocity: false,
check_latency: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid = TenantId::new();
let baseline = [
0.04, 0.05, 0.06, 0.05, 0.04, 0.05, 0.06, 0.05, 0.04, 0.05, 0.06, 0.05, 0.04, 0.05,
0.06, 0.05, 0.04, 0.05, 0.06, 0.05,
];
for &v in &baseline {
let _ = detector.record_and_check(tid, Some(v), None, None).await;
}
let findings = detector.record_and_check(tid, Some(50.0), None, None).await;
assert!(!findings.is_empty(), "Should detect cost spike");
assert_eq!(findings[0].finding_type, "anomaly_cost_spike");
assert!(
findings[0].severity >= SecuritySeverity::Medium,
"Severity should be at least Medium"
);
}
// Tokens only: baseline cycles through 480..=520, then a 50000 outlier.
#[tokio::test]
async fn test_token_spike_detected() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_cost: false,
check_velocity: false,
check_latency: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid = TenantId::new();
let baseline: Vec<u32> = (0..20).map(|i| 480 + (i % 5) * 10).collect();
for v in &baseline {
let _ = detector.record_and_check(tid, None, Some(*v), None).await;
}
let findings = detector
.record_and_check(tid, None, Some(50000), None)
.await;
assert!(!findings.is_empty(), "Should detect token spike");
assert_eq!(findings[0].finding_type, "anomaly_token_spike");
}
// Latency only: baseline cycles through 90..=110 ms, then a 10000 ms outlier.
#[tokio::test]
async fn test_latency_spike_detected() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_cost: false,
check_tokens: false,
check_velocity: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid = TenantId::new();
let baseline: Vec<u64> = (0..20).map(|i| 90 + (i % 5) * 5).collect();
for v in &baseline {
let _ = detector.record_and_check(tid, None, None, Some(*v)).await;
}
let findings = detector
.record_and_check(tid, None, None, Some(10000))
.await;
assert!(!findings.is_empty(), "Should detect latency spike");
assert_eq!(findings[0].finding_type, "anomaly_latency_spike");
}
// With every check_* flag off, even extreme values produce no findings.
#[tokio::test]
async fn test_disabled_metrics_not_checked() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_cost: false,
check_tokens: false,
check_velocity: false,
check_latency: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid = TenantId::new();
for _ in 0..20 {
let _ = detector
.record_and_check(tid, Some(0.05), Some(500), Some(100))
.await;
}
let findings = detector
.record_and_check(tid, Some(500.0), Some(999999), Some(999999))
.await;
assert!(
findings.is_empty(),
"All checks disabled should produce no findings"
);
}
// Windows are keyed by tenant: a baseline recorded for one tenant must not
// be used to score another tenant's values.
#[tokio::test]
async fn test_tenant_isolation() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_tokens: false,
check_velocity: false,
check_latency: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid1 = TenantId::new();
let tid2 = TenantId::new();
let baseline = [
0.04, 0.05, 0.06, 0.05, 0.04, 0.05, 0.06, 0.05, 0.04, 0.05, 0.06, 0.05, 0.04, 0.05,
0.06, 0.05, 0.04, 0.05, 0.06, 0.05,
];
for &v in &baseline {
let _ = detector.record_and_check(tid1, Some(v), None, None).await;
}
let findings1 = detector
.record_and_check(tid1, Some(50.0), None, None)
.await;
assert!(!findings1.is_empty(), "Tenant 1 should see anomaly");
// Tenant 2 has an empty window, so the same value is simply recorded.
let findings2 = detector
.record_and_check(tid2, Some(50.0), None, None)
.await;
assert!(
findings2.is_empty(),
"Tenant 2 has no baseline, no anomaly expected"
);
}
// A reported finding carries the statistics in its metadata, a location and
// a description naming the anomaly type.
#[tokio::test]
async fn test_finding_metadata() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_tokens: false,
check_velocity: false,
check_latency: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid = TenantId::new();
let baseline = [
0.9, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 0.9,
1.0, 1.1, 1.0,
];
for &v in &baseline {
let _ = detector.record_and_check(tid, Some(v), None, None).await;
}
let findings = detector
.record_and_check(tid, Some(100.0), None, None)
.await;
assert!(!findings.is_empty());
let f = &findings[0];
assert!(f.metadata.contains_key("sigma"));
assert!(f.metadata.contains_key("mean"));
assert!(f.metadata.contains_key("std_dev"));
assert!(f.metadata.contains_key("value"));
assert!(f.metadata.contains_key("anomaly_type"));
assert!(f.location.is_some());
assert!(f.description.contains("cost_spike"));
}
// A very tight baseline (values within 0.01 of 1.0) followed by 100.0 is far
// beyond the threshold and must be rated High or Critical.
#[tokio::test]
async fn test_severity_escalation() {
let cache = make_cache();
let config = AnomalyDetectionConfig {
check_tokens: false,
check_velocity: false,
check_latency: false,
..enabled_config()
};
let detector = AnomalyDetector::new(&config, cache).unwrap();
let tid = TenantId::new();
for i in 0..20 {
let v = 1.0 + (i as f64 % 3.0) * 0.005;
let _ = detector.record_and_check(tid, Some(v), None, None).await;
}
let findings = detector
.record_and_check(tid, Some(100.0), None, None)
.await;
assert!(!findings.is_empty(), "Should detect extreme spike");
assert!(
findings[0].severity >= SecuritySeverity::High,
"Large spike should be High or Critical"
);
}
// Cache keys start with the "anomaly:" prefix and contain the anomaly type.
#[test]
fn test_cache_key_format() {
let tid = TenantId::new();
let key = cache_key(tid, &AnomalyType::CostSpike);
assert!(key.starts_with("anomaly:"));
assert!(key.contains("cost_spike"));
}
// build_finding derives the finding type, the alert flag and a clamped
// confidence score from its inputs.
#[test]
fn test_build_finding_fields() {
let tid = TenantId::new();
let f = build_finding(
&AnomalyType::CostSpike,
SecuritySeverity::High,
50.0,
5.0,
2.0,
22.5,
tid,
);
assert_eq!(f.finding_type, "anomaly_cost_spike");
assert_eq!(f.severity, SecuritySeverity::High);
assert!(f.requires_alert);
assert!(f.confidence_score >= 0.5);
assert!(f.confidence_score <= 1.0);
}
}