use std::collections::HashMap;

use crate::error::{AnomalyError, AnomalyResult};
use crate::handle::LcgRng;
/// Sparse random projection ("StreamHash") mapping raw feature vectors onto
/// `k` output components.
///
/// Entry `(row, col)` of the `k × d` projection matrix is drawn from its own
/// [`LcgRng`], seeded by mixing `seed` with `row` and `col`. The matrix is
/// therefore fully determined by `(d, k, density, seed)`.
#[derive(Debug, Clone)]
pub struct StreamHash {
    /// Number of output components (rows of the projection matrix).
    pub k: usize,
    /// Number of input features the matrix was built for (columns).
    pub d: usize,
    /// Row-major `k × d` matrix whose entries are `+1.0`, `-1.0` or `0.0`.
    signs: Vec<f32>,
    /// Output multiplier `1 / sqrt(max(density * d, 1e-12))`.
    scale: f32,
}

impl StreamHash {
    /// Builds a projection from `d` inputs to `k` outputs.
    ///
    /// Each entry takes one draw `u` from its private RNG. The entry is `+1`
    /// when `u < density / 2`, `-1` when `density / 2 <= u < density`, and
    /// `0` otherwise.
    /// NOTE(review): P(+1) = P(-1) = density / 2 only holds if
    /// `LcgRng::next_f32` is uniform on `[0, 1)` and `density <= 1` — confirm.
    #[must_use]
    pub fn new(d: usize, k: usize, density: f32, seed: u64) -> Self {
        const ROW_MIX: u64 = 0x9E37_79B9_7F4A_7C15;
        const COL_MIX: u64 = 0xC2B2_AE3D_27D4_EB4F;
        let positive_cutoff = density * 0.5;

        let signs: Vec<f32> = (0..k)
            .flat_map(|row| (0..d).map(move |col| (row, col)))
            .map(|(row, col)| {
                let entry_seed = seed
                    ^ (row as u64).wrapping_mul(ROW_MIX)
                    ^ (col as u64).wrapping_mul(COL_MIX);
                let mut entry_rng = LcgRng::new(entry_seed);
                let u = entry_rng.next_f32();
                if u < positive_cutoff {
                    1.0_f32
                } else if u < density {
                    -1.0_f32
                } else {
                    0.0_f32
                }
            })
            .collect();

        // The `max` keeps the denominator positive when `density * d` is 0.
        let scale = (density * d as f32).max(1e-12).sqrt().recip();
        Self { k, d, signs, scale }
    }

