use crate::config::DetectConfig;
use crate::dist::chi2_pvalue;
use crate::{linalg, Detector, Report, ScanContext};
use ax_core::finding::Handle;
use ax_core::{det, AnomalyClass, Finding, RecordSet};
#[derive(Debug, Default, Clone)]
pub struct MahalanobisDetector;
struct Features {
matrix: Vec<Vec<f64>>,
rows: Vec<usize>,
dim: usize,
}
impl MahalanobisDetector {
fn extract(current: &RecordSet) -> Option<Features> {
let feats: Vec<&Vec<ax_core::Value>> = current
.columns
.iter()
.filter(|c| c.ty.is_numeric())
.map(|c| &c.cells)
.collect();
let dim = feats.len();
if dim < 2 {
return None;
}
let mut matrix = Vec::new();
let mut rows = Vec::new();
for r in 0..current.rows() {
let mut row = Vec::with_capacity(dim);
let complete = feats.iter().all(|cells| match cells[r].as_f64() {
Some(v) if v.is_finite() => {
row.push(v);
true
}
_ => false,
});
if complete {
matrix.push(row);
rows.push(r);
}
}
Some(Features { matrix, rows, dim })
}
fn mean(f: &Features) -> Vec<f64> {
(0..f.dim)
.map(|j| {
let col: Vec<f64> = f.matrix.iter().map(|row| row[j]).collect();
det::mean(&col).unwrap_or(0.0)
})
.collect()
}
fn covariance(f: &Features, mean: &[f64], ridge: f64) -> Vec<Vec<f64>> {
let n = f.matrix.len();
let d = f.dim;
let centered: Vec<Vec<f64>> = f
.matrix
.iter()
.map(|row| (0..d).map(|j| row[j] - mean[j]).collect())
.collect();
let mut cov = vec![vec![0.0_f64; d]; d];
for j in 0..d {
for k in j..d {
let prods: Vec<f64> = centered.iter().map(|c| c[j] * c[k]).collect();
let c = det::det_sum(&prods) / (n - 1) as f64;
cov[j][k] = c;
cov[k][j] = c;
}
}
let trace: f64 = det::det_sum(&(0..d).map(|j| cov[j][j]).collect::<Vec<_>>());
let eps = ridge * (trace / d as f64);
for (j, row) in cov.iter_mut().enumerate() {
row[j] += eps;
}
cov
}
}
impl Detector for MahalanobisDetector {
fn id(&self) -> &'static str {
"mv.mahalanobis"
}
fn class(&self) -> AnomalyClass {
AnomalyClass::Multivariate
}
fn detect(&self, ctx: &ScanContext, cfg: &DetectConfig, out: &mut Report) {
let Some(f) = Self::extract(ctx.current) else {
out.mark_absent(
self.id(),
"needs at least 2 numeric columns for a multivariate distance",
);
return;
};
if f.matrix.len() < cfg.mv_min_n {
out.mark_absent(
self.id(),
format!(
"fewer than {} complete numeric rows to estimate a covariance",
cfg.mv_min_n
),
);
return;
}
let mean = Self::mean(&f);
let cov = Self::covariance(&f, &mean, cfg.mv_ridge);
let Some(chol) = linalg::cholesky(&cov) else {
out.mark_absent(
self.id(),
"covariance is singular (constant or collinear columns)",
);
return;
};
for (i, &row) in f.rows.iter().enumerate() {
let dvec: Vec<f64> = (0..f.dim).map(|j| f.matrix[i][j] - mean[j]).collect();
let dsq = linalg::mahalanobis_sq(&chol, &dvec);
let p = chi2_pvalue(dsq, f.dim);
if p < cfg.mv_alpha {
out.push(Finding::new(
self.id(),
AnomalyClass::Multivariate,
Handle::Row { row },
1.0 - p,
dsq,
format!(
"row {row}: Mahalanobis D²={dsq:.3} over {} numeric columns, p={p:.3e} < α={:.3e} — joint outlier",
f.dim, cfg.mv_alpha
),
));
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ax_core::{Column, Value};
use proptest::prelude::*;
fn corpus(xs: &[f64], ys: &[f64]) -> RecordSet {
RecordSet::new(
"-",
"t",
vec![
Column::new("x", xs.iter().map(|&v| Value::Float(v)).collect()),
Column::new("y", ys.iter().map(|&v| Value::Float(v)).collect()),
],
)
}
fn run(rs: &RecordSet) -> Report {
let mut out = Report::new();
MahalanobisDetector.detect(&ScanContext::single(rs), &DetectConfig::default(), &mut out);
out
}
fn correlated(n: usize) -> (Vec<f64>, Vec<f64>) {
let xs: Vec<f64> = (0..n).map(|i| (i % 10) as f64).collect();
let ys: Vec<f64> = (0..n)
.map(|i| (i % 10) as f64 + ((i / 10) % 2) as f64 * 0.4)
.collect();
(xs, ys)
}
#[test]
fn needs_two_numeric_columns() {
let rs = RecordSet::new(
"-",
"t",
vec![Column::new("x", (0..30).map(Value::Int).collect())],
);
let report = run(&rs);
assert!(report.findings.is_empty());
assert_eq!(report.absent.len(), 1);
assert_eq!(report.absent[0].detector, "mv.mahalanobis");
}
#[test]
fn too_few_complete_rows_is_absent() {
let (xs, ys) = correlated(10); let report = run(&corpus(&xs, &ys));
assert!(report.findings.is_empty());
assert_eq!(report.absent.len(), 1);
}
#[test]
fn constant_columns_are_absent_singular() {
let xs = vec![5.0; 30];
let ys = vec![7.0; 30];
let report = run(&corpus(&xs, &ys));
assert!(report.findings.is_empty());
assert_eq!(report.absent.len(), 1);
assert!(report.absent[0].reason.contains("singular"));
}
#[test]
fn clean_correlated_cloud_has_no_findings() {
let (xs, ys) = correlated(40);
let report = run(&corpus(&xs, &ys));
assert!(
report.findings.is_empty(),
"no joint outliers in a tidy cloud"
);
assert!(
report.absent.is_empty(),
"the detector ran; it is not absent"
);
}
#[test]
fn joint_outlier_is_flagged_though_each_axis_is_in_range() {
let (mut xs, mut ys) = correlated(40);
xs.push(0.0);
ys.push(9.0);
let report = run(&corpus(&xs, &ys));
assert_eq!(report.findings.len(), 1);
assert!(matches!(report.findings[0].handle, Handle::Row { row: 40 }));
assert_eq!(report.findings[0].class, AnomalyClass::Multivariate);
assert!(report.findings[0].confidence < 1.0);
assert!(report.findings[0].confidence > 0.99);
}
#[test]
fn covariance_is_exact() {
let f = Features {
matrix: vec![vec![1.0, 1.0], vec![2.0, 3.0], vec![6.0, 5.0]],
rows: vec![0, 1, 2],
dim: 2,
};
let mean = MahalanobisDetector::mean(&f);
assert_eq!(mean, vec![3.0, 3.0]);
let cov = MahalanobisDetector::covariance(&f, &mean, 0.0);
assert!((cov[0][0] - 7.0).abs() < 1e-12, "var_x");
assert!((cov[1][1] - 4.0).abs() < 1e-12, "var_y");
assert!((cov[0][1] - 5.0).abs() < 1e-12, "cov_xy");
assert_eq!(cov[0][1], cov[1][0], "symmetric");
}
#[test]
fn covariance_ridge_adds_scaled_diagonal() {
let f = Features {
matrix: vec![vec![1.0, 1.0], vec![2.0, 3.0], vec![6.0, 5.0]],
rows: vec![0, 1, 2],
dim: 2,
};
let mean = MahalanobisDetector::mean(&f);
let cov = MahalanobisDetector::covariance(&f, &mean, 1.0);
assert!((cov[0][0] - 12.5).abs() < 1e-12);
assert!((cov[1][1] - 9.5).abs() < 1e-12);
assert!((cov[0][1] - 5.0).abs() < 1e-12);
}
#[test]
fn runs_at_exactly_min_n_complete_rows() {
let n = DetectConfig::default().mv_min_n; let (xs, ys) = correlated(n);
let report = run(&corpus(&xs, &ys));
assert!(
report.absent.is_empty(),
"exactly min_n rows must be assessed"
);
}
#[test]
fn rows_with_non_finite_values_are_excluded() {
let (xs, mut ys) = correlated(20);
ys[0] = f64::NAN;
let report = run(&corpus(&xs, &ys));
assert!(report.findings.is_empty());
assert_eq!(report.absent.len(), 1, "incomplete row must not be counted");
assert!(report.absent[0].reason.contains("complete"));
}
#[test]
fn deterministic_across_runs() {
let (mut xs, mut ys) = correlated(40);
xs.push(0.0);
ys.push(9.0);
let rs = corpus(&xs, &ys);
let a = run(&rs);
let b = run(&rs);
assert_eq!(
serde_json::to_string(&a.findings).unwrap(),
serde_json::to_string(&b.findings).unwrap()
);
}
proptest! {
#[test]
fn translation_invariant(dx in -1e4f64..1e4, dy in -1e4f64..1e4) {
let (mut xs, mut ys) = correlated(40);
xs.push(0.0);
ys.push(9.0);
let base: Vec<usize> = run(&corpus(&xs, &ys))
.findings
.iter()
.map(|f| match f.handle { Handle::Row { row } => row, _ => unreachable!() })
.collect();
let sx: Vec<f64> = xs.iter().map(|v| v + dx).collect();
let sy: Vec<f64> = ys.iter().map(|v| v + dy).collect();
let shifted: Vec<usize> = run(&corpus(&sx, &sy))
.findings
.iter()
.map(|f| match f.handle { Handle::Row { row } => row, _ => unreachable!() })
.collect();
prop_assert_eq!(base, shifted);
}
}
}