use anyhow::Result;
use serde::Serialize;
use super::kdtree::{KDTree, Point};
/// A cluster of points with its running centroid.
///
/// NOTE(review): `id` is assigned from the cluster's index at creation time
/// and is never updated afterwards in this file.
#[derive(Debug, Clone, Serialize)]
pub struct Cluster {
// Creation-time index into the owning cluster list.
pub id: usize,
// Points currently assigned to this cluster.
pub points: Vec<Point>,
// Per-feature mean of the member points' `features` vectors.
pub centroid: Vec<f64>,
// Local density of the point that seeded the cluster.
pub density: f64,
}
/// Label produced by clustering: either noise/outlier or membership in the
/// cluster with the given index.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ClusterLabel {
// Point does not belong to any cluster. In the batch DBSCAN pass this also
// doubles as the "unvisited" state.
Noise,
// Member of the cluster with this index.
Cluster(usize),
}
/// Tuning knobs for streaming and batch DBSCAN clustering.
#[derive(Debug, Clone)]
pub struct DBSCANConfig {
// Minimum local density below which a streamed point is labelled noise.
pub density: f64,
// NOTE(review): declared but never read anywhere in this file — possibly
// consumed by other modules; verify before removing.
pub min_cluster_size: usize,
// Neighborhood radius for density queries and for attaching a point to an
// existing cluster's centroid.
pub max_distance: f64,
// Maximum number of points kept in the streaming sliding window.
pub window_size: usize,
}
impl Default for DBSCANConfig {
fn default() -> Self {
Self {
density: 0.1,
min_cluster_size: 3,
max_distance: 1.0,
window_size: 1000,
}
}
}
/// Incremental clustering state over a sliding window of recent points.
#[derive(Debug)]
pub struct StreamingClusters {
// Clusters discovered so far (never pruned in this file).
pub clusters: Vec<Cluster>,
// Spatial index rebuilt from `window` on every update.
pub kd_tree: KDTree,
// Most recent points, capped at `config.window_size`; oldest is evicted first.
pub window: Vec<Point>,
// Clustering parameters.
pub config: DBSCANConfig,
}
impl StreamingClusters {
    /// Creates an empty streaming-clustering state for the given configuration.
    ///
    /// The sliding window is preallocated to `config.window_size` so pushes
    /// within the window never reallocate.
    pub fn new(config: DBSCANConfig) -> Self {
        Self {
            clusters: Vec::new(),
            kd_tree: KDTree::new(),
            window: Vec::with_capacity(config.window_size),
            config,
        }
    }

    /// Ingests one point into the sliding window and labels it.
    ///
    /// The oldest point is evicted once the window exceeds
    /// `config.window_size`, the KD-tree is rebuilt over the current window,
    /// and the point is labelled `Noise` when its local density falls below
    /// `config.density`; otherwise it is assigned to (or seeds) a cluster.
    pub fn update(&mut self, new_point: Point) -> ClusterLabel {
        self.window.push(new_point.clone());
        if self.window.len() > self.config.window_size {
            // NOTE(review): Vec::remove(0) is O(window_size); acceptable here
            // because the KD-tree rebuild below already dominates, but a
            // VecDeque-backed window would avoid the shift.
            self.window.remove(0);
        }
        // Rebuild the spatial index so density queries see the current window.
        self.kd_tree = KDTree::build(&self.window);
        let local_density = self.calculate_local_density(&new_point);
        if local_density < self.config.density {
            ClusterLabel::Noise
        } else {
            self.assign_to_cluster(new_point, local_density)
        }
    }

    /// Local density of `point`: neighbor count within `config.max_distance`
    /// (excluding the point itself) damped by the mean neighbor distance.
    ///
    /// Returns 0.0 when the point has no neighbors in the window.
    fn calculate_local_density(&self, point: &Point) -> f64 {
        let neighbors = self
            .kd_tree
            .find_neighbors(&point.features, self.config.max_distance);
        // Exclude the query point itself, which the tree may return.
        let others: Vec<Point> = neighbors.into_iter().filter(|p| p.id != point.id).collect();
        if others.is_empty() {
            return 0.0;
        }
        let avg_distance: f64 = others
            .iter()
            .map(|neighbor| calculate_distance(&point.features, &neighbor.features))
            .sum::<f64>()
            / others.len() as f64;
        // More neighbors and tighter packing => higher density.
        others.len() as f64 / (1.0 + avg_distance)
    }