    /// Projects `x` onto the `k` components, returning a vector of length `k`.
    ///
    /// Only the first `min(x.len(), d)` features contribute. Extra trailing
    /// features are ignored, and missing ones contribute nothing, so a
    /// truncated vector behaves like its zero-padded version.
    #[must_use]
    pub fn project(&self, x: &[f32]) -> Vec<f32> {
        let width = x.len().min(self.d);
        let inputs = &x[..width];
        let mut out = Vec::with_capacity(self.k);
        for row_start in (0..self.k).map(|r| r * self.d) {
            let row = &self.signs[row_start..row_start + width];
            let dot: f32 = row.iter().zip(inputs).map(|(s, v)| s * v).sum();
            out.push(dot * self.scale);
        }
        out
    }
}
#[derive(Debug, Clone)]
pub struct HalfSpaceChain {
feature: Vec<usize>,
shift: Vec<f32>,
counts: Vec<Vec<(Vec<i64>, u64)>>,
depth: usize,
k: usize,
}
impl HalfSpaceChain {
fn new(k: usize, depth: usize, rng: &mut LcgRng) -> Self {
let feature: Vec<usize> = (0..depth).map(|_| rng.next_usize(k.max(1))).collect();
let shift: Vec<f32> = (0..k).map(|_| rng.next_f32()).collect();
Self {
feature,
shift,
counts: vec![Vec::new(); depth],
depth,
k,
}
}
fn bin_prefixes(&self, z: &[f32]) -> Vec<Vec<i64>> {
let mut splits = vec![0u32; self.k];
let mut prefix: Vec<i64> = Vec::with_capacity(self.depth);
let mut levels: Vec<Vec<i64>> = Vec::with_capacity(self.depth);
for &f in &self.feature {
splits[f] += 1;
let factor = (1u64 << splits[f]) as f32;
let shifted = z[f] * factor + self.shift[f];
let bin = shifted.floor() as i64;
prefix.push(bin);
levels.push(prefix.clone());
}
levels
}
fn add(&mut self, z: &[f32]) {
let levels = self.bin_prefixes(z);
for (l, key) in levels.into_iter().enumerate() {
increment(&mut self.counts[l], key);
}
}
fn level_counts(&self, z: &[f32]) -> Vec<u64> {
let levels = self.bin_prefixes(z);
levels
.into_iter()
.enumerate()
.map(|(l, key)| lookup(&self.counts[l], &key))
.collect()
}
fn min_scaled_density(&self, z: &[f32], query_self: bool) -> f32 {
let counts = self.level_counts(z);
let mut best = f32::INFINITY;
for (l, c) in counts.into_iter().enumerate() {
let adj = if query_self { c.saturating_sub(1) } else { c };
let scale = (1u64 << (l + 1)) as f32; let density = scale * adj as f32;
if density < best {
best = density;
}
}
if best.is_finite() { best } else { 0.0 }
}
}
fn increment(map: &mut Vec<(Vec<i64>, u64)>, key: Vec<i64>) {
for (k, c) in map.iter_mut() {
if *k == key {
*c += 1;
return;
}
}
map.push((key, 1));
}
fn lookup(map: &[(Vec<i64>, u64)], key: &[i64]) -> u64 {
for (k, c) in map {
if k.as_slice() == key {
return *c;
}
}
0
}
/// Hyper-parameters for the `XStream` detector.
///
/// Every field is public, so a partial override works with struct-update
/// syntax: `XStreamConfig { depth: 8, ..XStreamConfig::default() }`.
#[derive(Debug, Clone, PartialEq)]
pub struct XStreamConfig {
    /// Number of projected components (default `100`); `fit` rejects `0`.
    pub n_components: usize,
    /// Number of half-space chains in the ensemble (default `50`); `fit`
    /// rejects `0`.
    pub n_chains: usize,
    /// Number of levels per chain (default `15`); `fit` rejects `0`.
    pub depth: usize,
    /// Projection sparsity (default `1/3`). A projection entry is non-zero
    /// when its random draw falls below this value. `fit` clamps it to
    /// `[1e-6, 1.0]`.
    pub projection_density: f32,
    /// Seed from which the projection and the chains are derived
    /// (default `42`).
    pub seed: u64,
}

impl Default for XStreamConfig {
    /// 100 components, 50 chains, depth 15, density 1/3, seed 42.
    fn default() -> Self {
        Self {
            n_components: 100,
            n_chains: 50,
            depth: 15,
            projection_density: 1.0 / 3.0,
            seed: 42,
        }
    }
}
/// Streaming anomaly detector in the style of xStream.
///
/// Rows are projected with a [`StreamHash`] and then counted by an ensemble
/// of [`HalfSpaceChain`]s. A score is the negated mean, over chains, of each
/// chain's minimum scaled cell density. Scores are therefore `<= 0`, and
/// values closer to zero mean the query sits in a sparser region, i.e. is
/// more anomalous.
#[derive(Debug, Clone)]
pub struct XStream {
    config: XStreamConfig,
    /// Projection built by [`XStream::fit`]; `None` means "not fitted yet".
    hash: Option<StreamHash>,
    chains: Vec<HalfSpaceChain>,
    /// Row width fixed by the last successful `fit` (`0` before fitting).
    n_features: usize,
}

impl XStream {
    /// Creates an unfitted detector; call [`XStream::fit`] before scoring.
    #[must_use]
    pub fn new(config: XStreamConfig) -> Self {
        Self {
            config,
            hash: None,
            chains: Vec::new(),
            n_features: 0,
        }
    }

    /// Fits the detector on a row-major `n_samples × n_features` buffer,
    /// replacing any previously learned state.
    ///
    /// # Errors
    /// - [`AnomalyError::EmptyInput`] if `n_samples == 0`.
    /// - [`AnomalyError::InvalidFeatureCount`] if `n_features == 0`.
    /// - [`AnomalyError::DimensionMismatch`] if `data.len()` is not
    ///   `n_samples * n_features`.
    /// - [`AnomalyError::Internal`] if `n_components`, `n_chains` or `depth`
    ///   is zero, or if `projection_density` is NaN.
    pub fn fit(&mut self, data: &[f32], n_samples: usize, n_features: usize) -> AnomalyResult<()> {
        if n_samples == 0 {
            return Err(AnomalyError::EmptyInput);
        }
        if n_features == 0 {
            return Err(AnomalyError::InvalidFeatureCount { n: 0 });
        }
        // Saturate so an absurd shape is reported as a mismatch instead of
        // overflowing (which panics in debug builds).
        let expected = n_samples.saturating_mul(n_features);
        if data.len() != expected {
            return Err(AnomalyError::DimensionMismatch {
                expected,
                got: data.len(),
            });
        }
        self.validate_config()?;

        let hash = StreamHash::new(
            n_features,
            self.config.n_components,
            self.config.projection_density.clamp(1e-6, 1.0),
            self.config.seed,
        );
        let (k, depth) = (self.config.n_components, self.config.depth);
        let mut rng = LcgRng::new(self.config.seed ^ 0xDEAD_BEEF_CAFE_F00D);
        let mut chains: Vec<HalfSpaceChain> = (0..self.config.n_chains)
            .map(|_| HalfSpaceChain::new(k, depth, &mut rng))
            .collect();
        for row in data.chunks_exact(n_features) {
            let z = hash.project(row);
            for chain in &mut chains {
                chain.add(&z);
            }
        }
        self.hash = Some(hash);
        self.chains = chains;
        self.n_features = n_features;
        Ok(())
    }

