use super::dbscan::{Dbscan, NOISE};
use super::distance::{DistanceMetric, SquaredEuclidean};
use super::flat::DataRef;
use crate::error::{Error, Result};
/// Cluster-feature summary used by DenStream micro-clusters.
///
/// Stores the decayed linear sum (`ls`), decayed squared sum (`ss`) and
/// decayed weight of the absorbed points, which is enough to derive the
/// centroid and radius in O(d) without retaining the raw points.
#[derive(Debug, Clone)]
struct MicroCluster {
    /// Number of raw points ever absorbed (never decayed).
    n: usize,
    /// Decayed per-dimension linear sum of absorbed points.
    ls: Vec<f32>,
    /// Decayed per-dimension sum of squared coordinates.
    ss: Vec<f32>,
    /// Decayed point count; drives centroid, radius and pruning decisions.
    weight: f64,
    /// Timestamp at which this micro-cluster was created.
    creation_time: u64,
    /// Timestamp of the most recent decay or absorption.
    last_update: u64,
}
impl MicroCluster {
    /// Creates a micro-cluster seeded with a single `point` at `timestamp`.
    fn new(point: &[f32], timestamp: u64) -> Self {
        let ls = point.to_vec();
        let ss: Vec<f32> = point.iter().map(|&x| x * x).collect();
        Self {
            n: 1,
            ls,
            ss,
            weight: 1.0,
            creation_time: timestamp,
            last_update: timestamp,
        }
    }
    /// Weighted centroid `ls / weight`; falls back to `ls` itself when the
    /// weight has decayed to zero (avoids a division by zero).
    fn centroid(&self) -> Vec<f32> {
        let w = self.weight as f32;
        if w <= 0.0 {
            return self.ls.clone();
        }
        self.ls.iter().map(|&x| x / w).collect()
    }
    /// Current RMS radius derived from the CF sums.
    #[allow(dead_code)]
    fn radius(&self) -> f32 {
        self.radius_from(self.weight as f32, &self.ls, &self.ss)
    }
    /// Radius the cluster would have if `point` were absorbed, computed
    /// without mutating the cluster (used to gate absorption by epsilon).
    fn radius_if_absorbed(&self, point: &[f32]) -> f32 {
        let new_w = self.weight as f32 + 1.0;
        let new_ls: Vec<f32> = self.ls.iter().zip(point).map(|(&l, &p)| l + p).collect();
        let new_ss: Vec<f32> = self
            .ss
            .iter()
            .zip(point)
            .map(|(&s, &p)| s + p * p)
            .collect();
        self.radius_from(new_w, &new_ls, &new_ss)
    }
    /// `sqrt(sum_j var_j)` from the CF sums; small negative variance sums
    /// caused by floating-point cancellation are clamped to zero.
    fn radius_from(&self, w: f32, ls: &[f32], ss: &[f32]) -> f32 {
        if w <= 0.0 {
            return 0.0;
        }
        let mut sum = 0.0f32;
        for (&l, &s) in ls.iter().zip(ss) {
            let mean = l / w;
            let var = s / w - mean * mean;
            sum += var;
        }
        sum.max(0.0).sqrt()
    }
    /// Applies the exponential fading function `2^(-lambda * elapsed)` to
    /// the weight and the CF sums, then advances `last_update`.
    fn apply_decay(&mut self, decay_factor: f64, timestamp: u64) {
        let elapsed = timestamp.saturating_sub(self.last_update);
        if elapsed > 0 {
            let decay = 2.0_f64.powf(-decay_factor * elapsed as f64);
            // Keep the f64 weight at full precision. The previous code
            // narrowed the factor to f32 and widened it back, losing
            // precision on every decay step for no benefit.
            self.weight *= decay;
            let decay_f32 = decay as f32;
            for l in &mut self.ls {
                *l *= decay_f32;
            }
            for s in &mut self.ss {
                *s *= decay_f32;
            }
            self.last_update = timestamp;
        }
    }
    /// Decays the cluster to `timestamp`, then folds `point` into the sums.
    fn absorb(&mut self, point: &[f32], decay_factor: f64, timestamp: u64) {
        self.apply_decay(decay_factor, timestamp);
        self.n += 1;
        self.weight += 1.0;
        for (&p, (l, s)) in point.iter().zip(self.ls.iter_mut().zip(self.ss.iter_mut())) {
            *l += p;
            *s += p * p;
        }
        self.last_update = timestamp;
    }
    /// Decays the cluster to `timestamp` without absorbing a point.
    fn decay(&mut self, decay_factor: f64, timestamp: u64) {
        self.apply_decay(decay_factor, timestamp);
    }
}
/// Online density-based stream clusterer in the DenStream style.
///
/// Maintains "potential" and "outlier" micro-clusters under exponential
/// decay; [`DenStream::macro_cluster`] runs DBSCAN over the potential
/// micro-cluster centroids to obtain the final clustering.
#[derive(Debug, Clone)]
pub struct DenStream<D: DistanceMetric = SquaredEuclidean> {
    /// Absorption threshold: maximum distance/radius for a micro-cluster.
    /// NOTE(review): compared directly against `D::distance`, so its units
    /// depend on the metric (squared distance for `SquaredEuclidean`).
    epsilon: f32,
    /// Epsilon used by the offline DBSCAN macro-clustering step.
    macro_epsilon: f32,
    /// min_pts used by the offline DBSCAN macro-clustering step.
    min_pts: usize,
    /// Potential-cluster factor: potential clusters keep weight >= beta * mu.
    beta: f64,
    /// Decay rate of the fading function 2^(-lambda * t).
    lambda: f64,
    /// Core-weight scale combined with `beta` into the potential threshold.
    mu: f64,
    /// Pruning period: a prune pass runs every `t_p` updates.
    t_p: usize,
    /// Distance metric used for nearest-micro-cluster search.
    metric: D,
    /// Potential micro-clusters (weight at or above beta * mu).
    p_micro_clusters: Vec<MicroCluster>,
    /// Outlier micro-clusters awaiting promotion or pruning.
    o_micro_clusters: Vec<MicroCluster>,
    /// Logical clock, incremented once per accepted point.
    timestamp: u64,
    /// Updates processed since the last prune pass.
    updates_since_prune: usize,
    /// Input dimensionality, fixed by the first accepted point.
    dim: Option<usize>,
}
impl DenStream<SquaredEuclidean> {
    /// Creates a DenStream using the default squared-Euclidean metric.
    ///
    /// `epsilon` is interpreted in the units of the metric's `distance`
    /// (squared distance here); `min_pts` is forwarded to the offline
    /// DBSCAN step. All other parameters take builder defaults.
    pub fn new(epsilon: f32, min_pts: usize) -> Self {
        Self::with_metric(epsilon, min_pts, SquaredEuclidean)
    }
}
impl<D: DistanceMetric> DenStream<D> {
    /// Creates a DenStream with a caller-supplied distance metric and
    /// defaults: beta = 0.5, lambda = 0.001, mu = 1.0,
    /// macro_epsilon = 2 * epsilon, pruning period = 100 updates.
    pub fn with_metric(epsilon: f32, min_pts: usize, metric: D) -> Self {
        Self {
            epsilon,
            macro_epsilon: epsilon * 2.0,
            min_pts,
            beta: 0.5,
            lambda: 0.001,
            mu: 1.0,
            t_p: 100,
            metric,
            p_micro_clusters: Vec::new(),
            o_micro_clusters: Vec::new(),
            timestamp: 0,
            updates_since_prune: 0,
            dim: None,
        }
    }
    /// Sets the potential-cluster factor `beta` (builder style).
    pub fn with_beta(mut self, beta: f64) -> Self {
        self.beta = beta;
        self
    }
    /// Sets the decay rate `lambda` (builder style).
    pub fn with_lambda(mut self, lambda: f64) -> Self {
        self.lambda = lambda;
        self
    }
    /// Sets the core-weight scale `mu` (builder style).
    pub fn with_mu(mut self, mu: f64) -> Self {
        self.mu = mu;
        self
    }
    /// Sets the epsilon used by the offline DBSCAN macro step.
    pub fn with_macro_epsilon(mut self, eps: f32) -> Self {
        self.macro_epsilon = eps;
        self
    }
    /// Sets the pruning period `t_p`, in number of updates.
    pub fn with_pruning_period(mut self, t_p: usize) -> Self {
        self.t_p = t_p;
        self
    }
    /// Offline step: clusters the potential micro-cluster centroids with
    /// DBSCAN and returns one label per potential micro-cluster.
    ///
    /// # Errors
    /// Returns [`Error::EmptyInput`] when no potential micro-cluster exists.
    pub fn macro_cluster(&self) -> Result<Vec<usize>> {
        if self.p_micro_clusters.is_empty() {
            return Err(Error::EmptyInput);
        }
        let centroids: Vec<Vec<f32>> = self
            .p_micro_clusters
            .iter()
            .map(|mc| mc.centroid())
            .collect();
        let dbscan = Dbscan::with_metric(self.macro_epsilon, self.min_pts, self.metric.clone());
        dbscan.fit_predict(&centroids)
    }
    /// Validates that `point` is non-empty and matches the learned dimension.
    fn validate_point(&self, point: &[f32]) -> Result<()> {
        if point.is_empty() {
            return Err(Error::InvalidParameter {
                name: "point",
                message: "must be non-empty",
            });
        }
        if let Some(expected) = self.dim {
            if point.len() != expected {
                return Err(Error::DimensionMismatch {
                    expected,
                    found: point.len(),
                });
            }
        }
        Ok(())
    }
    /// Finds the micro-cluster in `clusters` whose centroid is closest to
    /// `point`, returning its index and distance (`None` if empty).
    fn nearest_micro_cluster(
        &self,
        point: &[f32],
        clusters: &[MicroCluster],
    ) -> Option<(usize, f32)> {
        let mut best_idx = None;
        let mut best_dist = f32::MAX;
        let d = point.len();
        // One centroid buffer is reused across candidates to avoid a heap
        // allocation per cluster in this hot path.
        let mut centroid_buf = vec![0.0f32; d];
        for (i, mc) in clusters.iter().enumerate() {
            let w = mc.weight as f32;
            if w > 0.0 {
                for (j, &x) in mc.ls.iter().enumerate() {
                    centroid_buf[j] = x / w;
                }
            } else {
                // Fully decayed cluster: fall back to the raw linear sum,
                // mirroring MicroCluster::centroid.
                centroid_buf[..d].copy_from_slice(&mc.ls[..d]);
            }
            let dist = self.metric.distance(point, &centroid_buf);
            if dist < best_dist {
                best_dist = dist;
                best_idx = Some(i);
            }
        }
        best_idx.map(|idx| (idx, best_dist))
    }
    /// Decays all micro-clusters to the current timestamp and drops those
    /// whose weight fell below threshold: potential clusters must keep
    /// weight >= beta * mu; outlier clusters must beat an age-dependent
    /// bound (see `outlier_weight_threshold`).
    fn prune(&mut self) {
        let threshold = self.beta * self.mu;
        let ts = self.timestamp;
        let lambda = self.lambda;
        for mc in &mut self.p_micro_clusters {
            mc.decay(lambda, ts);
        }
        self.p_micro_clusters.retain(|mc| mc.weight >= threshold);
        for mc in &mut self.o_micro_clusters {
            mc.decay(lambda, ts);
        }
        let t_p = self.t_p as u64;
        self.o_micro_clusters.retain(|mc| {
            let age = ts.saturating_sub(mc.creation_time);
            mc.weight >= outlier_weight_threshold(lambda, t_p, age, threshold)
        });
    }
}
/// Age-dependent deletion threshold for an outlier micro-cluster.
///
/// Computes `xi = (2^(-lambda*(age + t_p)) - 1) / (2^(-lambda*t_p) - 1)`
/// and scales it by the potential-cluster threshold. When the denominator
/// vanishes (lambda effectively zero), the full potential threshold is
/// returned as a safe fallback so nothing is divided by zero.
fn outlier_weight_threshold(lambda: f64, t_p: u64, age: u64, potential_threshold: f64) -> f64 {
    let denominator = 2.0_f64.powf(-lambda * t_p as f64) - 1.0;
    if denominator.abs() < f64::EPSILON {
        potential_threshold
    } else {
        let numerator = 2.0_f64.powf(-lambda * (age + t_p) as f64) - 1.0;
        (numerator / denominator) * potential_threshold
    }
}
impl<D: DistanceMetric> DenStream<D> {
    /// Processes one streaming point. Returns the index of the potential
    /// micro-cluster that absorbed it, or `NOISE` when the point only
    /// reached an outlier micro-cluster (or started a new one below the
    /// potential threshold).
    ///
    /// # Errors
    /// - [`Error::InvalidParameter`] for an empty point or non-finite values.
    /// - [`Error::DimensionMismatch`] when the dimensionality differs from
    ///   the first accepted point.
    pub fn update(&mut self, point: &[f32]) -> Result<usize> {
        self.validate_point(point)?;
        if point.iter().any(|v| !v.is_finite()) {
            return Err(Error::InvalidParameter {
                name: "data",
                message: "contains NaN or infinity",
            });
        }
        // The first accepted point fixes the stream's dimensionality.
        if self.dim.is_none() {
            self.dim = Some(point.len());
        }
        self.timestamp += 1;
        let ts = self.timestamp;
        let potential_threshold = self.beta * self.mu;
        let mut assigned_p_idx = None;
        // 1) Try the nearest potential micro-cluster; absorb only if the
        //    post-absorption radius would stay within epsilon.
        if let Some((idx, dist)) = self.nearest_micro_cluster(point, &self.p_micro_clusters) {
            if dist <= self.epsilon {
                let new_radius = self.p_micro_clusters[idx].radius_if_absorbed(point);
                if new_radius <= self.epsilon {
                    self.p_micro_clusters[idx].absorb(point, self.lambda, ts);
                    assigned_p_idx = Some(idx);
                }
            }
        }
        if assigned_p_idx.is_none() {
            // 2) Fall back to the nearest outlier micro-cluster; promote it
            //    to potential status once its weight crosses beta * mu.
            let mut absorbed_into_outlier = false;
            if let Some((idx, dist)) = self.nearest_micro_cluster(point, &self.o_micro_clusters) {
                if dist <= self.epsilon {
                    let new_radius = self.o_micro_clusters[idx].radius_if_absorbed(point);
                    if new_radius <= self.epsilon {
                        self.o_micro_clusters[idx].absorb(point, self.lambda, ts);
                        absorbed_into_outlier = true;
                        if self.o_micro_clusters[idx].weight >= potential_threshold {
                            let promoted = self.o_micro_clusters.remove(idx);
                            self.p_micro_clusters.push(promoted);
                            assigned_p_idx = Some(self.p_micro_clusters.len() - 1);
                        }
                    }
                }
            }
            // 3) Otherwise start a brand-new micro-cluster for this point.
            if !absorbed_into_outlier && assigned_p_idx.is_none() {
                let mc = MicroCluster::new(point, ts);
                if mc.weight >= potential_threshold {
                    self.p_micro_clusters.push(mc);
                    assigned_p_idx = Some(self.p_micro_clusters.len() - 1);
                } else {
                    self.o_micro_clusters.push(mc);
                }
            }
        }
        self.updates_since_prune += 1;
        if self.updates_since_prune >= self.t_p {
            self.prune();
            self.updates_since_prune = 0;
        }
        Ok(assigned_p_idx.unwrap_or(NOISE))
    }
    /// Feeds every row of `points` through [`DenStream::update`], returning
    /// one label per row and stopping at the first error.
    ///
    /// # Errors
    /// [`Error::EmptyInput`] when `points` has no rows, plus any per-row
    /// error from `update`.
    pub fn update_batch(&mut self, points: &(impl DataRef + ?Sized)) -> Result<Vec<usize>> {
        if points.n() == 0 {
            return Err(Error::EmptyInput);
        }
        let mut labels = Vec::with_capacity(points.n());
        for i in 0..points.n() {
            labels.push(self.update(points.row(i))?);
        }
        Ok(labels)
    }
    /// Read-only assignment: index of the nearest potential micro-cluster
    /// within epsilon, or `NOISE`. Does not mutate any state.
    ///
    /// # Errors
    /// Errors when no potential micro-cluster exists yet, or when the point
    /// has the wrong dimensionality.
    pub fn predict(&self, point: &[f32]) -> Result<usize> {
        if self.p_micro_clusters.is_empty() {
            return Err(Error::InvalidParameter {
                name: "state",
                message: "no potential micro-clusters exist yet",
            });
        }
        if let Some(dim) = self.dim {
            if point.len() != dim {
                return Err(Error::DimensionMismatch {
                    expected: dim,
                    found: point.len(),
                });
            }
        }
        match self.nearest_micro_cluster(point, &self.p_micro_clusters) {
            Some((idx, dist)) if dist <= self.epsilon => Ok(idx),
            // Use the NOISE constant imported at the top of the file, for
            // consistency with `update` (was `super::dbscan::NOISE`).
            _ => Ok(NOISE),
        }
    }
    /// Applies [`DenStream::predict`] to every row of `points`.
    pub fn predict_batch(&self, points: &(impl DataRef + ?Sized)) -> Result<Vec<usize>> {
        (0..points.n())
            .map(|i| self.predict(points.row(i)))
            .collect()
    }
    /// Centroids of the current potential micro-clusters.
    pub fn centroids(&self) -> Vec<Vec<f32>> {
        self.p_micro_clusters
            .iter()
            .map(|mc| mc.centroid())
            .collect()
    }
    /// Raw absorbed-point counts of the potential micro-clusters.
    pub fn counts(&self) -> Vec<usize> {
        self.p_micro_clusters.iter().map(|mc| mc.n).collect()
    }
    /// Number of potential micro-clusters currently maintained.
    pub fn n_clusters(&self) -> usize {
        self.p_micro_clusters.len()
    }
}
#[cfg(test)]
#[allow(clippy::needless_range_loop)]
mod tests {
    use super::*;
    use crate::cluster::dbscan::NOISE;
    /// Shared fixture: gentle decay and a long pruning period so individual
    /// tests control pruning explicitly.
    fn test_denstream() -> DenStream<SquaredEuclidean> {
        DenStream::new(2.0, 2)
            .with_beta(0.5)
            .with_lambda(0.001)
            .with_mu(1.0)
            .with_macro_epsilon(4.0)
            .with_pruning_period(1000)
    }
    #[test]
    fn absorbs_nearby_points() {
        let mut ds = test_denstream();
        // Setup updates use unwrap so a failing update fails the test
        // instead of being silently discarded with `.ok()`.
        ds.update(&[0.0, 0.0]).unwrap();
        ds.update(&[0.1, 0.1]).unwrap();
        assert!(
            ds.p_micro_clusters.len() + ds.o_micro_clusters.len() <= 2,
            "nearby points should merge"
        );
        let total: usize = ds
            .p_micro_clusters
            .iter()
            .chain(ds.o_micro_clusters.iter())
            .map(|mc| mc.n)
            .sum();
        assert_eq!(total, 2);
    }
    #[test]
    fn creates_new_micro_cluster_for_distant_points() {
        let mut ds = test_denstream();
        ds.update(&[0.0, 0.0]).unwrap();
        ds.update(&[100.0, 100.0]).unwrap();
        let total_clusters = ds.p_micro_clusters.len() + ds.o_micro_clusters.len();
        assert_eq!(
            total_clusters, 2,
            "distant points should create separate micro-clusters"
        );
    }
    #[test]
    fn pruning_removes_stale_clusters() {
        // Aggressive decay (lambda = 1.0) and a short pruning period so the
        // early distant cluster fades within a handful of updates.
        let mut ds = DenStream::new(2.0, 2)
            .with_beta(0.5)
            .with_lambda(1.0)
            .with_mu(1.0)
            .with_pruning_period(5);
        ds.update(&[100.0, 100.0]).unwrap();
        for i in 0..20 {
            ds.update(&[0.0 + i as f32 * 0.01, 0.0]).unwrap();
        }
        let has_distant = ds
            .p_micro_clusters
            .iter()
            .chain(ds.o_micro_clusters.iter())
            .any(|mc| {
                let c = mc.centroid();
                c[0] > 50.0
            });
        assert!(
            !has_distant,
            "stale distant cluster should have been pruned"
        );
    }
    #[test]
    fn macro_clustering_finds_groups() {
        // Two well-separated streams of 30 points each; the offline DBSCAN
        // step should recover at least two macro-clusters.
        let mut ds = DenStream::new(1.0, 2)
            .with_beta(0.2)
            .with_lambda(0.0001)
            .with_mu(1.0)
            .with_macro_epsilon(3.0)
            .with_pruning_period(10_000);
        for i in 0..30 {
            let offset = i as f32 * 0.05;
            ds.update(&[offset, offset]).unwrap();
        }
        for i in 0..30 {
            let offset = 50.0 + i as f32 * 0.05;
            ds.update(&[offset, offset]).unwrap();
        }
        let macro_labels = ds.macro_cluster();
        assert!(macro_labels.is_ok(), "macro_cluster should succeed");
        let labels = macro_labels.expect("checked above");
        let distinct: std::collections::HashSet<usize> =
            labels.iter().copied().filter(|&l| l != NOISE).collect();
        assert!(
            distinct.len() >= 2,
            "should find at least 2 macro-clusters, found {}",
            distinct.len()
        );
    }
    #[test]
    fn with_custom_metric() {
        use crate::cluster::distance::Euclidean;
        let mut ds = DenStream::with_metric(2.0, 2, Euclidean)
            .with_beta(0.5)
            .with_lambda(0.001)
            .with_mu(1.0);
        ds.update(&[0.0, 0.0]).unwrap();
        ds.update(&[0.5, 0.5]).unwrap();
        ds.update(&[100.0, 100.0]).unwrap();
        let total = ds.p_micro_clusters.len() + ds.o_micro_clusters.len();
        assert!(
            total >= 2,
            "should have at least 2 micro-clusters with Euclidean"
        );
    }
    #[test]
    fn empty_update_error() {
        let mut ds = test_denstream();
        let result = ds.update(&[]);
        assert!(result.is_err(), "empty point should error");
    }
    #[test]
    fn dimension_mismatch_error() {
        let mut ds = test_denstream();
        ds.update(&[1.0, 2.0]).unwrap();
        let result = ds.update(&[1.0, 2.0, 3.0]);
        assert!(result.is_err(), "dimension mismatch should error");
    }
    #[test]
    fn streaming_trait_consistency() {
        let mut ds = test_denstream();
        for i in 0..10 {
            ds.update(&[i as f32, i as f32]).unwrap();
        }
        assert_eq!(
            ds.n_clusters(),
            ds.centroids().len(),
            "n_clusters should match centroids().len()"
        );
    }
    #[test]
    fn update_batch_processes_all_points() {
        let mut ds = test_denstream();
        let points: Vec<Vec<f32>> = (0..10).map(|i| vec![i as f32 * 10.0, 0.0]).collect();
        let labels = ds.update_batch(&points);
        assert!(labels.is_ok());
        assert_eq!(labels.expect("checked above").len(), 10);
    }
    #[test]
    fn update_batch_empty_errors() {
        let mut ds = test_denstream();
        let result = ds.update_batch(&[]);
        assert!(result.is_err());
    }
    #[test]
    fn micro_cluster_radius_single_point_is_zero() {
        let mc = MicroCluster::new(&[1.0, 2.0, 3.0], 0);
        assert!(mc.radius().abs() < 1e-6, "single-point radius should be 0");
    }
    #[test]
    fn micro_cluster_centroid_matches_single_point() {
        let mc = MicroCluster::new(&[3.0, 4.0], 0);
        let c = mc.centroid();
        assert!((c[0] - 3.0).abs() < 1e-6);
        assert!((c[1] - 4.0).abs() < 1e-6);
    }
    #[test]
    fn macro_cluster_on_empty_errors() {
        let ds = test_denstream();
        let result = ds.macro_cluster();
        assert!(result.is_err(), "macro_cluster on empty should error");
    }
    #[test]
    fn noise_sentinel_value() {
        assert_eq!(NOISE, usize::MAX);
    }
    #[test]
    fn nan_input_rejected() {
        let mut ds = test_denstream();
        let result = ds.update(&[1.0, f32::NAN]);
        assert!(result.is_err());
    }
    #[test]
    fn inf_input_rejected() {
        let mut ds = test_denstream();
        let result = ds.update(&[f32::INFINITY, 0.0]);
        assert!(result.is_err());
    }
    #[test]
    fn centroid_drift_under_decay() {
        // With a strong decay rate, early (0,0) mass should fade and at
        // least one centroid should track the later (10,10) points.
        let mut ds = DenStream::new(2.0, 2)
            .with_beta(0.2)
            .with_lambda(0.1)
            .with_mu(1.0)
            .with_pruning_period(10_000);
        for _ in 0..50 {
            ds.update(&[0.0, 0.0]).unwrap();
        }
        for _ in 0..200 {
            ds.update(&[10.0, 10.0]).unwrap();
        }
        let centroids = ds.centroids();
        assert!(!centroids.is_empty());
        let has_near_10 = centroids.iter().any(|c| c[0] > 5.0 && c[1] > 5.0);
        assert!(has_near_10, "centroid should track recent (10,10) points");
    }
    #[test]
    fn denstream_weight_decay_invariant() {
        // After t idle steps, weight must equal 2^(-lambda * t).
        let lambda = 0.5_f64;
        let t_steps = [1u64, 2, 5, 10];
        for &t in &t_steps {
            let mut mc = MicroCluster::new(&[0.0, 0.0], 0);
            assert!(
                (mc.weight - 1.0).abs() < 1e-12,
                "initial weight should be 1.0"
            );
            mc.decay(lambda, t);
            let expected = 2.0_f64.powf(-lambda * t as f64);
            assert!(
                (mc.weight - expected).abs() < 1e-6,
                "weight after {t} steps with lambda={lambda}: got {}, expected {}",
                mc.weight,
                expected
            );
        }
    }
}
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::cluster::dbscan::NOISE;
    use proptest::prelude::*;
    // Strategy: one d-dimensional point with coordinates in [-100, 100).
    fn arb_point(d: usize) -> impl Strategy<Value = Vec<f32>> {
        proptest::collection::vec(-100.0f32..100.0, d)
    }
    // Strategy: n points of dimension d.
    fn arb_points(n: usize, d: usize) -> impl Strategy<Value = Vec<Vec<f32>>> {
        proptest::collection::vec(arb_point(d), n)
    }
    proptest! {
        // Property: every label returned by `update` is either NOISE or a
        // valid index into the potential micro-cluster list at that moment.
        #[test]
        fn labels_in_valid_range(points in arb_points(30, 3)) {
            let mut ds = DenStream::new(5.0, 2)
                .with_beta(0.5)
                .with_lambda(0.001)
                .with_mu(1.0)
                .with_pruning_period(1000);
            for point in &points {
                let label = ds.update(point).expect("update should succeed");
                prop_assert!(
                    label == NOISE || label < ds.p_micro_clusters.len(),
                    "label {} out of range (n_p={})",
                    label,
                    ds.p_micro_clusters.len()
                );
            }
        }
        // Property: centroids always carry the dimensionality of the input.
        #[test]
        fn centroid_dimension_matches_input(points in arb_points(10, 5)) {
            let mut ds = DenStream::new(5.0, 2)
                .with_beta(0.5)
                .with_lambda(0.001)
                .with_mu(1.0);
            for point in &points {
                ds.update(point).expect("update should succeed");
            }
            for c in ds.centroids() {
                prop_assert_eq!(c.len(), 5, "centroid dim should match input dim");
            }
        }
    }
    // Deterministic companion test (not property-based): after the stream
    // shifts from (0,0) to (10,10), at least one centroid should sit near
    // the new mode despite the earlier mass.
    #[test]
    fn centroid_drift_under_decay() {
        let mut ds = DenStream::new(2.0, 2)
            .with_beta(0.5)
            .with_lambda(0.1)
            .with_mu(1.0);
        for _ in 0..20 {
            ds.update(&[0.0, 0.0]).unwrap();
        }
        for _ in 0..20 {
            ds.update(&[10.0, 10.0]).unwrap();
        }
        let centroids = ds.centroids();
        let near_target = centroids.iter().any(|c| {
            let dist_sq = c
                .iter()
                .zip([10.0, 10.0].iter())
                .map(|(a, b)| (a - b).powi(2))
                .sum::<f32>();
            dist_sq < 25.0
        });
        assert!(near_target, "centroid should drift toward recent points");
    }
}