    /// Adds `point` to the nearest existing cluster whose centroid lies
    /// within `config.max_distance`, or seeds a new cluster carrying the
    /// given `density`.
    ///
    /// Fix vs. the previous version: the point is *moved* into the cluster
    /// instead of being cloned (the clones at both push sites were redundant —
    /// the owned `point` was never used after being inserted).
    fn assign_to_cluster(&mut self, point: Point, density: f64) -> ClusterLabel {
        let mut nearest_cluster = None;
        let mut min_distance = f64::INFINITY;
        for (i, cluster) in self.clusters.iter().enumerate() {
            let distance = calculate_distance(&point.features, &cluster.centroid);
            // `<` skips NaN distances; `<=` enforces the attachment radius.
            if distance < min_distance && distance <= self.config.max_distance {
                min_distance = distance;
                nearest_cluster = Some(i);
            }
        }
        if let Some(cluster_idx) = nearest_cluster {
            self.clusters[cluster_idx].points.push(point);
            self.update_cluster_centroid(cluster_idx);
            ClusterLabel::Cluster(cluster_idx)
        } else {
            // Clone the features for the centroid before the point is moved.
            let centroid = point.features.clone();
            self.clusters.push(Cluster {
                id: self.clusters.len(),
                points: vec![point],
                centroid,
                density,
            });
            ClusterLabel::Cluster(self.clusters.len() - 1)
        }
    }

    /// Recomputes the centroid of cluster `cluster_idx` as the per-feature
    /// mean of its member points. No-op for an empty cluster.
    fn update_cluster_centroid(&mut self, cluster_idx: usize) {
        let cluster = &self.clusters[cluster_idx];
        if cluster.points.is_empty() {
            return;
        }
        // Dimensionality is taken from the first member; assumes all members
        // share the same feature length — TODO confirm upstream invariant.
        let feature_count = cluster.points[0].features.len();
        let mut new_centroid = vec![0.0; feature_count];
        for point in &cluster.points {
            for (i, &feature) in point.features.iter().enumerate() {
                new_centroid[i] += feature;
            }
        }
        let member_count = cluster.points.len() as f64;
        for centroid_val in &mut new_centroid {
            *centroid_val /= member_count;
        }
        self.clusters[cluster_idx].centroid = new_centroid;
    }

    /// Local density for every point currently in the window, in window order.
    pub fn calculate_density_maps(&self) -> Vec<f64> {
        self.window
            .iter()
            .map(|point| self.calculate_local_density(point))
            .collect()
    }
}
/// DBSCAN-based anomaly detector that derives epsilon / min-samples from the
/// data itself and maintains streaming clustering state.
#[derive(Debug)]
#[allow(dead_code)] // NOTE(review): `config` duplicates streaming_clusters.config and is unread here.
pub struct AdaptiveDBSCAN {
// Kept alongside the copy stored inside `streaming_clusters`.
config: DBSCANConfig,
// Sliding-window clustering state fed by `update_streaming_clusters`.
streaming_clusters: StreamingClusters,
}
impl AdaptiveDBSCAN {
    /// Builds an adaptive DBSCAN detector; the streaming state receives its
    /// own copy of the configuration.
    ///
    /// Currently infallible; the `Result` return keeps the constructor
    /// signature stable for callers if validation is added later.
    pub fn new(config: DBSCANConfig) -> Result<Self> {
        Ok(Self {
            streaming_clusters: StreamingClusters::new(config.clone()),
            config,
        })
    }

    /// Labels each value as noise or as a member of some cluster.
    ///
    /// Each `values[i]` becomes a 1-D point; `timestamps[i]` is attached when
    /// available, falling back to the index `i` as the timestamp.
    pub fn detect_anomalies_dbscan(
        &mut self,
        values: &[f64],
        timestamps: &[f64],
    ) -> Vec<ClusterLabel> {
        let points: Vec<Point> = values
            .iter()
            .enumerate()
            .map(|(i, &value)| {
                let timestamp = timestamps.get(i).copied().unwrap_or(i as f64);
                Point {
                    id: i,
                    values: vec![value],
                    embedding: None,
                    timestamp,
                    features: vec![value],
                }
            })
            .collect();
        self.adaptive_dbscan_clustering(&points)
    }

    /// Runs DBSCAN with epsilon / min-samples derived from the data spread.
    pub fn adaptive_dbscan_clustering(&mut self, points: &[Point]) -> Vec<ClusterLabel> {
        let adaptive_params = self.calculate_adaptive_parameters(points);
        self.apply_dbscan(points, adaptive_params)
    }