    /// Adds one observation to every chain.
    ///
    /// # Errors
    /// - [`AnomalyError::NotFitted`] before a successful [`XStream::fit`].
    /// - [`AnomalyError::FeatureCountMismatch`] if `x.len() != n_features`.
    ///   This is the same check [`XStream::score`] applies, so the model
    ///   never absorbs a row it would refuse to score.
    pub fn partial_fit(&mut self, x: &[f32]) -> AnomalyResult<()> {
        let hash = self.hash.as_ref().ok_or(AnomalyError::NotFitted)?;
        self.check_row_len(x)?;
        let z = hash.project(x);
        for chain in &mut self.chains {
            chain.add(&z);
        }
        Ok(())
    }

    /// Scores a single row; higher means more anomalous.
    ///
    /// # Errors
    /// [`AnomalyError::NotFitted`] before fitting;
    /// [`AnomalyError::FeatureCountMismatch`] on a wrong-width row.
    pub fn score(&self, x: &[f32]) -> AnomalyResult<f32> {
        self.score_internal(x, false)
    }

    /// Like [`XStream::score`], but each cell count is reduced by one
    /// (saturating at zero) to discount `x`'s own earlier insertion.
    ///
    /// # Errors
    /// Same as [`XStream::score`].
    pub fn score_in_sample(&self, x: &[f32]) -> AnomalyResult<f32> {
        self.score_internal(x, true)
    }

    /// Shared scoring path: negated mean over chains of the minimum scaled
    /// density of `x`'s cells.
    fn score_internal(&self, x: &[f32], query_self: bool) -> AnomalyResult<f32> {
        let hash = self.hash.as_ref().ok_or(AnomalyError::NotFitted)?;
        self.check_row_len(x)?;
        let z = hash.project(x);
        let total = self
            .chains
            .iter()
            .map(|chain| chain.min_scaled_density(&z, query_self))
            .fold(0.0_f32, |acc, density| acc + density);
        // `fit` guarantees at least one chain, so the division is safe.
        Ok(-(total / self.chains.len() as f32))
    }

    /// Scores `n` rows stored row-major in `x`.
    ///
    /// # Errors
    /// - [`AnomalyError::NotFitted`] before fitting.
    /// - [`AnomalyError::DimensionMismatch`] if `x.len()` is not
    ///   `n * n_features`.
    pub fn score_batch(&self, x: &[f32], n: usize) -> AnomalyResult<Vec<f32>> {
        if self.hash.is_none() {
            return Err(AnomalyError::NotFitted);
        }
        let expected = n.saturating_mul(self.n_features);
        if x.len() != expected {
            return Err(AnomalyError::DimensionMismatch {
                expected,
                got: x.len(),
            });
        }
        // A fitted model has `n_features >= 1`, so `chunks_exact` is valid.
        x.chunks_exact(self.n_features)
            .map(|row| self.score(row))
            .collect()
    }

    /// The fitted projection, or `None` before fitting.
    #[must_use]
    #[inline]
    pub fn stream_hash(&self) -> Option<&StreamHash> {
        self.hash.as_ref()
    }

    /// Row width fixed by `fit` (`0` before fitting).
    #[must_use]
    #[inline]
    pub fn n_features(&self) -> usize {
        self.n_features
    }

