use std::collections::VecDeque;
use crate::error::{AnomalyError, AnomalyResult};
use crate::handle::LcgRng;
#[derive(Debug, Clone)]
pub struct ConformalConfig {
pub significance: f64,
pub smoothing: bool,
}
impl Default for ConformalConfig {
fn default() -> Self {
Self {
significance: 0.05,
smoothing: false,
}
}
}
#[derive(Debug, Clone)]
pub struct ConformalDetector {
pub calibration_scores: Vec<f64>,
pub config: ConformalConfig,
}
#[derive(Debug, Clone)]
pub struct OnlineConformalDetector {
pub window: VecDeque<f64>,
pub window_size: usize,
pub config: ConformalConfig,
}
#[derive(Debug, Clone)]
pub struct ConformalResult {
pub p_values: Vec<f64>,
pub labels: Vec<i32>,
pub n_anomalies: usize,
}
#[must_use]
pub fn conformal_p_value(calibration_scores: &[f64], test_score: f64) -> f64 {
let m = calibration_scores.len();
if m == 0 {
return 0.0;
}
let count_ge = calibration_scores
.iter()
.filter(|&&s| s >= test_score)
.count();
count_ge as f64 / (m + 1) as f64
}
#[must_use]
pub fn conformal_p_value_smoothed(
calibration_scores: &[f64],
test_score: f64,
rng: &mut LcgRng,
) -> f64 {
let m = calibration_scores.len();
if m == 0 {
return rng.next_f32() as f64 / (1.0 + 1.0); }
let count_gt = calibration_scores
.iter()
.filter(|&&s| s > test_score)
.count();
let count_eq = calibration_scores
.iter()
.filter(|&&s| s == test_score)
.count();
let u = rng.next_f32() as f64;
(count_gt as f64 + u * count_eq as f64 + u) / (m + 1) as f64
}
pub fn conformal_calibrate(scores: Vec<f64>, cfg: ConformalConfig) -> ConformalDetector {
ConformalDetector {
calibration_scores: scores,
config: cfg,
}
}
pub fn conformal_predict(
detector: &ConformalDetector,
test_scores: &[f64],
) -> AnomalyResult<ConformalResult> {
if test_scores.is_empty() {
return Err(AnomalyError::EmptyInput);
}
if detector.calibration_scores.is_empty() {
return Err(AnomalyError::InsufficientSamples { need: 1, got: 0 });
}
let eps = detector.config.significance;
let mut p_values = Vec::with_capacity(test_scores.len());
let mut labels = Vec::with_capacity(test_scores.len());
if detector.config.smoothing {
let mut rng = LcgRng::new(0xDEAD_BEEF_CAFE_BABE);
for &ts in test_scores {
let pv = conformal_p_value_smoothed(&detector.calibration_scores, ts, &mut rng);
let label = if pv < eps { -1 } else { 1 };
p_values.push(pv);
labels.push(label);
}
} else {
for &ts in test_scores {
let pv = conformal_p_value(&detector.calibration_scores, ts);
let label = if pv < eps { -1 } else { 1 };
p_values.push(pv);
labels.push(label);
}
}
let n_anomalies = labels.iter().filter(|&&l| l == -1).count();
Ok(ConformalResult {
p_values,
labels,
n_anomalies,
})
}
pub fn mondrian_conformal_predict(
calibration_scores: &[f64],
calibration_labels: &[usize],
test_scores: &[f64],
test_labels: &[usize],
significance: f64,
) -> AnomalyResult<ConformalResult> {
if calibration_scores.is_empty() || test_scores.is_empty() {
return Err(AnomalyError::EmptyInput);
}
if calibration_scores.len() != calibration_labels.len() {
return Err(AnomalyError::DimensionMismatch {
expected: calibration_scores.len(),
got: calibration_labels.len(),
});
}
if test_scores.len() != test_labels.len() {
return Err(AnomalyError::DimensionMismatch {
expected: test_scores.len(),
got: test_labels.len(),
});
}
let n_classes = calibration_labels.iter().copied().max().unwrap_or(0) + 1;
let mut class_scores: Vec<Vec<f64>> = vec![Vec::new(); n_classes];
for (i, &label) in calibration_labels.iter().enumerate() {
class_scores[label].push(calibration_scores[i]);
}
let mut p_values = Vec::with_capacity(test_scores.len());
let mut labels = Vec::with_capacity(test_scores.len());
for (i, &ts) in test_scores.iter().enumerate() {
let class = test_labels[i];
let cal_slice: &[f64] = if class < class_scores.len() {
&class_scores[class]
} else {
&[]
};
let pv = conformal_p_value(cal_slice, ts);
let label = if pv < significance { -1 } else { 1 };
p_values.push(pv);
labels.push(label);
}
let n_anomalies = labels.iter().filter(|&&l| l == -1).count();
Ok(ConformalResult {
p_values,
labels,
n_anomalies,
})
}
#[must_use]
pub fn online_conformal_detector_new(
window_size: usize,
cfg: ConformalConfig,
) -> OnlineConformalDetector {
OnlineConformalDetector {
window: VecDeque::with_capacity(window_size),
window_size,
config: cfg,
}
}
pub fn online_conformal_update(detector: &mut OnlineConformalDetector, score: f64) -> (f64, i32) {
let window_slice: Vec<f64> = detector.window.iter().copied().collect();
let pv = if window_slice.is_empty() {
0.0
} else {
conformal_p_value(&window_slice, score)
};
let label = if pv < detector.config.significance {
-1_i32
} else {
1_i32
};
if detector.window.len() >= detector.window_size && detector.window_size > 0 {
detector.window.pop_front();
}
if detector.window_size > 0 {
detector.window.push_back(score);
}
(pv, label)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::AnomalyResult;
#[test]
fn conformal_p_value_basic() {
let cal = vec![1.0_f64, 2.0, 3.0, 4.0, 5.0];
let p = conformal_p_value(&cal, 5.0);
let expected = 1.0 / 6.0;
assert!((p - expected).abs() < 1e-10, "p={p} expected={expected}");
}
#[test]
fn conformal_p_value_low() {
let cal: Vec<f64> = (0..100).map(|i| i as f64).collect();
let p = conformal_p_value(&cal, 1_000_000.0);
assert_eq!(p, 0.0, "p should be exactly 0, got {p}");
}
#[test]
fn conformal_p_value_range() {
let mut rng = LcgRng::new(42);
let cal: Vec<f64> = (0..50).map(|_| rng.next_f32() as f64 * 10.0).collect();
for _ in 0..200 {
let ts = rng.next_f32() as f64 * 20.0; let p = conformal_p_value(&cal, ts);
assert!((0.0..=1.0).contains(&p), "p={p} outside [0,1] for ts={ts}");
}
}
#[test]
fn conformal_predict_shape() -> AnomalyResult<()> {
let cal: Vec<f64> = (0..30).map(|i| i as f64).collect();
let cfg = ConformalConfig::default();
let detector = conformal_calibrate(cal, cfg);
let test_scores = vec![5.0_f64, 15.0, 100.0, -1.0, 0.0];
let result = conformal_predict(&detector, &test_scores)?;
assert_eq!(result.labels.len(), test_scores.len());
assert_eq!(result.p_values.len(), test_scores.len());
Ok(())
}
#[test]
fn conformal_false_positive_rate() -> AnomalyResult<()> {
let significance = 0.05;
let m = 500_usize; let n_test = 500_usize;
let mut rng = LcgRng::new(0xCAFE_DEAD);
let cal: Vec<f64> = (0..m).map(|_| rng.next_f32() as f64).collect();
let test: Vec<f64> = (0..n_test).map(|_| rng.next_f32() as f64).collect();
let cfg = ConformalConfig {
significance,
smoothing: false,
};
let det = conformal_calibrate(cal, cfg);
let result = conformal_predict(&det, &test)?;
let fpr = result.n_anomalies as f64 / n_test as f64;
assert!(
fpr <= significance + 0.04,
"FPR={fpr:.4} too high (significance={significance})"
);
Ok(())
}
#[test]
fn conformal_anomaly_detected() -> AnomalyResult<()> {
let cal: Vec<f64> = (0..100).map(|i| i as f64 / 100.0).collect();
let cfg = ConformalConfig {
significance: 0.05,
smoothing: false,
};
let det = conformal_calibrate(cal, cfg);
let result = conformal_predict(&det, &[100.0_f64])?;
assert_eq!(result.labels[0], -1, "outlier must be labelled -1");
assert_eq!(result.p_values[0], 0.0);
Ok(())
}
#[test]
fn mondrian_per_class() -> AnomalyResult<()> {
let mut cal_scores = Vec::new();
let mut cal_labels = Vec::new();
for i in 0..20_usize {
cal_scores.push(i as f64 / 20.0);
cal_labels.push(0_usize);
}
for i in 0..20_usize {
cal_scores.push(10.0 + i as f64 / 20.0);
cal_labels.push(1_usize);
}
let test_scores = vec![0.5_f64, 100.0];
let test_labels = vec![0_usize, 1_usize];
let result =
mondrian_conformal_predict(&cal_scores, &cal_labels, &test_scores, &test_labels, 0.05)?;
assert!(
result.p_values[0] > 0.0,
"class-0 normal point p={}",
result.p_values[0]
);
assert_eq!(result.labels[1], -1, "class-1 outlier must be -1");
assert_eq!(result.p_values[1], 0.0);
Ok(())
}
#[test]
fn online_detector_window_fills() {
let cfg = ConformalConfig::default();
let mut det = online_conformal_detector_new(10, cfg);
assert_eq!(det.window.len(), 0);
for i in 0..10_usize {
online_conformal_update(&mut det, i as f64);
}
assert_eq!(
det.window.len(),
10,
"window should be full after 10 updates"
);
}
#[test]
fn online_detector_rolling() {
let cfg = ConformalConfig::default();
let window_size = 5_usize;
let mut det = online_conformal_detector_new(window_size, cfg);
for i in 0..window_size {
online_conformal_update(&mut det, i as f64);
}
online_conformal_update(&mut det, 100.0);
assert_eq!(
det.window.len(),
window_size,
"window size must stay constant"
);
let front = *det
.window
.front()
.expect("window is non-empty after eviction");
assert!(
(front - 1.0).abs() < 1e-10,
"oldest entry should be 1.0 after eviction, got {front}"
);
}
#[test]
fn online_detector_alarm() {
let cfg = ConformalConfig {
significance: 0.05,
smoothing: false,
};
let mut det = online_conformal_detector_new(50, cfg);
let mut rng = LcgRng::new(0xBEEF_CAFE);
for _ in 0..50 {
let v = rng.next_normal() as f64 * 0.1;
online_conformal_update(&mut det, v);
}
let (p, label) = online_conformal_update(&mut det, 1000.0);
assert_eq!(label, -1, "extreme score must trigger alarm, p={p}");
assert_eq!(p, 0.0, "p-value must be 0 for extreme outlier");
}
#[test]
fn conformal_smoothed_in_range() {
let mut rng_data = LcgRng::new(777);
let cal: Vec<f64> = (0..100).map(|_| rng_data.next_f32() as f64 * 5.0).collect();
let mut rng_smooth = LcgRng::new(999);
for _ in 0..200 {
let ts = rng_data.next_f32() as f64 * 10.0;
let p = conformal_p_value_smoothed(&cal, ts, &mut rng_smooth);
assert!((0.0..=1.0).contains(&p), "smoothed p={p} outside [0,1]");
}
}
#[test]
fn mondrian_unseen_class() -> AnomalyResult<()> {
let cal_scores = vec![0.1_f64, 0.2, 0.3];
let cal_labels = vec![0_usize, 0, 0];
let result =
mondrian_conformal_predict(&cal_scores, &cal_labels, &[0.5_f64], &[99_usize], 0.05)?;
assert_eq!(result.p_values[0], 0.0, "unseen class → p=0");
assert_eq!(result.labels[0], -1);
Ok(())
}
#[test]
fn conformal_predict_empty_test_error() {
let cal = vec![1.0_f64, 2.0, 3.0];
let cfg = ConformalConfig::default();
let det = conformal_calibrate(cal, cfg);
let err = conformal_predict(&det, &[]).unwrap_err();
assert!(
matches!(err, crate::error::AnomalyError::EmptyInput),
"expected EmptyInput, got {err:?}"
);
}
#[test]
fn conformal_n_anomalies_consistent() -> AnomalyResult<()> {
let cal: Vec<f64> = (0..50).map(|i| i as f64).collect();
let cfg = ConformalConfig::default();
let det = conformal_calibrate(cal, cfg);
let test = vec![10.0_f64, 1000.0, 25.0, 999.0, 5.0];
let result = conformal_predict(&det, &test)?;
let counted = result.labels.iter().filter(|&&l| l == -1).count();
assert_eq!(result.n_anomalies, counted);
Ok(())
}
}