    /// Derives `(epsilon, min_samples)` from the feature values.
    ///
    /// Epsilon is two standard deviations of all feature values (floored at
    /// 1e-6 to avoid a degenerate zero radius); min_samples is 10% of the
    /// point count, clamped to at least 3 and at most `points.len() - 1`.
    fn calculate_adaptive_parameters(&self, points: &[Point]) -> (f64, usize) {
        if points.len() < 3 {
            // Too few points to estimate spread; conservative defaults.
            return (0.5, 2);
        }
        let all_values: Vec<f64> = points
            .iter()
            .flat_map(|p| p.features.iter().copied())
            .collect();
        if all_values.is_empty() {
            // Guard added: points with empty feature vectors would otherwise
            // divide by zero below and yield a NaN epsilon.
            return (0.5, 2);
        }
        let count = all_values.len() as f64;
        let mean: f64 = all_values.iter().sum::<f64>() / count;
        let variance: f64 =
            all_values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / count;
        let std_dev = variance.sqrt();
        let adaptive_epsilon = (std_dev * 2.0).max(1e-6);
        let adaptive_min_samples = (points.len() as f64 * 0.1).max(3.0) as usize;
        (
            adaptive_epsilon,
            adaptive_min_samples.min(points.len().saturating_sub(1)),
        )
    }

    /// Standard DBSCAN over `points` with the given `(epsilon, min_samples)`.
    ///
    /// Fix vs. the previous version: a cluster is only expanded through *core*
    /// points (those with at least `min_samples` neighbors). The old code
    /// pushed the neighbors of every reached point into the expansion queue,
    /// which lets clusters grow through border points and can merge clusters
    /// that DBSCAN should keep separate. A dead `cluster_points` Vec (written,
    /// never read) was also removed.
    fn apply_dbscan(&self, points: &[Point], params: (f64, usize)) -> Vec<ClusterLabel> {
        let (epsilon, min_samples) = params;
        if points.is_empty() {
            return Vec::new();
        }
        let kd_tree = KDTree::build(points);
        // `Noise` doubles as the "unvisited" state: seed-eligible points are
        // exactly those still labelled `Noise` when the outer loop reaches them.
        let mut labels = vec![ClusterLabel::Noise; points.len()];
        let mut cluster_id = 0;
        for (i, point) in points.iter().enumerate() {
            if !matches!(labels[i], ClusterLabel::Noise) {
                continue;
            }
            let neighbors = kd_tree.find_neighbors(&point.features, epsilon);
            if neighbors.len() < min_samples {
                // Not a core point; stays noise unless a later expansion
                // claims it as a border point.
                continue;
            }
            labels[i] = ClusterLabel::Cluster(cluster_id);
            let mut queue: Vec<usize> = neighbors.iter().map(|n| n.id).collect();
            while let Some(neighbor_id) = queue.pop() {
                if !matches!(labels[neighbor_id], ClusterLabel::Noise) {
                    continue;
                }
                labels[neighbor_id] = ClusterLabel::Cluster(cluster_id);
                let neighbor_neighbors =
                    kd_tree.find_neighbors(&points[neighbor_id].features, epsilon);
                // Only core points propagate the cluster further; border
                // points join but do not expand.
                if neighbor_neighbors.len() >= min_samples {
                    for neighbor_neighbor in &neighbor_neighbors {
                        if matches!(labels[neighbor_neighbor.id], ClusterLabel::Noise) {
                            queue.push(neighbor_neighbor.id);
                        }
                    }
                }
            }
            cluster_id += 1;
        }
        labels
    }

    /// Feeds one point to the streaming clusterer and returns its label.
    pub fn update_streaming_clusters(&mut self, new_point: Point) -> ClusterLabel {
        self.streaming_clusters.update(new_point)
    }

    /// Local density for every point in the current streaming window.
    pub fn get_density_maps(&self) -> Vec<f64> {
        self.streaming_clusters.calculate_density_maps()
    }
}
/// Euclidean distance between `a` and `b`, computed over the first
/// `min(a.len(), b.len())` components (`zip` truncates to the shorter slice).
/// Returns 0.0 when either slice is empty.
fn calculate_distance(a: &[f64], b: &[f64]) -> f64 {
    a.iter()
        .zip(b.iter())
        .map(|(&x, &y)| {
            let diff = x - y;
            diff * diff
        })
        .sum::<f64>()
        .sqrt()
}