    /// Rejects configurations that cannot produce a usable model.
    fn validate_config(&self) -> AnomalyResult<()> {
        let cfg = &self.config;
        if cfg.n_components == 0 {
            return Err(AnomalyError::Internal {
                msg: "n_components must be > 0".into(),
            });
        }
        if cfg.n_chains == 0 {
            return Err(AnomalyError::Internal {
                msg: "n_chains must be > 0".into(),
            });
        }
        if cfg.depth == 0 {
            return Err(AnomalyError::Internal {
                msg: "depth must be > 0".into(),
            });
        }
        // `f32::clamp` passes NaN through. A NaN density makes every
        // projection entry zero, so every row would project to the origin.
        if cfg.projection_density.is_nan() {
            return Err(AnomalyError::Internal {
                msg: "projection_density must not be NaN".into(),
            });
        }
        Ok(())
    }

    /// Ensures `x` has exactly the fitted feature count.
    fn check_row_len(&self, x: &[f32]) -> AnomalyResult<()> {
        if x.len() == self.n_features {
            Ok(())
        } else {
            Err(AnomalyError::FeatureCountMismatch {
                expected: self.n_features,
                got: x.len(),
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Generates `n` 2-D points from a zero-mean Gaussian scaled by 0.3,
    /// flattened row-major, and returns them with the sample count.
    fn dense_cluster(n: usize, seed: u64) -> (Vec<f32>, usize) {
        let mut rng = LcgRng::new(seed);
        let mut points = Vec::with_capacity(n * 2);
        for _ in 0..n {
            let px = rng.next_normal() * 0.3;
            let py = rng.next_normal() * 0.3;
            points.push(px);
            points.push(py);
        }
        (points, n)
    }

    /// Config shorthand; every test uses a projection density of 1/3.
    fn cfg_with(n_components: usize, n_chains: usize, depth: usize, seed: u64) -> XStreamConfig {
        XStreamConfig {
            n_components,
            n_chains,
            depth,
            projection_density: 1.0 / 3.0,
            seed,
        }
    }

    /// Builds a detector from `cfg` and fits it on 2-D `data`.
    fn fit_detector(cfg: XStreamConfig, data: &[f32], n: usize) -> XStream {
        let mut det = XStream::new(cfg);
        det.fit(data, n, 2).expect("fit");
        det
    }

    #[test]
    fn streamhash_deterministic() {
        let first = StreamHash::new(8, 16, 1.0 / 3.0, 99);
        let second = StreamHash::new(8, 16, 1.0 / 3.0, 99);
        let x = [0.1_f32, -0.4, 0.7, 0.2, -0.9, 0.3, 0.5, -0.1];
        let p1 = first.project(&x);
        let p2 = second.project(&x);
        assert_eq!(p1.len(), 16);
        p1.iter().zip(&p2).for_each(|(a, b)| {
            assert!(
                (a - b).abs() < 1e-7,
                "projection not deterministic: {a} vs {b}"
            );
        });
        assert_eq!(p1, first.project(&x), "repeat projection must match exactly");
    }

    #[test]
    fn streamhash_feature_evolving() {
        let hash = StreamHash::new(6, 8, 1.0 / 3.0, 7);
        let padded = hash.project(&[0.5_f32, 0.5, 0.5, 0.0, 0.0, 0.0]);
        let truncated = hash.project(&[0.5_f32, 0.5, 0.5]);
        for (a, b) in padded.iter().zip(truncated.iter()) {
            assert!(
                (a - b).abs() < 1e-6,
                "evolving projection mismatch: {a} vs {b}"
            );
        }
    }

    #[test]
    fn chain_bin_counts_update() {
        let mut rng = LcgRng::new(5);
        let mut chain = HalfSpaceChain::new(4, 6, &mut rng);
        let z = vec![0.37_f32, -0.12, 0.5, 0.9];
        (0..7).for_each(|_| chain.add(&z));

        let counts = chain.level_counts(&z);
        assert_eq!(counts.len(), 6, "one count per level");
        for (l, &c) in counts.iter().enumerate() {
            assert_eq!(c, 7, "level {l} count should be 7, got {c}");
        }

        let far = vec![1000.0_f32, -1000.0, 500.0, -500.0];
        let deepest = chain
            .level_counts(&far)
            .last()
            .copied()
            .expect("far_counts vec should be non-empty");
        assert_eq!(deepest, 0, "far point should be empty");
    }

    #[test]
    fn multiscale_takes_min_density() {
        let mut rng = LcgRng::new(11);
        let mut chain = HalfSpaceChain::new(3, 5, &mut rng);
        let base = [0.2_f32, 0.3, 0.4];
        for step in 0..10 {
            chain.add(&[base[0] + step as f32 * 0.01, base[1], base[2]]);
        }

        let expected_min = chain
            .level_counts(&base)
            .iter()
            .enumerate()
            .map(|(l, &c)| (1u64 << (l + 1)) as f32 * c as f32)
            .fold(f32::INFINITY, f32::min);
        let got = chain.min_scaled_density(&base, false);
        assert!(
            (got - expected_min).abs() < 1e-3,
            "min density {got} should equal min over levels {expected_min}"
        );
    }

    #[test]
    fn outlier_scores_higher_than_dense_point() {
        let (data, n) = dense_cluster(300, 21);
        let det = fit_detector(cfg_with(50, 60, 12, 3), &data, n);
        let s_dense = det.score(&[0.0_f32, 0.0]).expect("dense score");
        let s_outlier = det.score(&[15.0_f32, -15.0]).expect("outlier score");
        assert!(
            s_outlier > s_dense,
            "outlier score {s_outlier} should exceed dense score {s_dense}"
        );
    }

    #[test]
    fn score_monotone_in_sparsity() {
        let (data, n) = dense_cluster(400, 33);
        let det = fit_detector(cfg_with(40, 80, 12, 9), &data, n);
        let near = det.score(&[0.0_f32, 0.0]).expect("near");
        let mid = det.score(&[3.0_f32, 3.0]).expect("mid");
        let far = det.score(&[30.0_f32, 30.0]).expect("far");
        assert!(
            near <= mid,
            "score should not decrease moving out: {near} → {mid}"
        );
        assert!(
            mid <= far,
            "score should not decrease moving out: {mid} → {far}"
        );
        assert!(
            far > near,
            "far point must be more anomalous than the centre"
        );
    }

    #[test]
    fn partial_fit_increases_local_density() {
        let (data, n) = dense_cluster(120, 4);
        let mut det = fit_detector(cfg_with(30, 40, 10, 8), &data, n);
        let probe = [8.0_f32, 8.0];
        let before = det.score(&probe).expect("before");
        for _ in 0..200 {
            det.partial_fit(&probe).expect("partial fit");
        }
        let after = det.score(&probe).expect("after");
        assert!(
            after < before,
            "after densifying, score {after} should drop below {before}"
        );
    }

    #[test]
    fn unfitted_score_errors() {
        let det = XStream::new(XStreamConfig::default());
        let result = det.score(&[0.0_f32, 0.0]);
        assert!(
            matches!(result, Err(AnomalyError::NotFitted)),
            "expected NotFitted, got {result:?}"
        );
    }

    #[test]
    fn empty_input_errors() {
        let mut det = XStream::new(XStreamConfig::default());
        let result = det.fit(&[], 0, 2);
        assert!(
            matches!(result, Err(AnomalyError::EmptyInput)),
            "expected EmptyInput, got {result:?}"
        );
    }

    #[test]
    fn feature_count_mismatch_on_score() {
        let (data, n) = dense_cluster(20, 1);
        let det = fit_detector(cfg_with(16, 8, 6, 2), &data, n);
        let result = det.score(&[0.0_f32, 0.0, 0.0]);
        assert!(
            matches!(
                result,
                Err(AnomalyError::FeatureCountMismatch {
                    expected: 2,
                    got: 3,
                })
            ),
            "expected FeatureCountMismatch, got {result:?}"
        );
    }

    #[test]
    fn dimension_mismatch_on_fit() {
        let mut det = XStream::new(XStreamConfig::default());
        let result = det.fit(&[1.0_f32, 2.0, 3.0], 2, 2);
        assert!(
            matches!(
                result,
                Err(AnomalyError::DimensionMismatch {
                    expected: 4,
                    got: 3,
                })
            ),
            "expected DimensionMismatch, got {result:?}"
        );
    }

    #[test]
    fn zero_components_errors() {
        let mut det = XStream::new(XStreamConfig {
            n_components: 0,
            ..XStreamConfig::default()
        });
        let data = [0.1_f32, 0.2, 0.3, 0.4];
        let result = det.fit(&data, 2, 2);
        assert!(
            matches!(result, Err(AnomalyError::Internal { .. })),
            "expected Internal error, got {result:?}"
        );
    }

    #[test]
    fn score_batch_length() {
        let (data, n) = dense_cluster(50, 6);
        let det = fit_detector(cfg_with(24, 20, 8, 4), &data, n);
        let queries = [0.0_f32, 0.0, 1.0, 1.0, 10.0, 10.0];
        let scores = det.score_batch(&queries, 3).expect("batch");
        assert_eq!(scores.len(), 3);
        assert!(scores.iter().copied().all(f32::is_finite), "all finite");
    }
}