Skip to main content

alimentar/
sketch.rs

//! Sketch-based statistics for distributed/federated drift detection
//!
//! Provides privacy-preserving statistical summaries that can be computed
//! locally and merged without sharing raw data.
//!
//! # Example
//!
//! ```ignore
//! use alimentar::sketch::{TDigest, DDSketch};
//!
//! // Create sketches on each node (the compression factor is an `f64`)
//! let mut digest1 = TDigest::new(100.0);
//! digest1.add_batch(&node1_data);
//!
//! let mut digest2 = TDigest::new(100.0);
//! digest2.add_batch(&node2_data);
//!
//! // Merge sketches (no raw data shared)
//! let merged = TDigest::merge(&[digest1, digest2]);
//!
//! // Query quantiles from merged sketch
//! println!("Median: {}", merged.quantile(0.5));
//! ```
24
25// Sketch algorithms require numeric casts and float comparisons
26#![allow(clippy::cast_precision_loss)]
27#![allow(clippy::cast_possible_truncation)]
28#![allow(clippy::cast_sign_loss)]
29#![allow(clippy::float_cmp)]
30#![allow(clippy::suboptimal_flops)]
31
32use std::collections::HashMap;
33
34use serde::{Deserialize, Serialize};
35
36use crate::{
37    dataset::{ArrowDataset, Dataset},
38    drift::DriftSeverity,
39    error::{Error, Result},
40};
41
42/// A centroid in a T-Digest (mean and weight)
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct Centroid {
45    /// Mean value of this centroid
46    pub mean: f64,
47    /// Weight (count) of this centroid
48    pub weight: f64,
49}
50
51impl Centroid {
52    /// Create a new centroid
53    pub fn new(mean: f64, weight: f64) -> Self {
54        Self { mean, weight }
55    }
56
57    /// Merge another centroid into this one
58    pub fn merge(&mut self, other: &Self) {
59        let total_weight = self.weight + other.weight;
60        if total_weight > 0.0 {
61            self.mean = (self.mean * self.weight + other.mean * other.weight) / total_weight;
62            self.weight = total_weight;
63        }
64    }
65}
66
67/// T-Digest for streaming quantile estimation
68///
69/// Provides accurate quantile estimates with bounded memory usage.
70/// Based on the algorithm by Ted Dunning.
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct TDigest {
73    /// Centroids sorted by mean
74    centroids: Vec<Centroid>,
75    /// Compression parameter (higher = more accuracy, more memory)
76    compression: f64,
77    /// Total weight of all centroids
78    total_weight: f64,
79    /// Minimum value seen
80    min: f64,
81    /// Maximum value seen
82    max: f64,
83}
84
85impl TDigest {
86    /// Create a new T-Digest with given compression factor
87    ///
88    /// Compression of 100 is a good default. Higher values give more accuracy.
89    pub fn new(compression: f64) -> Self {
90        Self {
91            centroids: Vec::new(),
92            compression,
93            total_weight: 0.0,
94            min: f64::INFINITY,
95            max: f64::NEG_INFINITY,
96        }
97    }
98
99    /// Add a single value
100    pub fn add(&mut self, value: f64) {
101        self.add_weighted(value, 1.0);
102    }
103
104    /// Add a value with weight
105    pub fn add_weighted(&mut self, value: f64, weight: f64) {
106        if !value.is_finite() || weight <= 0.0 {
107            return;
108        }
109
110        self.min = self.min.min(value);
111        self.max = self.max.max(value);
112        self.total_weight += weight;
113
114        // Find insertion point
115        let idx = self.find_insertion_point(value);
116
117        // Try to merge with existing centroid
118        if !self.centroids.is_empty() {
119            let max_weight = self.max_weight_at(idx);
120            let nearest = if idx < self.centroids.len() {
121                idx
122            } else {
123                self.centroids.len() - 1
124            };
125
126            if self.centroids[nearest].weight + weight <= max_weight {
127                self.centroids[nearest].merge(&Centroid::new(value, weight));
128                return;
129            }
130        }
131
132        // Insert new centroid
133        self.centroids.insert(idx, Centroid::new(value, weight));
134
135        // Compress if needed
136        if self.centroids.len() > self.compression as usize * 2 {
137            self.compress();
138        }
139    }
140
141    /// Add a batch of values
142    pub fn add_batch(&mut self, values: &[f64]) {
143        for &v in values {
144            self.add(v);
145        }
146    }
147
148    /// Get estimated quantile (0-1)
149    pub fn quantile(&self, q: f64) -> f64 {
150        if self.centroids.is_empty() {
151            return f64::NAN;
152        }
153
154        let q = q.clamp(0.0, 1.0);
155
156        if q == 0.0 {
157            return self.min;
158        }
159        if q == 1.0 {
160            return self.max;
161        }
162
163        let target_weight = q * self.total_weight;
164        let mut cumulative = 0.0;
165
166        for (i, centroid) in self.centroids.iter().enumerate() {
167            let next_cumulative = cumulative + centroid.weight;
168
169            if next_cumulative >= target_weight {
170                // Interpolate within this centroid
171                let prev_mean = if i > 0 {
172                    self.centroids[i - 1].mean
173                } else {
174                    self.min
175                };
176                let next_mean = if i < self.centroids.len() - 1 {
177                    self.centroids[i + 1].mean
178                } else {
179                    self.max
180                };
181
182                let ratio = if centroid.weight > 0.0 {
183                    (target_weight - cumulative) / centroid.weight
184                } else {
185                    0.5
186                };
187
188                // Linear interpolation
189                let low = (prev_mean + centroid.mean) / 2.0;
190                let high = (centroid.mean + next_mean) / 2.0;
191
192                return low + ratio * (high - low);
193            }
194
195            cumulative = next_cumulative;
196        }
197
198        self.max
199    }
200
201    /// Get the CDF value for a given x (proportion of values <= x)
202    pub fn cdf(&self, x: f64) -> f64 {
203        if self.centroids.is_empty() || self.total_weight == 0.0 {
204            return 0.0;
205        }
206
207        if x <= self.min {
208            return 0.0;
209        }
210        if x >= self.max {
211            return 1.0;
212        }
213
214        let mut cumulative = 0.0;
215
216        for centroid in &self.centroids {
217            if x < centroid.mean {
218                // Interpolate
219                return cumulative / self.total_weight;
220            }
221            cumulative += centroid.weight;
222        }
223
224        cumulative / self.total_weight
225    }
226
227    /// Merge multiple T-Digests into one
228    pub fn merge(digests: &[Self]) -> Self {
229        if digests.is_empty() {
230            return Self::new(100.0);
231        }
232
233        let compression = digests.iter().map(|d| d.compression).fold(0.0, f64::max);
234        let mut result = Self::new(compression);
235
236        // Collect all centroids
237        let mut all_centroids: Vec<Centroid> = digests
238            .iter()
239            .flat_map(|d| d.centroids.iter().cloned())
240            .collect();
241
242        // Sort by mean
243        all_centroids.sort_by(|a, b| {
244            a.mean
245                .partial_cmp(&b.mean)
246                .unwrap_or(std::cmp::Ordering::Equal)
247        });
248
249        // Update min/max
250        result.min = digests.iter().map(|d| d.min).fold(f64::INFINITY, f64::min);
251        result.max = digests
252            .iter()
253            .map(|d| d.max)
254            .fold(f64::NEG_INFINITY, f64::max);
255        result.total_weight = digests.iter().map(|d| d.total_weight).sum();
256
257        // Add centroids
258        for centroid in all_centroids {
259            result.centroids.push(centroid);
260        }
261
262        // Compress
263        result.compress();
264
265        result
266    }
267
268    /// Get total count of values
269    pub fn count(&self) -> f64 {
270        self.total_weight
271    }
272
273    /// Get minimum value
274    pub fn min(&self) -> f64 {
275        self.min
276    }
277
278    /// Get maximum value
279    pub fn max(&self) -> f64 {
280        self.max
281    }
282
283    /// Get number of centroids
284    pub fn num_centroids(&self) -> usize {
285        self.centroids.len()
286    }
287
288    /// Check if empty
289    pub fn is_empty(&self) -> bool {
290        self.centroids.is_empty()
291    }
292
293    /// Serialize to bytes
294    pub fn to_bytes(&self) -> Result<Vec<u8>> {
295        rmp_serde::to_vec(self)
296            .map_err(|e| Error::Format(format!("Failed to serialize TDigest: {e}")))
297    }
298
299    /// Deserialize from bytes
300    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
301        rmp_serde::from_slice(bytes)
302            .map_err(|e| Error::Format(format!("Failed to deserialize TDigest: {e}")))
303    }
304
305    // Internal methods
306
307    fn find_insertion_point(&self, value: f64) -> usize {
308        self.centroids
309            .binary_search_by(|c| {
310                c.mean
311                    .partial_cmp(&value)
312                    .unwrap_or(std::cmp::Ordering::Equal)
313            })
314            .unwrap_or_else(|i| i)
315    }
316
317    fn max_weight_at(&self, index: usize) -> f64 {
318        // Scale function for T-Digest
319        let q = if self.total_weight > 0.0 {
320            let cumulative: f64 = self.centroids.iter().take(index).map(|c| c.weight).sum();
321            cumulative / self.total_weight
322        } else {
323            0.5
324        };
325
326        // k1 scale function
327        let k = 4.0 * self.compression * q * (1.0 - q);
328        k.max(1.0)
329    }
330
331    fn compress(&mut self) {
332        if self.centroids.len() <= 1 {
333            return;
334        }
335
336        // Sort by mean
337        self.centroids.sort_by(|a, b| {
338            a.mean
339                .partial_cmp(&b.mean)
340                .unwrap_or(std::cmp::Ordering::Equal)
341        });
342
343        let mut new_centroids = Vec::with_capacity(self.compression as usize);
344        let mut current = self.centroids[0].clone();
345        let mut cumulative = 0.0;
346
347        for centroid in self.centroids.iter().skip(1) {
348            let q = cumulative / self.total_weight;
349            let max_weight = 4.0 * self.compression * q * (1.0 - q);
350
351            if current.weight + centroid.weight <= max_weight.max(1.0) {
352                current.merge(centroid);
353            } else {
354                cumulative += current.weight;
355                new_centroids.push(current);
356                current = centroid.clone();
357            }
358        }
359
360        new_centroids.push(current);
361        self.centroids = new_centroids;
362    }
363}
364
365/// DDSketch for distribution estimation
366///
367/// Provides relative-error guarantees for quantile estimation.
368/// Based on the algorithm by DataDog.
369#[derive(Debug, Clone, Serialize, Deserialize)]
370pub struct DDSketch {
371    /// Relative accuracy parameter (e.g., 0.01 for 1% error)
372    alpha: f64,
373    /// Gamma = (1 + alpha) / (1 - alpha)
374    gamma: f64,
375    /// Log of gamma for bucket mapping
376    ln_gamma: f64,
377    /// Positive value buckets
378    positive_buckets: HashMap<i32, u64>,
379    /// Negative value buckets
380    negative_buckets: HashMap<i32, u64>,
381    /// Count of zero values
382    zero_count: u64,
383    /// Total count
384    total_count: u64,
385    /// Minimum value
386    min: f64,
387    /// Maximum value
388    max: f64,
389}
390
391impl DDSketch {
392    /// Create a new DDSketch with given relative accuracy
393    ///
394    /// Alpha of 0.01 gives 1% relative error on quantiles.
395    pub fn new(alpha: f64) -> Self {
396        let alpha = alpha.clamp(0.0001, 0.5);
397        let gamma = (1.0 + alpha) / (1.0 - alpha);
398
399        Self {
400            alpha,
401            gamma,
402            ln_gamma: gamma.ln(),
403            positive_buckets: HashMap::new(),
404            negative_buckets: HashMap::new(),
405            zero_count: 0,
406            total_count: 0,
407            min: f64::INFINITY,
408            max: f64::NEG_INFINITY,
409        }
410    }
411
412    /// Add a value
413    pub fn add(&mut self, value: f64) {
414        if !value.is_finite() {
415            return;
416        }
417
418        self.min = self.min.min(value);
419        self.max = self.max.max(value);
420        self.total_count += 1;
421
422        if value > 0.0 {
423            let bucket = self.bucket_index(value);
424            *self.positive_buckets.entry(bucket).or_insert(0) += 1;
425        } else if value < 0.0 {
426            let bucket = self.bucket_index(-value);
427            *self.negative_buckets.entry(bucket).or_insert(0) += 1;
428        } else {
429            self.zero_count += 1;
430        }
431    }
432
433    /// Add a batch of values
434    pub fn add_batch(&mut self, values: &[f64]) {
435        for &v in values {
436            self.add(v);
437        }
438    }
439
440    /// Get estimated quantile (0-1)
441    pub fn quantile(&self, q: f64) -> f64 {
442        if self.total_count == 0 {
443            return f64::NAN;
444        }
445
446        let q = q.clamp(0.0, 1.0);
447
448        if q == 0.0 {
449            return self.min;
450        }
451        if q == 1.0 {
452            return self.max;
453        }
454
455        let target_rank = (q * self.total_count as f64).ceil() as u64;
456        let mut cumulative: u64 = 0;
457
458        // Check negative buckets (sorted descending by bucket index = ascending by
459        // value)
460        let mut neg_buckets: Vec<_> = self.negative_buckets.iter().collect();
461        neg_buckets.sort_by(|a, b| b.0.cmp(a.0));
462
463        for (&bucket, &count) in &neg_buckets {
464            cumulative += count;
465            if cumulative >= target_rank {
466                return -self.bucket_to_value(bucket);
467            }
468        }
469
470        // Check zero
471        cumulative += self.zero_count;
472        if cumulative >= target_rank {
473            return 0.0;
474        }
475
476        // Check positive buckets (sorted ascending)
477        let mut pos_buckets: Vec<_> = self.positive_buckets.iter().collect();
478        pos_buckets.sort_by_key(|&(k, _)| *k);
479
480        for (&bucket, &count) in &pos_buckets {
481            cumulative += count;
482            if cumulative >= target_rank {
483                return self.bucket_to_value(bucket);
484            }
485        }
486
487        self.max
488    }
489
490    /// Merge multiple DDSketches
491    pub fn merge(sketches: &[Self]) -> Self {
492        if sketches.is_empty() {
493            return Self::new(0.01);
494        }
495
496        // Use minimum alpha for best accuracy
497        let alpha = sketches
498            .iter()
499            .map(|s| s.alpha)
500            .fold(f64::INFINITY, f64::min);
501        let mut result = Self::new(alpha);
502
503        result.min = sketches.iter().map(|s| s.min).fold(f64::INFINITY, f64::min);
504        result.max = sketches
505            .iter()
506            .map(|s| s.max)
507            .fold(f64::NEG_INFINITY, f64::max);
508        result.total_count = sketches.iter().map(|s| s.total_count).sum();
509        result.zero_count = sketches.iter().map(|s| s.zero_count).sum();
510
511        for sketch in sketches {
512            for (&bucket, &count) in &sketch.positive_buckets {
513                *result.positive_buckets.entry(bucket).or_insert(0) += count;
514            }
515            for (&bucket, &count) in &sketch.negative_buckets {
516                *result.negative_buckets.entry(bucket).or_insert(0) += count;
517            }
518        }
519
520        result
521    }
522
523    /// Get total count
524    pub fn count(&self) -> u64 {
525        self.total_count
526    }
527
528    /// Get minimum value
529    pub fn min(&self) -> f64 {
530        self.min
531    }
532
533    /// Get maximum value
534    pub fn max(&self) -> f64 {
535        self.max
536    }
537
538    /// Check if empty
539    pub fn is_empty(&self) -> bool {
540        self.total_count == 0
541    }
542
543    /// Serialize to bytes
544    pub fn to_bytes(&self) -> Result<Vec<u8>> {
545        rmp_serde::to_vec(self)
546            .map_err(|e| Error::Format(format!("Failed to serialize DDSketch: {e}")))
547    }
548
549    /// Deserialize from bytes
550    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
551        rmp_serde::from_slice(bytes)
552            .map_err(|e| Error::Format(format!("Failed to deserialize DDSketch: {e}")))
553    }
554
555    // Internal methods
556
557    fn bucket_index(&self, value: f64) -> i32 {
558        if value <= 0.0 {
559            return i32::MIN;
560        }
561        (value.ln() / self.ln_gamma).ceil() as i32
562    }
563
564    fn bucket_to_value(&self, bucket: i32) -> f64 {
565        (2.0 * self.gamma.powi(bucket)) / (1.0 + self.gamma)
566    }
567}
568
569/// Type of sketch algorithm
570#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
571pub enum SketchType {
572    /// T-Digest for quantile estimation
573    TDigest,
574    /// DDSketch for relative-error quantiles
575    DDSketch,
576}
577
578impl SketchType {
579    /// Get human-readable name
580    pub fn name(&self) -> &'static str {
581        match self {
582            Self::TDigest => "T-Digest",
583            Self::DDSketch => "DDSketch",
584        }
585    }
586}
587
588/// Serializable data sketch containing distribution summaries
589#[derive(Debug, Clone, Serialize, Deserialize)]
590pub struct DataSketch {
591    /// Sketch type used
592    pub sketch_type: SketchType,
593    /// Per-column T-Digest sketches
594    pub tdigests: HashMap<String, TDigest>,
595    /// Per-column DDSketch sketches
596    pub ddsketches: HashMap<String, DDSketch>,
597    /// Total row count
598    pub row_count: u64,
599    /// Source identifier (e.g., node name)
600    pub source: Option<String>,
601}
602
603impl DataSketch {
604    /// Create a new empty data sketch
605    pub fn new(sketch_type: SketchType) -> Self {
606        Self {
607            sketch_type,
608            tdigests: HashMap::new(),
609            ddsketches: HashMap::new(),
610            row_count: 0,
611            source: None,
612        }
613    }
614
615    /// Set source identifier
616    #[must_use]
617    pub fn with_source(mut self, source: impl Into<String>) -> Self {
618        self.source = Some(source.into());
619        self
620    }
621
622    /// Create sketch from dataset
623    pub fn from_dataset(dataset: &ArrowDataset, sketch_type: SketchType) -> Result<Self> {
624        let mut sketch = Self::new(sketch_type);
625        sketch.add_dataset(dataset)?;
626        Ok(sketch)
627    }
628
629    /// Add data from dataset to sketch
630    pub fn add_dataset(&mut self, dataset: &ArrowDataset) -> Result<()> {
631        use arrow::{
632            array::{Array, Float64Array, Int32Array, Int64Array},
633            datatypes::DataType,
634        };
635
636        let schema = dataset.schema();
637
638        for batch in dataset.iter() {
639            self.row_count += batch.num_rows() as u64;
640
641            for (col_idx, field) in schema.fields().iter().enumerate() {
642                // Only sketch numeric columns
643                let is_numeric = matches!(
644                    field.data_type(),
645                    DataType::Float64 | DataType::Float32 | DataType::Int32 | DataType::Int64
646                );
647
648                if !is_numeric {
649                    continue;
650                }
651
652                let col_name = field.name();
653                let array = batch.column(col_idx);
654
655                // Collect values
656                let values: Vec<f64> = match field.data_type() {
657                    DataType::Float64 => {
658                        if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
659                            (0..arr.len())
660                                .filter(|&i| !arr.is_null(i))
661                                .map(|i| arr.value(i))
662                                .collect()
663                        } else {
664                            continue;
665                        }
666                    }
667                    DataType::Float32 => {
668                        if let Some(arr) =
669                            array.as_any().downcast_ref::<arrow::array::Float32Array>()
670                        {
671                            (0..arr.len())
672                                .filter(|&i| !arr.is_null(i))
673                                .map(|i| f64::from(arr.value(i)))
674                                .collect()
675                        } else {
676                            continue;
677                        }
678                    }
679                    DataType::Int32 => {
680                        if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
681                            (0..arr.len())
682                                .filter(|&i| !arr.is_null(i))
683                                .map(|i| f64::from(arr.value(i)))
684                                .collect()
685                        } else {
686                            continue;
687                        }
688                    }
689                    DataType::Int64 => {
690                        if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
691                            (0..arr.len())
692                                .filter(|&i| !arr.is_null(i))
693                                .map(|i| arr.value(i) as f64)
694                                .collect()
695                        } else {
696                            continue;
697                        }
698                    }
699                    _ => continue,
700                };
701
702                // Add to appropriate sketch
703                match self.sketch_type {
704                    SketchType::TDigest => {
705                        let digest = self
706                            .tdigests
707                            .entry(col_name.clone())
708                            .or_insert_with(|| TDigest::new(100.0));
709                        digest.add_batch(&values);
710                    }
711                    SketchType::DDSketch => {
712                        let sketch = self
713                            .ddsketches
714                            .entry(col_name.clone())
715                            .or_insert_with(|| DDSketch::new(0.01));
716                        sketch.add_batch(&values);
717                    }
718                }
719            }
720        }
721
722        Ok(())
723    }
724
725    /// Merge multiple data sketches
726    pub fn merge(sketches: &[Self]) -> Result<Self> {
727        if sketches.is_empty() {
728            return Err(Error::invalid_config("Cannot merge empty sketch list"));
729        }
730
731        let sketch_type = sketches[0].sketch_type;
732
733        // Verify all sketches use same type
734        for s in sketches {
735            if s.sketch_type != sketch_type {
736                return Err(Error::invalid_config(
737                    "Cannot merge sketches of different types",
738                ));
739            }
740        }
741
742        let mut result = Self::new(sketch_type);
743        result.row_count = sketches.iter().map(|s| s.row_count).sum();
744
745        // Collect column names based on sketch type
746        let columns: std::collections::HashSet<String> = match sketch_type {
747            SketchType::TDigest => sketches
748                .iter()
749                .flat_map(|s| s.tdigests.keys().cloned())
750                .collect(),
751            SketchType::DDSketch => sketches
752                .iter()
753                .flat_map(|s| s.ddsketches.keys().cloned())
754                .collect(),
755        };
756
757        // Merge each column
758        for col in columns {
759            match sketch_type {
760                SketchType::TDigest => {
761                    let digests: Vec<TDigest> = sketches
762                        .iter()
763                        .filter_map(|s| s.tdigests.get(&col).cloned())
764                        .collect();
765                    if !digests.is_empty() {
766                        result.tdigests.insert(col, TDigest::merge(&digests));
767                    }
768                }
769                SketchType::DDSketch => {
770                    let dd_sketches: Vec<DDSketch> = sketches
771                        .iter()
772                        .filter_map(|s| s.ddsketches.get(&col).cloned())
773                        .collect();
774                    if !dd_sketches.is_empty() {
775                        result.ddsketches.insert(col, DDSketch::merge(&dd_sketches));
776                    }
777                }
778            }
779        }
780
781        Ok(result)
782    }
783
784    /// Get quantile for a column
785    pub fn quantile(&self, column: &str, q: f64) -> Option<f64> {
786        match self.sketch_type {
787            SketchType::TDigest => self.tdigests.get(column).map(|d| d.quantile(q)),
788            SketchType::DDSketch => self.ddsketches.get(column).map(|d| d.quantile(q)),
789        }
790    }
791
792    /// Serialize to bytes
793    pub fn to_bytes(&self) -> Result<Vec<u8>> {
794        rmp_serde::to_vec(self)
795            .map_err(|e| Error::Format(format!("Failed to serialize DataSketch: {e}")))
796    }
797
798    /// Deserialize from bytes
799    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
800        rmp_serde::from_slice(bytes)
801            .map_err(|e| Error::Format(format!("Failed to deserialize DataSketch: {e}")))
802    }
803}
804
805/// Result of distributed drift comparison
806#[derive(Debug, Clone)]
807pub struct SketchDriftResult {
808    /// Column name
809    pub column: String,
810    /// KS-like statistic based on quantile differences
811    pub statistic: f64,
812    /// Estimated severity
813    pub severity: DriftSeverity,
814    /// Quantile differences at key points
815    pub quantile_diffs: Vec<(f64, f64)>,
816}
817
818/// Distributed drift detector using sketches
819pub struct DistributedDriftDetector {
820    /// Sketch type to use
821    sketch_type: SketchType,
822    /// Number of quantile points to compare
823    num_quantiles: usize,
824    /// Threshold for drift detection
825    threshold: f64,
826}
827
828impl Default for DistributedDriftDetector {
829    fn default() -> Self {
830        Self::new()
831    }
832}
833
834impl DistributedDriftDetector {
835    /// Create a new distributed drift detector
836    pub fn new() -> Self {
837        Self {
838            sketch_type: SketchType::TDigest,
839            num_quantiles: 20,
840            threshold: 0.1,
841        }
842    }
843
844    /// Set sketch type
845    #[must_use]
846    pub fn with_sketch_type(mut self, sketch_type: SketchType) -> Self {
847        self.sketch_type = sketch_type;
848        self
849    }
850
851    /// Set number of quantile points to compare
852    #[must_use]
853    pub fn with_num_quantiles(mut self, n: usize) -> Self {
854        self.num_quantiles = n.max(5);
855        self
856    }
857
858    /// Set drift detection threshold
859    #[must_use]
860    pub fn with_threshold(mut self, threshold: f64) -> Self {
861        self.threshold = threshold;
862        self
863    }
864
865    /// Create sketch from dataset
866    pub fn create_sketch(&self, dataset: &ArrowDataset) -> Result<DataSketch> {
867        DataSketch::from_dataset(dataset, self.sketch_type)
868    }
869
870    /// Compare two sketches for drift
871    pub fn compare(
872        &self,
873        reference: &DataSketch,
874        current: &DataSketch,
875    ) -> Result<Vec<SketchDriftResult>> {
876        if reference.sketch_type != current.sketch_type {
877            return Err(Error::invalid_config("Sketch types must match"));
878        }
879
880        let mut results = Vec::new();
881
882        // Get all columns
883        let columns: std::collections::HashSet<&String> = match self.sketch_type {
884            SketchType::TDigest => reference
885                .tdigests
886                .keys()
887                .chain(current.tdigests.keys())
888                .collect(),
889            SketchType::DDSketch => reference
890                .ddsketches
891                .keys()
892                .chain(current.ddsketches.keys())
893                .collect(),
894        };
895
896        for col in columns {
897            let result = self.compare_column(reference, current, col);
898            results.push(result);
899        }
900
901        Ok(results)
902    }
903
904    /// Compare a single column between sketches
905    fn compare_column(
906        &self,
907        reference: &DataSketch,
908        current: &DataSketch,
909        column: &str,
910    ) -> SketchDriftResult {
911        let mut max_diff = 0.0_f64;
912        let mut quantile_diffs = Vec::new();
913
914        // Compare at multiple quantile points
915        for i in 1..self.num_quantiles {
916            let q = i as f64 / self.num_quantiles as f64;
917
918            let ref_val = reference.quantile(column, q);
919            let cur_val = current.quantile(column, q);
920
921            if let (Some(r), Some(c)) = (ref_val, cur_val) {
922                // Relative difference
923                let diff = if r.abs() > f64::EPSILON {
924                    ((c - r) / r).abs()
925                } else if c.abs() > f64::EPSILON {
926                    1.0
927                } else {
928                    0.0
929                };
930
931                max_diff = max_diff.max(diff);
932                quantile_diffs.push((q, diff));
933            }
934        }
935
936        // Map max difference to severity
937        let severity = if max_diff < self.threshold {
938            DriftSeverity::None
939        } else if max_diff < self.threshold * 2.0 {
940            DriftSeverity::Low
941        } else if max_diff < self.threshold * 5.0 {
942            DriftSeverity::Medium
943        } else if max_diff < self.threshold * 10.0 {
944            DriftSeverity::High
945        } else {
946            DriftSeverity::Critical
947        };
948
949        SketchDriftResult {
950            column: column.to_string(),
951            statistic: max_diff,
952            severity,
953            quantile_diffs,
954        }
955    }
956}
957
958#[cfg(test)]
959mod tests {
960    use std::sync::Arc;
961
962    use arrow::{
963        array::Float64Array,
964        datatypes::{DataType, Field, Schema},
965        record_batch::RecordBatch,
966    };
967
968    use super::*;
969
970    // ========== TDigest tests ==========
971
972    #[test]
973    fn test_tdigest_new() {
974        let digest = TDigest::new(100.0);
975        assert!(digest.is_empty());
976        assert_eq!(digest.count(), 0.0);
977    }
978
979    #[test]
980    fn test_tdigest_add_single() {
981        let mut digest = TDigest::new(100.0);
982        digest.add(5.0);
983
984        assert!(!digest.is_empty());
985        assert_eq!(digest.count(), 1.0);
986        assert_eq!(digest.min(), 5.0);
987        assert_eq!(digest.max(), 5.0);
988    }
989
990    #[test]
991    fn test_tdigest_add_batch() {
992        let mut digest = TDigest::new(100.0);
993        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
994        digest.add_batch(&values);
995
996        assert_eq!(digest.count(), 100.0);
997        assert_eq!(digest.min(), 0.0);
998        assert_eq!(digest.max(), 99.0);
999    }
1000
1001    #[test]
1002    fn test_tdigest_quantile_median() {
1003        let mut digest = TDigest::new(100.0);
1004        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();
1005        digest.add_batch(&values);
1006
1007        let median = digest.quantile(0.5);
1008        // Should be close to 500
1009        assert!((median - 500.0).abs() < 50.0, "Median was {}", median);
1010    }
1011
1012    #[test]
1013    fn test_tdigest_quantile_extremes() {
1014        let mut digest = TDigest::new(100.0);
1015        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1016        digest.add_batch(&values);
1017
1018        assert_eq!(digest.quantile(0.0), 0.0);
1019        assert_eq!(digest.quantile(1.0), 99.0);
1020    }
1021
1022    #[test]
1023    fn test_tdigest_quantile_quartiles() {
1024        let mut digest = TDigest::new(100.0);
1025        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();
1026        digest.add_batch(&values);
1027
1028        let q1 = digest.quantile(0.25);
1029        let q3 = digest.quantile(0.75);
1030
1031        // Should be approximately 250 and 750
1032        assert!((q1 - 250.0).abs() < 50.0, "Q1 was {}", q1);
1033        assert!((q3 - 750.0).abs() < 50.0, "Q3 was {}", q3);
1034    }
1035
1036    #[test]
1037    fn test_tdigest_merge() {
1038        let mut digest1 = TDigest::new(100.0);
1039        let mut digest2 = TDigest::new(100.0);
1040
1041        let values1: Vec<f64> = (0..500).map(|i| i as f64).collect();
1042        let values2: Vec<f64> = (500..1000).map(|i| i as f64).collect();
1043
1044        digest1.add_batch(&values1);
1045        digest2.add_batch(&values2);
1046
1047        let merged = TDigest::merge(&[digest1, digest2]);
1048
1049        assert_eq!(merged.count(), 1000.0);
1050        assert_eq!(merged.min(), 0.0);
1051        assert_eq!(merged.max(), 999.0);
1052
1053        let median = merged.quantile(0.5);
1054        assert!(
1055            (median - 500.0).abs() < 50.0,
1056            "Merged median was {}",
1057            median
1058        );
1059    }
1060
1061    #[test]
1062    fn test_tdigest_serialization() {
1063        let mut digest = TDigest::new(100.0);
1064        digest.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);
1065
1066        let bytes = digest.to_bytes().expect("serialize");
1067        let restored = TDigest::from_bytes(&bytes).expect("deserialize");
1068
1069        assert_eq!(restored.count(), digest.count());
1070        assert_eq!(restored.min(), digest.min());
1071        assert_eq!(restored.max(), digest.max());
1072    }
1073
1074    #[test]
1075    fn test_tdigest_cdf() {
1076        let mut digest = TDigest::new(100.0);
1077        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1078        digest.add_batch(&values);
1079
1080        assert_eq!(digest.cdf(-1.0), 0.0);
1081        assert_eq!(digest.cdf(100.0), 1.0);
1082
1083        let cdf_50 = digest.cdf(50.0);
1084        assert!(cdf_50 > 0.4 && cdf_50 < 0.6, "CDF at 50 was {}", cdf_50);
1085    }
1086
1087    #[test]
1088    fn test_tdigest_empty_quantile() {
1089        let digest = TDigest::new(100.0);
1090        assert!(digest.quantile(0.5).is_nan());
1091    }
1092
1093    // ========== DDSketch tests ==========
1094
1095    #[test]
1096    fn test_ddsketch_new() {
1097        let sketch = DDSketch::new(0.01);
1098        assert!(sketch.is_empty());
1099        assert_eq!(sketch.count(), 0);
1100    }
1101
1102    #[test]
1103    fn test_ddsketch_add_single() {
1104        let mut sketch = DDSketch::new(0.01);
1105        sketch.add(5.0);
1106
1107        assert!(!sketch.is_empty());
1108        assert_eq!(sketch.count(), 1);
1109        assert_eq!(sketch.min(), 5.0);
1110        assert_eq!(sketch.max(), 5.0);
1111    }
1112
1113    #[test]
1114    fn test_ddsketch_add_batch() {
1115        let mut sketch = DDSketch::new(0.01);
1116        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();
1117        sketch.add_batch(&values);
1118
1119        assert_eq!(sketch.count(), 100);
1120        assert_eq!(sketch.min(), 1.0);
1121        assert_eq!(sketch.max(), 100.0);
1122    }
1123
1124    #[test]
1125    fn test_ddsketch_quantile_median() {
1126        let mut sketch = DDSketch::new(0.01);
1127        let values: Vec<f64> = (1..=1000).map(|i| i as f64).collect();
1128        sketch.add_batch(&values);
1129
1130        let median = sketch.quantile(0.5);
1131        // Should be close to 500, within relative error
1132        assert!((median - 500.0).abs() < 100.0, "Median was {}", median);
1133    }
1134
1135    #[test]
1136    fn test_ddsketch_quantile_extremes() {
1137        let mut sketch = DDSketch::new(0.01);
1138        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();
1139        sketch.add_batch(&values);
1140
1141        assert_eq!(sketch.quantile(0.0), 1.0);
1142        assert_eq!(sketch.quantile(1.0), 100.0);
1143    }
1144
1145    #[test]
1146    fn test_ddsketch_negative_values() {
1147        let mut sketch = DDSketch::new(0.01);
1148        let values: Vec<f64> = (-50..=50).map(|i| i as f64).collect();
1149        sketch.add_batch(&values);
1150
1151        assert_eq!(sketch.min(), -50.0);
1152        assert_eq!(sketch.max(), 50.0);
1153
1154        let median = sketch.quantile(0.5);
1155        assert!((median).abs() < 20.0, "Median was {}", median);
1156    }
1157
1158    #[test]
1159    fn test_ddsketch_merge() {
1160        let mut sketch1 = DDSketch::new(0.01);
1161        let mut sketch2 = DDSketch::new(0.01);
1162
1163        let values1: Vec<f64> = (1..=500).map(|i| i as f64).collect();
1164        let values2: Vec<f64> = (501..=1000).map(|i| i as f64).collect();
1165
1166        sketch1.add_batch(&values1);
1167        sketch2.add_batch(&values2);
1168
1169        let merged = DDSketch::merge(&[sketch1, sketch2]);
1170
1171        assert_eq!(merged.count(), 1000);
1172        assert_eq!(merged.min(), 1.0);
1173        assert_eq!(merged.max(), 1000.0);
1174    }
1175
1176    #[test]
1177    fn test_ddsketch_serialization() {
1178        let mut sketch = DDSketch::new(0.01);
1179        sketch.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);
1180
1181        let bytes = sketch.to_bytes().expect("serialize");
1182        let restored = DDSketch::from_bytes(&bytes).expect("deserialize");
1183
1184        assert_eq!(restored.count(), sketch.count());
1185        assert_eq!(restored.min(), sketch.min());
1186        assert_eq!(restored.max(), sketch.max());
1187    }
1188
1189    #[test]
1190    fn test_ddsketch_empty_quantile() {
1191        let sketch = DDSketch::new(0.01);
1192        assert!(sketch.quantile(0.5).is_nan());
1193    }
1194
1195    // ========== SketchType tests ==========
1196
1197    #[test]
1198    fn test_sketch_type_name() {
1199        assert_eq!(SketchType::TDigest.name(), "T-Digest");
1200        assert_eq!(SketchType::DDSketch.name(), "DDSketch");
1201    }
1202
1203    // ========== DataSketch tests ==========
1204
1205    fn make_float_dataset(values: Vec<f64>) -> ArrowDataset {
1206        let schema = Arc::new(Schema::new(vec![Field::new(
1207            "value",
1208            DataType::Float64,
1209            false,
1210        )]));
1211
1212        let batch = RecordBatch::try_new(
1213            Arc::clone(&schema),
1214            vec![Arc::new(Float64Array::from(values))],
1215        )
1216        .expect("batch");
1217
1218        ArrowDataset::from_batch(batch).expect("dataset")
1219    }
1220
1221    #[test]
1222    fn test_data_sketch_from_dataset_tdigest() {
1223        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1224        let dataset = make_float_dataset(values);
1225
1226        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1227
1228        assert_eq!(sketch.row_count, 100);
1229        assert!(sketch.tdigests.contains_key("value"));
1230
1231        let median = sketch.quantile("value", 0.5);
1232        assert!(median.is_some());
1233    }
1234
1235    #[test]
1236    fn test_data_sketch_from_dataset_ddsketch() {
1237        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();
1238        let dataset = make_float_dataset(values);
1239
1240        let sketch = DataSketch::from_dataset(&dataset, SketchType::DDSketch).expect("sketch");
1241
1242        assert_eq!(sketch.row_count, 100);
1243        assert!(sketch.ddsketches.contains_key("value"));
1244    }
1245
1246    #[test]
1247    fn test_data_sketch_merge() {
1248        let values1: Vec<f64> = (0..50).map(|i| i as f64).collect();
1249        let values2: Vec<f64> = (50..100).map(|i| i as f64).collect();
1250
1251        let dataset1 = make_float_dataset(values1);
1252        let dataset2 = make_float_dataset(values2);
1253
1254        let sketch1 = DataSketch::from_dataset(&dataset1, SketchType::TDigest).expect("sketch1");
1255        let sketch2 = DataSketch::from_dataset(&dataset2, SketchType::TDigest).expect("sketch2");
1256
1257        let merged = DataSketch::merge(&[sketch1, sketch2]).expect("merge");
1258
1259        assert_eq!(merged.row_count, 100);
1260    }
1261
1262    #[test]
1263    fn test_data_sketch_serialization() {
1264        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1265        let dataset = make_float_dataset(values);
1266
1267        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1268        let bytes = sketch.to_bytes().expect("serialize");
1269        let restored = DataSketch::from_bytes(&bytes).expect("deserialize");
1270
1271        assert_eq!(restored.row_count, sketch.row_count);
1272        assert_eq!(restored.sketch_type, sketch.sketch_type);
1273    }
1274
1275    // ========== DistributedDriftDetector tests ==========
1276
1277    #[test]
1278    fn test_distributed_detector_new() {
1279        let detector = DistributedDriftDetector::new();
1280        assert_eq!(detector.sketch_type, SketchType::TDigest);
1281    }
1282
1283    #[test]
1284    fn test_distributed_detector_builder() {
1285        let detector = DistributedDriftDetector::new()
1286            .with_sketch_type(SketchType::DDSketch)
1287            .with_num_quantiles(50)
1288            .with_threshold(0.2);
1289
1290        assert_eq!(detector.sketch_type, SketchType::DDSketch);
1291        assert_eq!(detector.num_quantiles, 50);
1292        assert!((detector.threshold - 0.2).abs() < f64::EPSILON);
1293    }
1294
1295    #[test]
1296    fn test_distributed_detector_no_drift() {
1297        let values: Vec<f64> = (0..500).map(|i| i as f64).collect();
1298        let dataset1 = make_float_dataset(values.clone());
1299        let dataset2 = make_float_dataset(values);
1300
1301        let detector = DistributedDriftDetector::new();
1302        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1303        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");
1304
1305        let results = detector.compare(&sketch1, &sketch2).expect("compare");
1306
1307        assert_eq!(results.len(), 1);
1308        assert_eq!(results[0].severity, DriftSeverity::None);
1309    }
1310
1311    #[test]
1312    fn test_distributed_detector_with_drift() {
1313        let values1: Vec<f64> = (0..500).map(|i| i as f64).collect();
1314        let values2: Vec<f64> = (500..1000).map(|i| i as f64).collect();
1315
1316        let dataset1 = make_float_dataset(values1);
1317        let dataset2 = make_float_dataset(values2);
1318
1319        let detector = DistributedDriftDetector::new().with_threshold(0.1);
1320        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1321        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");
1322
1323        let results = detector.compare(&sketch1, &sketch2).expect("compare");
1324
1325        assert_eq!(results.len(), 1);
1326        assert!(results[0].severity.is_drift(), "Should detect drift");
1327        assert!(results[0].statistic > 0.0);
1328    }
1329
1330    #[test]
1331    fn test_distributed_detector_ddsketch() {
1332        let values1: Vec<f64> = (1..=500).map(|i| i as f64).collect();
1333        let values2: Vec<f64> = (1..=500).map(|i| i as f64).collect();
1334
1335        let dataset1 = make_float_dataset(values1);
1336        let dataset2 = make_float_dataset(values2);
1337
1338        let detector = DistributedDriftDetector::new().with_sketch_type(SketchType::DDSketch);
1339        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1340        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");
1341
1342        let results = detector.compare(&sketch1, &sketch2).expect("compare");
1343
1344        assert!(!results.is_empty());
1345        assert_eq!(results[0].severity, DriftSeverity::None);
1346    }
1347
1348    #[test]
1349    fn test_distributed_detector_quantile_diffs() {
1350        let values1: Vec<f64> = (0..500).map(|i| i as f64).collect();
1351        let values2: Vec<f64> = (100..600).map(|i| i as f64).collect();
1352
1353        let dataset1 = make_float_dataset(values1);
1354        let dataset2 = make_float_dataset(values2);
1355
1356        let detector = DistributedDriftDetector::new().with_num_quantiles(10);
1357        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1358        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");
1359
1360        let results = detector.compare(&sketch1, &sketch2).expect("compare");
1361
1362        assert!(!results[0].quantile_diffs.is_empty());
1363    }
1364
1365    // ========== Additional edge case tests ==========
1366
1367    #[test]
1368    fn test_centroid_new_and_merge() {
1369        let mut c1 = Centroid::new(10.0, 2.0);
1370        let c2 = Centroid::new(20.0, 3.0);
1371        c1.merge(&c2);
1372        // Weighted average: (10*2 + 20*3) / 5 = 80/5 = 16
1373        assert!((c1.mean - 16.0).abs() < f64::EPSILON);
1374        assert_eq!(c1.weight, 5.0);
1375    }
1376
1377    #[test]
1378    fn test_centroid_merge_zero_weights() {
1379        let mut c1 = Centroid::new(10.0, 0.0);
1380        let c2 = Centroid::new(20.0, 0.0);
1381        c1.merge(&c2);
1382        // total_weight = 0, so merge shouldn't change mean
1383        assert_eq!(c1.mean, 10.0);
1384        assert_eq!(c1.weight, 0.0);
1385    }
1386
1387    #[test]
1388    fn test_tdigest_add_weighted_non_finite() {
1389        let mut digest = TDigest::new(100.0);
1390        digest.add_weighted(f64::NAN, 1.0);
1391        digest.add_weighted(f64::INFINITY, 1.0);
1392        digest.add_weighted(f64::NEG_INFINITY, 1.0);
1393        assert!(digest.is_empty());
1394    }
1395
1396    #[test]
1397    fn test_tdigest_add_weighted_zero_weight() {
1398        let mut digest = TDigest::new(100.0);
1399        digest.add_weighted(5.0, 0.0);
1400        digest.add_weighted(10.0, -1.0);
1401        assert!(digest.is_empty());
1402    }
1403
1404    #[test]
1405    fn test_tdigest_num_centroids() {
1406        let mut digest = TDigest::new(100.0);
1407        assert_eq!(digest.num_centroids(), 0);
1408
1409        digest.add(5.0);
1410        assert!(digest.num_centroids() > 0);
1411    }
1412
1413    #[test]
1414    fn test_tdigest_quantile_clamp() {
1415        let mut digest = TDigest::new(100.0);
1416        digest.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);
1417
1418        // Test clamping of out-of-range quantiles
1419        let q_neg = digest.quantile(-0.5);
1420        let q_over = digest.quantile(1.5);
1421        assert_eq!(q_neg, digest.min());
1422        assert_eq!(q_over, digest.max());
1423    }
1424
1425    #[test]
1426    fn test_tdigest_merge_empty() {
1427        let merged = TDigest::merge(&[]);
1428        assert!(merged.is_empty());
1429        assert_eq!(merged.compression, 100.0);
1430    }
1431
1432    #[test]
1433    fn test_tdigest_cdf_empty() {
1434        let digest = TDigest::new(100.0);
1435        assert_eq!(digest.cdf(5.0), 0.0);
1436    }
1437
1438    #[test]
1439    fn test_tdigest_clone() {
1440        let mut digest = TDigest::new(100.0);
1441        digest.add_batch(&[1.0, 2.0, 3.0]);
1442
1443        let cloned = digest.clone();
1444        assert_eq!(cloned.count(), digest.count());
1445        assert_eq!(cloned.min(), digest.min());
1446        assert_eq!(cloned.max(), digest.max());
1447    }
1448
1449    #[test]
1450    fn test_tdigest_debug() {
1451        let digest = TDigest::new(100.0);
1452        let debug = format!("{:?}", digest);
1453        assert!(debug.contains("TDigest"));
1454    }
1455
1456    #[test]
1457    fn test_ddsketch_add_non_finite() {
1458        let mut sketch = DDSketch::new(0.01);
1459        sketch.add(f64::NAN);
1460        sketch.add(f64::INFINITY);
1461        sketch.add(f64::NEG_INFINITY);
1462        assert!(sketch.is_empty());
1463    }
1464
1465    #[test]
1466    fn test_ddsketch_add_zero() {
1467        let mut sketch = DDSketch::new(0.01);
1468        sketch.add(0.0);
1469        assert_eq!(sketch.count(), 1);
1470        assert_eq!(sketch.quantile(0.5), 0.0);
1471    }
1472
1473    #[test]
1474    fn test_ddsketch_quantile_clamp() {
1475        let mut sketch = DDSketch::new(0.01);
1476        sketch.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);
1477
1478        let q_neg = sketch.quantile(-0.5);
1479        let q_over = sketch.quantile(1.5);
1480        assert_eq!(q_neg, sketch.min());
1481        assert_eq!(q_over, sketch.max());
1482    }
1483
1484    #[test]
1485    fn test_ddsketch_merge_empty() {
1486        let merged = DDSketch::merge(&[]);
1487        assert!(merged.is_empty());
1488    }
1489
1490    #[test]
1491    fn test_ddsketch_alpha_clamp() {
1492        // Alpha too small
1493        let sketch1 = DDSketch::new(0.00001);
1494        assert!(sketch1.alpha >= 0.0001);
1495
1496        // Alpha too large
1497        let sketch2 = DDSketch::new(0.9);
1498        assert!(sketch2.alpha <= 0.5);
1499    }
1500
1501    #[test]
1502    fn test_ddsketch_clone() {
1503        let mut sketch = DDSketch::new(0.01);
1504        sketch.add_batch(&[1.0, 2.0, 3.0]);
1505
1506        let cloned = sketch.clone();
1507        assert_eq!(cloned.count(), sketch.count());
1508        assert_eq!(cloned.min(), sketch.min());
1509    }
1510
1511    #[test]
1512    fn test_ddsketch_debug() {
1513        let sketch = DDSketch::new(0.01);
1514        let debug = format!("{:?}", sketch);
1515        assert!(debug.contains("DDSketch"));
1516    }
1517
1518    #[test]
1519    fn test_sketch_type_equality() {
1520        assert_eq!(SketchType::TDigest, SketchType::TDigest);
1521        assert_ne!(SketchType::TDigest, SketchType::DDSketch);
1522    }
1523
1524    #[test]
1525    fn test_sketch_type_clone() {
1526        let st = SketchType::DDSketch;
1527        let cloned = st;
1528        assert_eq!(st, cloned);
1529    }
1530
1531    #[test]
1532    fn test_sketch_type_debug() {
1533        let st = SketchType::TDigest;
1534        let debug = format!("{:?}", st);
1535        assert!(debug.contains("TDigest"));
1536    }
1537
1538    #[test]
1539    fn test_data_sketch_new() {
1540        let sketch = DataSketch::new(SketchType::TDigest);
1541        assert_eq!(sketch.sketch_type, SketchType::TDigest);
1542        assert_eq!(sketch.row_count, 0);
1543        assert!(sketch.source.is_none());
1544    }
1545
1546    #[test]
1547    fn test_data_sketch_with_source() {
1548        let sketch = DataSketch::new(SketchType::TDigest).with_source("node1");
1549        assert_eq!(sketch.source, Some("node1".to_string()));
1550    }
1551
1552    #[test]
1553    fn test_data_sketch_merge_empty_error() {
1554        let result = DataSketch::merge(&[]);
1555        assert!(result.is_err());
1556    }
1557
1558    #[test]
1559    fn test_data_sketch_merge_different_types_error() {
1560        let sketch1 = DataSketch::new(SketchType::TDigest);
1561        let sketch2 = DataSketch::new(SketchType::DDSketch);
1562
1563        let result = DataSketch::merge(&[sketch1, sketch2]);
1564        assert!(result.is_err());
1565    }
1566
1567    #[test]
1568    fn test_data_sketch_quantile_not_found() {
1569        let sketch = DataSketch::new(SketchType::TDigest);
1570        assert!(sketch.quantile("nonexistent", 0.5).is_none());
1571    }
1572
1573    #[test]
1574    fn test_data_sketch_clone() {
1575        let values: Vec<f64> = (0..50).map(|i| i as f64).collect();
1576        let dataset = make_float_dataset(values);
1577        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1578
1579        let cloned = sketch.clone();
1580        assert_eq!(cloned.row_count, sketch.row_count);
1581        assert_eq!(cloned.sketch_type, sketch.sketch_type);
1582    }
1583
1584    #[test]
1585    fn test_data_sketch_debug() {
1586        let sketch = DataSketch::new(SketchType::DDSketch);
1587        let debug = format!("{:?}", sketch);
1588        assert!(debug.contains("DataSketch"));
1589    }
1590
1591    #[test]
1592    fn test_sketch_drift_result_clone() {
1593        let result = SketchDriftResult {
1594            column: "test".to_string(),
1595            statistic: 0.5,
1596            severity: DriftSeverity::Medium,
1597            quantile_diffs: vec![(0.5, 0.1)],
1598        };
1599
1600        let cloned = result.clone();
1601        assert_eq!(cloned.column, result.column);
1602        assert_eq!(cloned.statistic, result.statistic);
1603    }
1604
1605    #[test]
1606    fn test_sketch_drift_result_debug() {
1607        let result = SketchDriftResult {
1608            column: "test".to_string(),
1609            statistic: 0.5,
1610            severity: DriftSeverity::None,
1611            quantile_diffs: vec![],
1612        };
1613
1614        let debug = format!("{:?}", result);
1615        assert!(debug.contains("SketchDriftResult"));
1616    }
1617
1618    #[test]
1619    fn test_distributed_detector_default() {
1620        let detector = DistributedDriftDetector::default();
1621        assert_eq!(detector.sketch_type, SketchType::TDigest);
1622    }
1623
1624    #[test]
1625    fn test_distributed_detector_compare_type_mismatch() {
1626        let sketch1 = DataSketch::new(SketchType::TDigest);
1627        let sketch2 = DataSketch::new(SketchType::DDSketch);
1628
1629        let detector = DistributedDriftDetector::new();
1630        let result = detector.compare(&sketch1, &sketch2);
1631        assert!(result.is_err());
1632    }
1633
1634    #[test]
1635    fn test_distributed_detector_num_quantiles_min() {
1636        let detector = DistributedDriftDetector::new().with_num_quantiles(1);
1637        assert!(detector.num_quantiles >= 5);
1638    }
1639
1640    #[test]
1641    fn test_tdigest_compression_triggers() {
1642        // Add enough values to trigger compression
1643        let mut digest = TDigest::new(10.0); // Low compression to trigger often
1644        for i in 0..1000 {
1645            digest.add(i as f64);
1646        }
1647        // Should have compressed
1648        assert!(digest.num_centroids() < 1000);
1649    }
1650
1651    #[test]
1652    fn test_tdigest_serialization_invalid() {
1653        let result = TDigest::from_bytes(&[0, 1, 2, 3]);
1654        assert!(result.is_err());
1655    }
1656
1657    #[test]
1658    fn test_ddsketch_serialization_invalid() {
1659        let result = DDSketch::from_bytes(&[0, 1, 2, 3]);
1660        assert!(result.is_err());
1661    }
1662
1663    #[test]
1664    fn test_data_sketch_serialization_invalid() {
1665        let result = DataSketch::from_bytes(&[0, 1, 2, 3]);
1666        assert!(result.is_err());
1667    }
1668
1669    #[test]
1670    fn test_centroid_clone() {
1671        let c = Centroid::new(5.0, 2.0);
1672        let cloned = c.clone();
1673        assert_eq!(cloned.mean, c.mean);
1674        assert_eq!(cloned.weight, c.weight);
1675    }
1676
1677    #[test]
1678    fn test_centroid_debug() {
1679        let c = Centroid::new(5.0, 2.0);
1680        let debug = format!("{:?}", c);
1681        assert!(debug.contains("Centroid"));
1682    }
1683
1684    #[test]
1685    fn test_data_sketch_merge_ddsketch() {
1686        let values1: Vec<f64> = (1..=50).map(|i| i as f64).collect();
1687        let values2: Vec<f64> = (51..=100).map(|i| i as f64).collect();
1688
1689        let dataset1 = make_float_dataset(values1);
1690        let dataset2 = make_float_dataset(values2);
1691
1692        let sketch1 = DataSketch::from_dataset(&dataset1, SketchType::DDSketch).expect("sketch1");
1693        let sketch2 = DataSketch::from_dataset(&dataset2, SketchType::DDSketch).expect("sketch2");
1694
1695        let merged = DataSketch::merge(&[sketch1, sketch2]).expect("merge");
1696
1697        assert_eq!(merged.row_count, 100);
1698        assert_eq!(merged.sketch_type, SketchType::DDSketch);
1699    }
1700
1701    #[test]
1702    fn test_distributed_detector_severity_levels() {
1703        // Create significantly different distributions to trigger different severity
1704        // levels
1705        let values1: Vec<f64> = (0..100).map(|i| i as f64).collect();
1706        let values2: Vec<f64> = (0..100).map(|i| (i * 50) as f64).collect(); // 50x different
1707
1708        let dataset1 = make_float_dataset(values1);
1709        let dataset2 = make_float_dataset(values2);
1710
1711        let detector = DistributedDriftDetector::new().with_threshold(0.01);
1712        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1713        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");
1714
1715        let results = detector.compare(&sketch1, &sketch2).expect("compare");
1716        assert!(!results.is_empty());
1717        // Should detect some level of drift
1718        assert!(results[0].statistic > 0.0);
1719    }
1720
1721    #[test]
1722    fn test_data_sketch_add_dataset_int_types() {
1723        // Test with Int32 and Int64 columns
1724        use arrow::array::{Int32Array, Int64Array};
1725
1726        let schema = Arc::new(Schema::new(vec![
1727            Field::new("int32_col", DataType::Int32, false),
1728            Field::new("int64_col", DataType::Int64, false),
1729        ]));
1730
1731        let batch = RecordBatch::try_new(
1732            Arc::clone(&schema),
1733            vec![
1734                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
1735                Arc::new(Int64Array::from(vec![10i64, 20, 30, 40, 50])),
1736            ],
1737        )
1738        .expect("batch");
1739
1740        let dataset = ArrowDataset::from_batch(batch).expect("dataset");
1741        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1742
1743        assert_eq!(sketch.row_count, 5);
1744        assert!(sketch.tdigests.contains_key("int32_col"));
1745        assert!(sketch.tdigests.contains_key("int64_col"));
1746    }
1747
1748    #[test]
1749    fn test_data_sketch_add_dataset_float32() {
1750        use arrow::array::Float32Array;
1751
1752        let schema = Arc::new(Schema::new(vec![Field::new(
1753            "float32_col",
1754            DataType::Float32,
1755            false,
1756        )]));
1757
1758        let batch = RecordBatch::try_new(
1759            Arc::clone(&schema),
1760            vec![Arc::new(Float32Array::from(vec![
1761                1.0f32, 2.0, 3.0, 4.0, 5.0,
1762            ]))],
1763        )
1764        .expect("batch");
1765
1766        let dataset = ArrowDataset::from_batch(batch).expect("dataset");
1767        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1768
1769        assert_eq!(sketch.row_count, 5);
1770        assert!(sketch.tdigests.contains_key("float32_col"));
1771    }
1772
1773    #[test]
1774    fn test_data_sketch_non_numeric_columns_skipped() {
1775        use arrow::array::StringArray;
1776
1777        let schema = Arc::new(Schema::new(vec![
1778            Field::new("value", DataType::Float64, false),
1779            Field::new("name", DataType::Utf8, false),
1780        ]));
1781
1782        let batch = RecordBatch::try_new(
1783            Arc::clone(&schema),
1784            vec![
1785                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
1786                Arc::new(StringArray::from(vec!["a", "b", "c"])),
1787            ],
1788        )
1789        .expect("batch");
1790
1791        let dataset = ArrowDataset::from_batch(batch).expect("dataset");
1792        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1793
1794        // Only numeric column should be sketched
1795        assert!(sketch.tdigests.contains_key("value"));
1796        assert!(!sketch.tdigests.contains_key("name"));
1797    }
1798
1799    #[test]
1800    fn test_distributed_detector_compare_missing_column() {
1801        // Test when one sketch has a column the other doesn't
1802        let values1: Vec<f64> = (0..100).map(|i| i as f64).collect();
1803        let dataset1 = make_float_dataset(values1);
1804
1805        let detector = DistributedDriftDetector::new();
1806        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1807
1808        // Create empty sketch
1809        let sketch2 = DataSketch::new(SketchType::TDigest);
1810
1811        let results = detector.compare(&sketch1, &sketch2).expect("compare");
1812        // Should still produce results, even if quantiles are None
1813        assert!(!results.is_empty());
1814    }
1815}