#![cfg(feature = "std")]
use std::sync::Arc;
use crate::error::{RcfError, RcfResult};
use crate::metrics::{MetricsSink, default_sink, names};
use crate::tdigest::{DEFAULT_COMPRESSION, TDigest};
pub const DEFAULT_QUANTILE: f64 = 0.98;
pub const DEFAULT_ALERT_P: f64 = 1.0e-3;
pub const MIN_PEAKS_FOR_FIT: usize = 16;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PotDetector {
digest: TDigest,
peak_count: u64,
peak_mean: f64,
peak_m2: f64,
frozen_u: Option<f64>,
q: f64,
total_seen: u64,
#[cfg_attr(
feature = "serde",
serde(skip, default = "crate::metrics::default_sink")
)]
metrics: Arc<dyn MetricsSink>,
}
impl PotDetector {
pub fn new(q: f64) -> RcfResult<Self> {
if !q.is_finite() || !(0.0..1.0).contains(&q) || q <= 0.0 {
return Err(RcfError::InvalidConfig(
format!("PotDetector: q must be in (0.0, 1.0), got {q}").into(),
));
}
Ok(Self {
digest: TDigest::new(DEFAULT_COMPRESSION)?,
peak_count: 0,
peak_mean: 0.0,
peak_m2: 0.0,
frozen_u: None,
q,
total_seen: 0,
metrics: default_sink(),
})
}
#[must_use]
pub fn default_spot() -> Self {
Self {
digest: TDigest::with_default_compression(),
peak_count: 0,
peak_mean: 0.0,
peak_m2: 0.0,
frozen_u: None,
q: DEFAULT_QUANTILE,
total_seen: 0,
metrics: default_sink(),
}
}
#[must_use]
pub fn with_metrics_sink(mut self, sink: Arc<dyn MetricsSink>) -> Self {
self.metrics = sink;
self
}
#[must_use]
pub fn metrics_sink(&self) -> &Arc<dyn MetricsSink> {
&self.metrics
}
#[must_use]
pub fn quantile_target(&self) -> f64 {
self.q
}
#[must_use]
pub fn is_frozen(&self) -> bool {
self.frozen_u.is_some()
}
#[must_use]
pub fn total_seen(&self) -> u64 {
self.total_seen
}
#[must_use]
pub fn peak_count(&self) -> u64 {
self.peak_count
}
pub fn record(&mut self, value: f64) {
if !value.is_finite() {
return;
}
self.total_seen = self.total_seen.saturating_add(1);
self.metrics.inc_counter(names::SPOT_OBSERVATIONS_TOTAL, 1);
self.digest.record(value);
if let Some(u) = self.current_u() {
let excess = value - u;
if excess > 0.0 {
self.peak_count = self.peak_count.saturating_add(1);
self.metrics.inc_counter(names::SPOT_PEAKS_TOTAL, 1);
#[allow(clippy::cast_precision_loss)]
let n = self.peak_count as f64;
let delta = excess - self.peak_mean;
self.peak_mean += delta / n;
let delta2 = excess - self.peak_mean;
self.peak_m2 += delta * delta2;
}
}
}
pub fn freeze_baseline(&mut self) -> RcfResult<()> {
let Some(u) = self.digest.quantile(self.q) else {
return Err(RcfError::EmptyForest);
};
self.frozen_u = Some(u);
self.peak_count = 0;
self.peak_mean = 0.0;
self.peak_m2 = 0.0;
Ok(())
}
#[must_use]
pub fn p_value(&self, value: f64) -> f64 {
if !value.is_finite() {
return 1.0;
}
let Some(u) = self.current_u() else {
return 1.0;
};
let excess = value - u;
if excess <= 0.0 {
return 1.0;
}
#[allow(clippy::cast_precision_loss)]
let tail_prob = 1.0 - self.q;
if self.peak_count < MIN_PEAKS_FOR_FIT as u64 {
#[allow(clippy::cast_precision_loss)]
let floor = 1.0 / (self.total_seen.max(1) as f64);
return floor.max(tail_prob);
}
let (gamma, sigma) = self.gpd_mom();
if sigma <= 0.0 || !sigma.is_finite() || !gamma.is_finite() {
return tail_prob.clamp(0.0, 1.0);
}
let cond = if gamma.abs() < 1.0e-9 {
(-excess / sigma).exp()
} else {
let inner = 1.0 + gamma * (excess / sigma);
if inner <= 0.0 {
0.0
} else {
inner.powf(-1.0 / gamma)
}
};
let p = tail_prob * cond;
#[allow(clippy::cast_precision_loss)]
let floor = 1.0 / (self.total_seen.max(1) as f64 * 1_000.0);
p.max(floor).min(1.0)
}
#[must_use]
pub fn is_anomaly(&self, value: f64, alert_p: f64) -> bool {
self.p_value(value) < alert_p
}
fn current_u(&self) -> Option<f64> {
if let Some(u) = self.frozen_u {
return Some(u);
}
let mut scratch = self.digest.clone();
scratch.quantile(self.q)
}
fn gpd_mom(&self) -> (f64, f64) {
let n = self.peak_count;
if n < 2 {
return (0.0, 0.0);
}
#[allow(clippy::cast_precision_loss)]
let n_f = n as f64;
let variance = self.peak_m2 / (n_f - 1.0);
if variance <= 0.0 || self.peak_mean <= 0.0 {
return (0.0, 0.0);
}
let mean_sq = self.peak_mean * self.peak_mean;
let gamma = 0.5 * (1.0 - mean_sq / variance);
let gamma = gamma.clamp(-5.0, 0.4999);
let sigma = self.peak_mean * (1.0 + gamma);
(gamma, sigma)
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::panic,
clippy::float_cmp,
clippy::cast_precision_loss,
clippy::cast_lossless
)]
mod tests {
use super::*;
#[test]
fn new_rejects_out_of_range_q() {
assert!(PotDetector::new(0.0).is_err());
assert!(PotDetector::new(1.0).is_err());
assert!(PotDetector::new(f64::NAN).is_err());
assert!(PotDetector::new(-0.1).is_err());
}
#[test]
fn record_ignores_non_finite() {
let mut d = PotDetector::default_spot();
d.record(f64::NAN);
d.record(f64::INFINITY);
assert_eq!(d.total_seen(), 0);
}
#[test]
fn p_value_below_threshold_returns_one() {
let mut d = PotDetector::new(0.9).unwrap();
for i in 0..1000 {
d.record(i as f64 * 0.001);
}
d.freeze_baseline().unwrap();
assert_eq!(d.p_value(-1.0), 1.0);
}
#[test]
fn p_value_above_threshold_is_smaller_than_one() {
let mut d = PotDetector::new(0.9).unwrap();
for i in 0..1000 {
d.record(i as f64 * 0.001);
}
d.freeze_baseline().unwrap();
for i in 0..200 {
d.record(0.95 + i as f64 * 0.0005);
}
let p = d.p_value(10.0);
assert!(p < 1.0);
assert!(p > 0.0);
}
#[test]
fn heavier_outlier_gets_smaller_p_value() {
let mut d = PotDetector::new(0.9).unwrap();
for i in 0..1000 {
d.record(i as f64 * 0.001);
}
d.freeze_baseline().unwrap();
for i in 0..200 {
d.record(0.95 + i as f64 * 0.0005);
}
let mild = d.p_value(1.1);
let heavy = d.p_value(10.0);
assert!(heavy <= mild);
}
#[test]
fn freeze_baseline_errors_on_empty_digest() {
let mut d = PotDetector::default_spot();
assert!(matches!(
d.freeze_baseline().unwrap_err(),
RcfError::EmptyForest
));
}
#[test]
fn is_anomaly_thresholds_on_alert_p() {
let mut d = PotDetector::new(0.9).unwrap();
for i in 0..1000 {
d.record(i as f64 * 0.001);
}
d.freeze_baseline().unwrap();
for i in 0..200 {
d.record(0.95 + i as f64 * 0.0005);
}
assert!(!d.is_anomaly(0.5, 1.0e-3));
}
}