1#![allow(clippy::cast_precision_loss)]
27#![allow(clippy::cast_possible_truncation)]
28#![allow(clippy::cast_sign_loss)]
29#![allow(clippy::float_cmp)]
30#![allow(clippy::suboptimal_flops)]
31
32use std::collections::HashMap;
33
34use serde::{Deserialize, Serialize};
35
36use crate::{
37 dataset::{ArrowDataset, Dataset},
38 drift::DriftSeverity,
39 error::{Error, Result},
40};
41
/// One cluster in a t-digest: a group of nearby samples summarized by
/// their weighted mean.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Centroid {
    /// Weighted mean of the samples assigned to this centroid.
    pub mean: f64,
    /// Total weight (sample count when unweighted) in this centroid.
    pub weight: f64,
}
50
51impl Centroid {
52 pub fn new(mean: f64, weight: f64) -> Self {
54 Self { mean, weight }
55 }
56
57 pub fn merge(&mut self, other: &Self) {
59 let total_weight = self.weight + other.weight;
60 if total_weight > 0.0 {
61 self.mean = (self.mean * self.weight + other.mean * other.weight) / total_weight;
62 self.weight = total_weight;
63 }
64 }
65}
66
/// Streaming quantile sketch (t-digest style): a bounded list of
/// centroids sorted by mean, sized by a compression factor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TDigest {
    /// Centroids kept in ascending order of mean.
    centroids: Vec<Centroid>,
    /// Size/accuracy knob: larger values keep more centroids.
    compression: f64,
    /// Sum of all weights added so far.
    total_weight: f64,
    /// Smallest finite value observed (+inf when empty).
    min: f64,
    /// Largest finite value observed (-inf when empty).
    max: f64,
}
84
impl TDigest {
    /// Creates an empty digest.
    ///
    /// `compression` bounds the number of retained centroids (the list
    /// is compacted past `2 * compression` entries); larger values
    /// trade memory for accuracy.
    pub fn new(compression: f64) -> Self {
        Self {
            centroids: Vec::new(),
            compression,
            total_weight: 0.0,
            // Infinities so the first added value defines min/max.
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Adds a single value with weight 1.
    pub fn add(&mut self, value: f64) {
        self.add_weighted(value, 1.0);
    }

    /// Adds a value with an explicit weight.
    ///
    /// Non-finite values and non-positive weights are silently ignored.
    pub fn add_weighted(&mut self, value: f64, weight: f64) {
        if !value.is_finite() || weight <= 0.0 {
            return;
        }

        self.min = self.min.min(value);
        self.max = self.max.max(value);
        self.total_weight += weight;

        // Position in the mean-sorted centroid list where this value fits.
        let idx = self.find_insertion_point(value);

        if !self.centroids.is_empty() {
            let max_weight = self.max_weight_at(idx);
            // Clamp to the last centroid when the value sorts past the end.
            let nearest = if idx < self.centroids.len() {
                idx
            } else {
                self.centroids.len() - 1
            };

            // Absorb into the nearest centroid if it stays under the
            // local size bound; otherwise fall through to an insert.
            if self.centroids[nearest].weight + weight <= max_weight {
                self.centroids[nearest].merge(&Centroid::new(value, weight));
                return;
            }
        }

        self.centroids.insert(idx, Centroid::new(value, weight));

        // Compact once the list grows past twice the compression factor.
        if self.centroids.len() > self.compression as usize * 2 {
            self.compress();
        }
    }

    /// Adds every value in `values` with weight 1.
    pub fn add_batch(&mut self, values: &[f64]) {
        for &v in values {
            self.add(v);
        }
    }

    /// Estimates the value at quantile `q` (clamped to `[0, 1]`).
    ///
    /// Returns NaN for an empty digest. `q == 0.0` / `q == 1.0` return
    /// the exact observed min / max; interior quantiles interpolate
    /// linearly between centroid-midpoint boundaries.
    pub fn quantile(&self, q: f64) -> f64 {
        if self.centroids.is_empty() {
            return f64::NAN;
        }

        let q = q.clamp(0.0, 1.0);

        if q == 0.0 {
            return self.min;
        }
        if q == 1.0 {
            return self.max;
        }

        let target_weight = q * self.total_weight;
        let mut cumulative = 0.0;

        for (i, centroid) in self.centroids.iter().enumerate() {
            let next_cumulative = cumulative + centroid.weight;

            if next_cumulative >= target_weight {
                // Neighbouring means (or global min/max at the ends)
                // bound this centroid's value range.
                let prev_mean = if i > 0 {
                    self.centroids[i - 1].mean
                } else {
                    self.min
                };
                let next_mean = if i < self.centroids.len() - 1 {
                    self.centroids[i + 1].mean
                } else {
                    self.max
                };

                // Fraction of this centroid's weight needed to hit the target.
                let ratio = if centroid.weight > 0.0 {
                    (target_weight - cumulative) / centroid.weight
                } else {
                    0.5
                };

                let low = (prev_mean + centroid.mean) / 2.0;
                let high = (centroid.mean + next_mean) / 2.0;

                return low + ratio * (high - low);
            }

            cumulative = next_cumulative;
        }

        self.max
    }

    /// Estimates the fraction of total weight at or below `x`.
    ///
    /// Step-function approximation: weight accumulates per whole
    /// centroid with no interpolation inside a centroid.
    pub fn cdf(&self, x: f64) -> f64 {
        if self.centroids.is_empty() || self.total_weight == 0.0 {
            return 0.0;
        }

        if x <= self.min {
            return 0.0;
        }
        if x >= self.max {
            return 1.0;
        }

        let mut cumulative = 0.0;

        for centroid in &self.centroids {
            if x < centroid.mean {
                return cumulative / self.total_weight;
            }
            cumulative += centroid.weight;
        }

        cumulative / self.total_weight
    }

    /// Combines several digests into one.
    ///
    /// The result uses the largest compression among the inputs;
    /// min/max/total weight are exact, and the pooled centroids are
    /// sorted by mean and compacted.
    pub fn merge(digests: &[Self]) -> Self {
        if digests.is_empty() {
            // Arbitrary default compression for the empty case.
            return Self::new(100.0);
        }

        let compression = digests.iter().map(|d| d.compression).fold(0.0, f64::max);
        let mut result = Self::new(compression);

        let mut all_centroids: Vec<Centroid> = digests
            .iter()
            .flat_map(|d| d.centroids.iter().cloned())
            .collect();

        all_centroids.sort_by(|a, b| {
            a.mean
                .partial_cmp(&b.mean)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        result.min = digests.iter().map(|d| d.min).fold(f64::INFINITY, f64::min);
        result.max = digests
            .iter()
            .map(|d| d.max)
            .fold(f64::NEG_INFINITY, f64::max);
        result.total_weight = digests.iter().map(|d| d.total_weight).sum();

        for centroid in all_centroids {
            result.centroids.push(centroid);
        }

        result.compress();

        result
    }

    /// Total weight added (sample count when all weights are 1).
    pub fn count(&self) -> f64 {
        self.total_weight
    }

    /// Smallest value observed, or +infinity when empty.
    pub fn min(&self) -> f64 {
        self.min
    }

    /// Largest value observed, or -infinity when empty.
    pub fn max(&self) -> f64 {
        self.max
    }

    /// Current number of centroids retained.
    pub fn num_centroids(&self) -> usize {
        self.centroids.len()
    }

    /// Returns `true` if nothing has been added.
    pub fn is_empty(&self) -> bool {
        self.centroids.is_empty()
    }

    /// Serializes the digest to MessagePack bytes.
    ///
    /// # Errors
    /// Returns [`Error::Format`] on serialization failure.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        rmp_serde::to_vec(self)
            .map_err(|e| Error::Format(format!("Failed to serialize TDigest: {e}")))
    }

    /// Deserializes a digest produced by [`Self::to_bytes`].
    ///
    /// # Errors
    /// Returns [`Error::Format`] on malformed input.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
        rmp_serde::from_slice(bytes)
            .map_err(|e| Error::Format(format!("Failed to deserialize TDigest: {e}")))
    }

    /// Index in the mean-sorted centroid list where `value` would insert.
    fn find_insertion_point(&self, value: f64) -> usize {
        self.centroids
            .binary_search_by(|c| {
                c.mean
                    .partial_cmp(&value)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .unwrap_or_else(|i| i)
    }

    /// Size bound `4 * compression * q * (1 - q)` (minimum 1) for the
    /// centroid at `index`, where `q` is the cumulative weight fraction
    /// before it. The bound is tightest near the tails, keeping
    /// extreme-quantile estimates accurate.
    fn max_weight_at(&self, index: usize) -> f64 {
        let q = if self.total_weight > 0.0 {
            let cumulative: f64 = self.centroids.iter().take(index).map(|c| c.weight).sum();
            cumulative / self.total_weight
        } else {
            0.5
        };

        let k = 4.0 * self.compression * q * (1.0 - q);
        k.max(1.0)
    }

    /// Greedy single pass: re-sorts centroids by mean, then merges each
    /// neighbour into the running centroid while the combined weight
    /// fits under the local size bound.
    fn compress(&mut self) {
        if self.centroids.len() <= 1 {
            return;
        }

        self.centroids.sort_by(|a, b| {
            a.mean
                .partial_cmp(&b.mean)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let mut new_centroids = Vec::with_capacity(self.compression as usize);
        let mut current = self.centroids[0].clone();
        let mut cumulative = 0.0;

        for centroid in self.centroids.iter().skip(1) {
            let q = cumulative / self.total_weight;
            let max_weight = 4.0 * self.compression * q * (1.0 - q);

            if current.weight + centroid.weight <= max_weight.max(1.0) {
                current.merge(centroid);
            } else {
                cumulative += current.weight;
                new_centroids.push(current);
                current = centroid.clone();
            }
        }

        new_centroids.push(current);
        self.centroids = new_centroids;
    }
}
364
/// Quantile sketch with relative-error guarantees (DDSketch-style
/// logarithmic bucketing).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DDSketch {
    /// Relative accuracy parameter (clamped to `[0.0001, 0.5]` in `new`).
    alpha: f64,
    /// Bucket growth factor `(1 + alpha) / (1 - alpha)`.
    gamma: f64,
    /// Cached `ln(gamma)`, used to compute bucket indices.
    ln_gamma: f64,
    /// Counts for positive values, keyed by logarithmic bucket index.
    positive_buckets: HashMap<i32, u64>,
    /// Counts for negative values, keyed by the bucket index of |value|.
    negative_buckets: HashMap<i32, u64>,
    /// Number of exact zeros observed.
    zero_count: u64,
    /// Total number of finite values added.
    total_count: u64,
    /// Smallest value observed (+inf when empty).
    min: f64,
    /// Largest value observed (-inf when empty).
    max: f64,
}
390
391impl DDSketch {
392 pub fn new(alpha: f64) -> Self {
396 let alpha = alpha.clamp(0.0001, 0.5);
397 let gamma = (1.0 + alpha) / (1.0 - alpha);
398
399 Self {
400 alpha,
401 gamma,
402 ln_gamma: gamma.ln(),
403 positive_buckets: HashMap::new(),
404 negative_buckets: HashMap::new(),
405 zero_count: 0,
406 total_count: 0,
407 min: f64::INFINITY,
408 max: f64::NEG_INFINITY,
409 }
410 }
411
412 pub fn add(&mut self, value: f64) {
414 if !value.is_finite() {
415 return;
416 }
417
418 self.min = self.min.min(value);
419 self.max = self.max.max(value);
420 self.total_count += 1;
421
422 if value > 0.0 {
423 let bucket = self.bucket_index(value);
424 *self.positive_buckets.entry(bucket).or_insert(0) += 1;
425 } else if value < 0.0 {
426 let bucket = self.bucket_index(-value);
427 *self.negative_buckets.entry(bucket).or_insert(0) += 1;
428 } else {
429 self.zero_count += 1;
430 }
431 }
432
433 pub fn add_batch(&mut self, values: &[f64]) {
435 for &v in values {
436 self.add(v);
437 }
438 }
439
440 pub fn quantile(&self, q: f64) -> f64 {
442 if self.total_count == 0 {
443 return f64::NAN;
444 }
445
446 let q = q.clamp(0.0, 1.0);
447
448 if q == 0.0 {
449 return self.min;
450 }
451 if q == 1.0 {
452 return self.max;
453 }
454
455 let target_rank = (q * self.total_count as f64).ceil() as u64;
456 let mut cumulative: u64 = 0;
457
458 let mut neg_buckets: Vec<_> = self.negative_buckets.iter().collect();
461 neg_buckets.sort_by(|a, b| b.0.cmp(a.0));
462
463 for (&bucket, &count) in &neg_buckets {
464 cumulative += count;
465 if cumulative >= target_rank {
466 return -self.bucket_to_value(bucket);
467 }
468 }
469
470 cumulative += self.zero_count;
472 if cumulative >= target_rank {
473 return 0.0;
474 }
475
476 let mut pos_buckets: Vec<_> = self.positive_buckets.iter().collect();
478 pos_buckets.sort_by_key(|&(k, _)| *k);
479
480 for (&bucket, &count) in &pos_buckets {
481 cumulative += count;
482 if cumulative >= target_rank {
483 return self.bucket_to_value(bucket);
484 }
485 }
486
487 self.max
488 }
489
490 pub fn merge(sketches: &[Self]) -> Self {
492 if sketches.is_empty() {
493 return Self::new(0.01);
494 }
495
496 let alpha = sketches
498 .iter()
499 .map(|s| s.alpha)
500 .fold(f64::INFINITY, f64::min);
501 let mut result = Self::new(alpha);
502
503 result.min = sketches.iter().map(|s| s.min).fold(f64::INFINITY, f64::min);
504 result.max = sketches
505 .iter()
506 .map(|s| s.max)
507 .fold(f64::NEG_INFINITY, f64::max);
508 result.total_count = sketches.iter().map(|s| s.total_count).sum();
509 result.zero_count = sketches.iter().map(|s| s.zero_count).sum();
510
511 for sketch in sketches {
512 for (&bucket, &count) in &sketch.positive_buckets {
513 *result.positive_buckets.entry(bucket).or_insert(0) += count;
514 }
515 for (&bucket, &count) in &sketch.negative_buckets {
516 *result.negative_buckets.entry(bucket).or_insert(0) += count;
517 }
518 }
519
520 result
521 }
522
523 pub fn count(&self) -> u64 {
525 self.total_count
526 }
527
528 pub fn min(&self) -> f64 {
530 self.min
531 }
532
533 pub fn max(&self) -> f64 {
535 self.max
536 }
537
538 pub fn is_empty(&self) -> bool {
540 self.total_count == 0
541 }
542
543 pub fn to_bytes(&self) -> Result<Vec<u8>> {
545 rmp_serde::to_vec(self)
546 .map_err(|e| Error::Format(format!("Failed to serialize DDSketch: {e}")))
547 }
548
549 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
551 rmp_serde::from_slice(bytes)
552 .map_err(|e| Error::Format(format!("Failed to deserialize DDSketch: {e}")))
553 }
554
555 fn bucket_index(&self, value: f64) -> i32 {
558 if value <= 0.0 {
559 return i32::MIN;
560 }
561 (value.ln() / self.ln_gamma).ceil() as i32
562 }
563
564 fn bucket_to_value(&self, bucket: i32) -> f64 {
565 (2.0 * self.gamma.powi(bucket)) / (1.0 + self.gamma)
566 }
567}
568
/// Selects which sketch algorithm a [`DataSketch`] uses per column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SketchType {
    /// Centroid-based t-digest sketch.
    TDigest,
    /// Logarithmic-bucket DDSketch.
    DDSketch,
}
577
578impl SketchType {
579 pub fn name(&self) -> &'static str {
581 match self {
582 Self::TDigest => "T-Digest",
583 Self::DDSketch => "DDSketch",
584 }
585 }
586}
587
/// Per-column quantile sketches for a dataset, mergeable across shards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSketch {
    /// Which algorithm populates the maps below.
    pub sketch_type: SketchType,
    /// Column name -> t-digest (used when `sketch_type == TDigest`).
    pub tdigests: HashMap<String, TDigest>,
    /// Column name -> DDSketch (used when `sketch_type == DDSketch`).
    pub ddsketches: HashMap<String, DDSketch>,
    /// Total rows consumed across all added batches.
    pub row_count: u64,
    /// Optional label identifying where the data came from.
    pub source: Option<String>,
}
602
impl DataSketch {
    /// Creates an empty sketch collection of the given kind.
    pub fn new(sketch_type: SketchType) -> Self {
        Self {
            sketch_type,
            tdigests: HashMap::new(),
            ddsketches: HashMap::new(),
            row_count: 0,
            source: None,
        }
    }

    /// Attaches a source label (builder style).
    #[must_use]
    pub fn with_source(mut self, source: impl Into<String>) -> Self {
        self.source = Some(source.into());
        self
    }

    /// Builds a sketch covering every numeric column of `dataset`.
    ///
    /// # Errors
    /// Propagates any error from [`Self::add_dataset`].
    pub fn from_dataset(dataset: &ArrowDataset, sketch_type: SketchType) -> Result<Self> {
        let mut sketch = Self::new(sketch_type);
        sketch.add_dataset(dataset)?;
        Ok(sketch)
    }

    /// Folds all record batches of `dataset` into this sketch.
    ///
    /// Only `Float64`/`Float32`/`Int32`/`Int64` columns are sketched;
    /// null entries are skipped and non-numeric columns ignored.
    /// Columns accumulate across repeated calls under their name.
    pub fn add_dataset(&mut self, dataset: &ArrowDataset) -> Result<()> {
        use arrow::{
            array::{Array, Float64Array, Int32Array, Int64Array},
            datatypes::DataType,
        };

        let schema = dataset.schema();

        for batch in dataset.iter() {
            self.row_count += batch.num_rows() as u64;

            for (col_idx, field) in schema.fields().iter().enumerate() {
                let is_numeric = matches!(
                    field.data_type(),
                    DataType::Float64 | DataType::Float32 | DataType::Int32 | DataType::Int64
                );

                if !is_numeric {
                    continue;
                }

                let col_name = field.name();
                let array = batch.column(col_idx);

                // Collect the column's non-null values as f64, one arm
                // per supported physical type; a failed downcast skips
                // the column silently.
                let values: Vec<f64> = match field.data_type() {
                    DataType::Float64 => {
                        if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
                            (0..arr.len())
                                .filter(|&i| !arr.is_null(i))
                                .map(|i| arr.value(i))
                                .collect()
                        } else {
                            continue;
                        }
                    }
                    DataType::Float32 => {
                        if let Some(arr) =
                            array.as_any().downcast_ref::<arrow::array::Float32Array>()
                        {
                            (0..arr.len())
                                .filter(|&i| !arr.is_null(i))
                                .map(|i| f64::from(arr.value(i)))
                                .collect()
                        } else {
                            continue;
                        }
                    }
                    DataType::Int32 => {
                        if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
                            (0..arr.len())
                                .filter(|&i| !arr.is_null(i))
                                .map(|i| f64::from(arr.value(i)))
                                .collect()
                        } else {
                            continue;
                        }
                    }
                    DataType::Int64 => {
                        if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
                            (0..arr.len())
                                .filter(|&i| !arr.is_null(i))
                                // i64 -> f64 may lose precision above 2^53;
                                // accepted per the crate-level allow.
                                .map(|i| arr.value(i) as f64)
                                .collect()
                        } else {
                            continue;
                        }
                    }
                    _ => continue,
                };

                // Route the values into the sketch map matching our type.
                match self.sketch_type {
                    SketchType::TDigest => {
                        let digest = self
                            .tdigests
                            .entry(col_name.clone())
                            .or_insert_with(|| TDigest::new(100.0));
                        digest.add_batch(&values);
                    }
                    SketchType::DDSketch => {
                        let sketch = self
                            .ddsketches
                            .entry(col_name.clone())
                            .or_insert_with(|| DDSketch::new(0.01));
                        sketch.add_batch(&values);
                    }
                }
            }
        }

        Ok(())
    }

    /// Merges per-column sketches from multiple shards.
    ///
    /// # Errors
    /// Fails when `sketches` is empty or mixes sketch types.
    pub fn merge(sketches: &[Self]) -> Result<Self> {
        if sketches.is_empty() {
            return Err(Error::invalid_config("Cannot merge empty sketch list"));
        }

        let sketch_type = sketches[0].sketch_type;

        for s in sketches {
            if s.sketch_type != sketch_type {
                return Err(Error::invalid_config(
                    "Cannot merge sketches of different types",
                ));
            }
        }

        let mut result = Self::new(sketch_type);
        result.row_count = sketches.iter().map(|s| s.row_count).sum();

        // Union of column names across all inputs.
        let columns: std::collections::HashSet<String> = match sketch_type {
            SketchType::TDigest => sketches
                .iter()
                .flat_map(|s| s.tdigests.keys().cloned())
                .collect(),
            SketchType::DDSketch => sketches
                .iter()
                .flat_map(|s| s.ddsketches.keys().cloned())
                .collect(),
        };

        for col in columns {
            match sketch_type {
                SketchType::TDigest => {
                    let digests: Vec<TDigest> = sketches
                        .iter()
                        .filter_map(|s| s.tdigests.get(&col).cloned())
                        .collect();
                    if !digests.is_empty() {
                        result.tdigests.insert(col, TDigest::merge(&digests));
                    }
                }
                SketchType::DDSketch => {
                    let dd_sketches: Vec<DDSketch> = sketches
                        .iter()
                        .filter_map(|s| s.ddsketches.get(&col).cloned())
                        .collect();
                    if !dd_sketches.is_empty() {
                        result.ddsketches.insert(col, DDSketch::merge(&dd_sketches));
                    }
                }
            }
        }

        Ok(result)
    }

    /// Quantile estimate for `column`, or `None` if the column was not
    /// sketched.
    pub fn quantile(&self, column: &str, q: f64) -> Option<f64> {
        match self.sketch_type {
            SketchType::TDigest => self.tdigests.get(column).map(|d| d.quantile(q)),
            SketchType::DDSketch => self.ddsketches.get(column).map(|d| d.quantile(q)),
        }
    }

    /// Serializes to MessagePack bytes.
    ///
    /// # Errors
    /// Returns [`Error::Format`] on serialization failure.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        rmp_serde::to_vec(self)
            .map_err(|e| Error::Format(format!("Failed to serialize DataSketch: {e}")))
    }

    /// Deserializes bytes produced by [`Self::to_bytes`].
    ///
    /// # Errors
    /// Returns [`Error::Format`] on malformed input.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
        rmp_serde::from_slice(bytes)
            .map_err(|e| Error::Format(format!("Failed to deserialize DataSketch: {e}")))
    }
}
804
/// Drift verdict for a single column from sketch comparison.
#[derive(Debug, Clone)]
pub struct SketchDriftResult {
    /// Column the result refers to.
    pub column: String,
    /// Maximum relative quantile difference observed.
    pub statistic: f64,
    /// Severity bucket derived from the statistic and threshold.
    pub severity: DriftSeverity,
    /// Per-quantile `(q, relative diff)` pairs that fed the statistic.
    pub quantile_diffs: Vec<(f64, f64)>,
}
817
/// Compares mergeable sketches of two datasets to flag drift per column.
pub struct DistributedDriftDetector {
    /// Which sketch algorithm to build and compare.
    sketch_type: SketchType,
    /// Number of evenly spaced quantiles probed per column.
    num_quantiles: usize,
    /// Relative-difference level at which drift severity starts.
    threshold: f64,
}
827
impl Default for DistributedDriftDetector {
    /// Equivalent to [`DistributedDriftDetector::new`].
    fn default() -> Self {
        Self::new()
    }
}
833
impl DistributedDriftDetector {
    /// Creates a detector with T-Digest sketches, 20 quantiles, and a
    /// 0.1 relative-difference threshold.
    pub fn new() -> Self {
        Self {
            sketch_type: SketchType::TDigest,
            num_quantiles: 20,
            threshold: 0.1,
        }
    }

    /// Sets the sketch algorithm (builder style).
    #[must_use]
    pub fn with_sketch_type(mut self, sketch_type: SketchType) -> Self {
        self.sketch_type = sketch_type;
        self
    }

    /// Sets how many evenly spaced quantiles to compare (floored at 5).
    #[must_use]
    pub fn with_num_quantiles(mut self, n: usize) -> Self {
        self.num_quantiles = n.max(5);
        self
    }

    /// Sets the relative-difference threshold that marks drift onset.
    #[must_use]
    pub fn with_threshold(mut self, threshold: f64) -> Self {
        self.threshold = threshold;
        self
    }

    /// Builds a sketch of `dataset` with this detector's sketch type.
    ///
    /// # Errors
    /// Propagates sketching errors from [`DataSketch::from_dataset`].
    pub fn create_sketch(&self, dataset: &ArrowDataset) -> Result<DataSketch> {
        DataSketch::from_dataset(dataset, self.sketch_type)
    }

    /// Compares two sketches column by column and returns one result
    /// per column appearing in either sketch.
    ///
    /// # Errors
    /// Fails when the two sketch types differ.
    pub fn compare(
        &self,
        reference: &DataSketch,
        current: &DataSketch,
    ) -> Result<Vec<SketchDriftResult>> {
        if reference.sketch_type != current.sketch_type {
            return Err(Error::invalid_config("Sketch types must match"));
        }

        let mut results = Vec::new();

        // Union of column names from both sketches.
        let columns: std::collections::HashSet<&String> = match self.sketch_type {
            SketchType::TDigest => reference
                .tdigests
                .keys()
                .chain(current.tdigests.keys())
                .collect(),
            SketchType::DDSketch => reference
                .ddsketches
                .keys()
                .chain(current.ddsketches.keys())
                .collect(),
        };

        for col in columns {
            let result = self.compare_column(reference, current, col);
            results.push(result);
        }

        Ok(results)
    }

    /// Scores one column: the statistic is the maximum relative
    /// difference across interior quantiles q = i / num_quantiles.
    ///
    /// NOTE(review): a column present in only one sketch yields no
    /// quantile pairs, so it scores 0.0 with `None` severity — confirm
    /// that an appearing/disappearing column should not itself count
    /// as drift.
    fn compare_column(
        &self,
        reference: &DataSketch,
        current: &DataSketch,
        column: &str,
    ) -> SketchDriftResult {
        let mut max_diff = 0.0_f64;
        let mut quantile_diffs = Vec::new();

        for i in 1..self.num_quantiles {
            let q = i as f64 / self.num_quantiles as f64;

            let ref_val = reference.quantile(column, q);
            let cur_val = current.quantile(column, q);

            if let (Some(r), Some(c)) = (ref_val, cur_val) {
                // Relative difference, with a near-zero reference treated
                // as 100% drift if the current value is non-zero.
                let diff = if r.abs() > f64::EPSILON {
                    ((c - r) / r).abs()
                } else if c.abs() > f64::EPSILON {
                    1.0
                } else {
                    0.0
                };

                max_diff = max_diff.max(diff);
                quantile_diffs.push((q, diff));
            }
        }

        // Severity scales in multiples of the configured threshold.
        let severity = if max_diff < self.threshold {
            DriftSeverity::None
        } else if max_diff < self.threshold * 2.0 {
            DriftSeverity::Low
        } else if max_diff < self.threshold * 5.0 {
            DriftSeverity::Medium
        } else if max_diff < self.threshold * 10.0 {
            DriftSeverity::High
        } else {
            DriftSeverity::Critical
        };

        SketchDriftResult {
            column: column.to_string(),
            statistic: max_diff,
            severity,
            quantile_diffs,
        }
    }
}
957
958#[cfg(test)]
959mod tests {
960 use std::sync::Arc;
961
962 use arrow::{
963 array::Float64Array,
964 datatypes::{DataType, Field, Schema},
965 record_batch::RecordBatch,
966 };
967
968 use super::*;
969
    // --- TDigest behavior tests ---

    // A fresh digest reports empty with zero weight.
    #[test]
    fn test_tdigest_new() {
        let digest = TDigest::new(100.0);
        assert!(digest.is_empty());
        assert_eq!(digest.count(), 0.0);
    }

    // A single add sets count, min, and max exactly.
    #[test]
    fn test_tdigest_add_single() {
        let mut digest = TDigest::new(100.0);
        digest.add(5.0);

        assert!(!digest.is_empty());
        assert_eq!(digest.count(), 1.0);
        assert_eq!(digest.min(), 5.0);
        assert_eq!(digest.max(), 5.0);
    }

    // Batch adds accumulate count and exact extremes.
    #[test]
    fn test_tdigest_add_batch() {
        let mut digest = TDigest::new(100.0);
        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
        digest.add_batch(&values);

        assert_eq!(digest.count(), 100.0);
        assert_eq!(digest.min(), 0.0);
        assert_eq!(digest.max(), 99.0);
    }

    // Median of a uniform 0..1000 sequence lands near 500 (loose bound:
    // the digest is approximate).
    #[test]
    fn test_tdigest_quantile_median() {
        let mut digest = TDigest::new(100.0);
        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();
        digest.add_batch(&values);

        let median = digest.quantile(0.5);
        assert!((median - 500.0).abs() < 50.0, "Median was {}", median);
    }

    // q = 0 and q = 1 return the exact observed extremes.
    #[test]
    fn test_tdigest_quantile_extremes() {
        let mut digest = TDigest::new(100.0);
        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
        digest.add_batch(&values);

        assert_eq!(digest.quantile(0.0), 0.0);
        assert_eq!(digest.quantile(1.0), 99.0);
    }

    // Quartiles of a uniform sequence fall near 250 / 750.
    #[test]
    fn test_tdigest_quantile_quartiles() {
        let mut digest = TDigest::new(100.0);
        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();
        digest.add_batch(&values);

        let q1 = digest.quantile(0.25);
        let q3 = digest.quantile(0.75);

        assert!((q1 - 250.0).abs() < 50.0, "Q1 was {}", q1);
        assert!((q3 - 750.0).abs() < 50.0, "Q3 was {}", q3);
    }

    // Merging two disjoint halves preserves count/extremes and the
    // combined median.
    #[test]
    fn test_tdigest_merge() {
        let mut digest1 = TDigest::new(100.0);
        let mut digest2 = TDigest::new(100.0);

        let values1: Vec<f64> = (0..500).map(|i| i as f64).collect();
        let values2: Vec<f64> = (500..1000).map(|i| i as f64).collect();

        digest1.add_batch(&values1);
        digest2.add_batch(&values2);

        let merged = TDigest::merge(&[digest1, digest2]);

        assert_eq!(merged.count(), 1000.0);
        assert_eq!(merged.min(), 0.0);
        assert_eq!(merged.max(), 999.0);

        let median = merged.quantile(0.5);
        assert!(
            (median - 500.0).abs() < 50.0,
            "Merged median was {}",
            median
        );
    }

    // MessagePack round trip preserves summary statistics.
    #[test]
    fn test_tdigest_serialization() {
        let mut digest = TDigest::new(100.0);
        digest.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);

        let bytes = digest.to_bytes().expect("serialize");
        let restored = TDigest::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.count(), digest.count());
        assert_eq!(restored.min(), digest.min());
        assert_eq!(restored.max(), digest.max());
    }

    // CDF clamps outside [min, max] and is near 0.5 at the midpoint.
    #[test]
    fn test_tdigest_cdf() {
        let mut digest = TDigest::new(100.0);
        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
        digest.add_batch(&values);

        assert_eq!(digest.cdf(-1.0), 0.0);
        assert_eq!(digest.cdf(100.0), 1.0);

        let cdf_50 = digest.cdf(50.0);
        assert!(cdf_50 > 0.4 && cdf_50 < 0.6, "CDF at 50 was {}", cdf_50);
    }

    // Quantile of an empty digest is NaN.
    #[test]
    fn test_tdigest_empty_quantile() {
        let digest = TDigest::new(100.0);
        assert!(digest.quantile(0.5).is_nan());
    }
1092
    // --- DDSketch behavior tests ---

    // A fresh sketch reports empty with zero count.
    #[test]
    fn test_ddsketch_new() {
        let sketch = DDSketch::new(0.01);
        assert!(sketch.is_empty());
        assert_eq!(sketch.count(), 0);
    }

    // A single add sets count, min, and max exactly.
    #[test]
    fn test_ddsketch_add_single() {
        let mut sketch = DDSketch::new(0.01);
        sketch.add(5.0);

        assert!(!sketch.is_empty());
        assert_eq!(sketch.count(), 1);
        assert_eq!(sketch.min(), 5.0);
        assert_eq!(sketch.max(), 5.0);
    }

    // Batch adds accumulate count and exact extremes.
    #[test]
    fn test_ddsketch_add_batch() {
        let mut sketch = DDSketch::new(0.01);
        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();
        sketch.add_batch(&values);

        assert_eq!(sketch.count(), 100);
        assert_eq!(sketch.min(), 1.0);
        assert_eq!(sketch.max(), 100.0);
    }

    // Median of a uniform 1..=1000 sequence lands near 500.
    #[test]
    fn test_ddsketch_quantile_median() {
        let mut sketch = DDSketch::new(0.01);
        let values: Vec<f64> = (1..=1000).map(|i| i as f64).collect();
        sketch.add_batch(&values);

        let median = sketch.quantile(0.5);
        assert!((median - 500.0).abs() < 100.0, "Median was {}", median);
    }

    // q = 0 and q = 1 return the exact observed extremes.
    #[test]
    fn test_ddsketch_quantile_extremes() {
        let mut sketch = DDSketch::new(0.01);
        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();
        sketch.add_batch(&values);

        assert_eq!(sketch.quantile(0.0), 1.0);
        assert_eq!(sketch.quantile(1.0), 100.0);
    }

    // Negative values route through the negative bucket map; the
    // median of a symmetric range stays near zero.
    #[test]
    fn test_ddsketch_negative_values() {
        let mut sketch = DDSketch::new(0.01);
        let values: Vec<f64> = (-50..=50).map(|i| i as f64).collect();
        sketch.add_batch(&values);

        assert_eq!(sketch.min(), -50.0);
        assert_eq!(sketch.max(), 50.0);

        let median = sketch.quantile(0.5);
        assert!((median).abs() < 20.0, "Median was {}", median);
    }

    // Merging two disjoint halves preserves count and extremes.
    #[test]
    fn test_ddsketch_merge() {
        let mut sketch1 = DDSketch::new(0.01);
        let mut sketch2 = DDSketch::new(0.01);

        let values1: Vec<f64> = (1..=500).map(|i| i as f64).collect();
        let values2: Vec<f64> = (501..=1000).map(|i| i as f64).collect();

        sketch1.add_batch(&values1);
        sketch2.add_batch(&values2);

        let merged = DDSketch::merge(&[sketch1, sketch2]);

        assert_eq!(merged.count(), 1000);
        assert_eq!(merged.min(), 1.0);
        assert_eq!(merged.max(), 1000.0);
    }

    // MessagePack round trip preserves summary statistics.
    #[test]
    fn test_ddsketch_serialization() {
        let mut sketch = DDSketch::new(0.01);
        sketch.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);

        let bytes = sketch.to_bytes().expect("serialize");
        let restored = DDSketch::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.count(), sketch.count());
        assert_eq!(restored.min(), sketch.min());
        assert_eq!(restored.max(), sketch.max());
    }

    // Quantile of an empty sketch is NaN.
    #[test]
    fn test_ddsketch_empty_quantile() {
        let sketch = DDSketch::new(0.01);
        assert!(sketch.quantile(0.5).is_nan());
    }

    // Display names for the two sketch kinds.
    #[test]
    fn test_sketch_type_name() {
        assert_eq!(SketchType::TDigest.name(), "T-Digest");
        assert_eq!(SketchType::DDSketch.name(), "DDSketch");
    }
1202
    // Builds a single-batch dataset with one non-nullable Float64
    // column named "value".
    fn make_float_dataset(values: Vec<f64>) -> ArrowDataset {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Float64Array::from(values))],
        )
        .expect("batch");

        ArrowDataset::from_batch(batch).expect("dataset")
    }

    // Sketching a dataset with TDigest records row count, registers
    // the numeric column, and answers quantile queries.
    #[test]
    fn test_data_sketch_from_dataset_tdigest() {
        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
        let dataset = make_float_dataset(values);

        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");

        assert_eq!(sketch.row_count, 100);
        assert!(sketch.tdigests.contains_key("value"));

        let median = sketch.quantile("value", 0.5);
        assert!(median.is_some());
    }

    // Same, with the DDSketch backend.
    #[test]
    fn test_data_sketch_from_dataset_ddsketch() {
        let values: Vec<f64> = (1..=100).map(|i| i as f64).collect();
        let dataset = make_float_dataset(values);

        let sketch = DataSketch::from_dataset(&dataset, SketchType::DDSketch).expect("sketch");

        assert_eq!(sketch.row_count, 100);
        assert!(sketch.ddsketches.contains_key("value"));
    }

    // Merging shard sketches sums row counts.
    #[test]
    fn test_data_sketch_merge() {
        let values1: Vec<f64> = (0..50).map(|i| i as f64).collect();
        let values2: Vec<f64> = (50..100).map(|i| i as f64).collect();

        let dataset1 = make_float_dataset(values1);
        let dataset2 = make_float_dataset(values2);

        let sketch1 = DataSketch::from_dataset(&dataset1, SketchType::TDigest).expect("sketch1");
        let sketch2 = DataSketch::from_dataset(&dataset2, SketchType::TDigest).expect("sketch2");

        let merged = DataSketch::merge(&[sketch1, sketch2]).expect("merge");

        assert_eq!(merged.row_count, 100);
    }

    // MessagePack round trip preserves row count and sketch type.
    #[test]
    fn test_data_sketch_serialization() {
        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
        let dataset = make_float_dataset(values);

        let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
        let bytes = sketch.to_bytes().expect("serialize");
        let restored = DataSketch::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.row_count, sketch.row_count);
        assert_eq!(restored.sketch_type, sketch.sketch_type);
    }
1274
    // --- DistributedDriftDetector tests ---

    // Default detector uses the TDigest backend.
    #[test]
    fn test_distributed_detector_new() {
        let detector = DistributedDriftDetector::new();
        assert_eq!(detector.sketch_type, SketchType::TDigest);
    }

    // Builder methods set all three configuration fields.
    #[test]
    fn test_distributed_detector_builder() {
        let detector = DistributedDriftDetector::new()
            .with_sketch_type(SketchType::DDSketch)
            .with_num_quantiles(50)
            .with_threshold(0.2);

        assert_eq!(detector.sketch_type, SketchType::DDSketch);
        assert_eq!(detector.num_quantiles, 50);
        assert!((detector.threshold - 0.2).abs() < f64::EPSILON);
    }

    // Identical data in both sketches yields no drift.
    #[test]
    fn test_distributed_detector_no_drift() {
        let values: Vec<f64> = (0..500).map(|i| i as f64).collect();
        let dataset1 = make_float_dataset(values.clone());
        let dataset2 = make_float_dataset(values);

        let detector = DistributedDriftDetector::new();
        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");

        let results = detector.compare(&sketch1, &sketch2).expect("compare");

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].severity, DriftSeverity::None);
    }

    // A shifted distribution is flagged with a positive statistic.
    #[test]
    fn test_distributed_detector_with_drift() {
        let values1: Vec<f64> = (0..500).map(|i| i as f64).collect();
        let values2: Vec<f64> = (500..1000).map(|i| i as f64).collect();

        let dataset1 = make_float_dataset(values1);
        let dataset2 = make_float_dataset(values2);

        let detector = DistributedDriftDetector::new().with_threshold(0.1);
        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");

        let results = detector.compare(&sketch1, &sketch2).expect("compare");

        assert_eq!(results.len(), 1);
        assert!(results[0].severity.is_drift(), "Should detect drift");
        assert!(results[0].statistic > 0.0);
    }

    // The DDSketch backend also reports no drift for identical data.
    #[test]
    fn test_distributed_detector_ddsketch() {
        let values1: Vec<f64> = (1..=500).map(|i| i as f64).collect();
        let values2: Vec<f64> = (1..=500).map(|i| i as f64).collect();

        let dataset1 = make_float_dataset(values1);
        let dataset2 = make_float_dataset(values2);

        let detector = DistributedDriftDetector::new().with_sketch_type(SketchType::DDSketch);
        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");

        let results = detector.compare(&sketch1, &sketch2).expect("compare");

        assert!(!results.is_empty());
        assert_eq!(results[0].severity, DriftSeverity::None);
    }

    // Per-quantile diffs are populated when both sketches have data.
    #[test]
    fn test_distributed_detector_quantile_diffs() {
        let values1: Vec<f64> = (0..500).map(|i| i as f64).collect();
        let values2: Vec<f64> = (100..600).map(|i| i as f64).collect();

        let dataset1 = make_float_dataset(values1);
        let dataset2 = make_float_dataset(values2);

        let detector = DistributedDriftDetector::new().with_num_quantiles(10);
        let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
        let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");

        let results = detector.compare(&sketch1, &sketch2).expect("compare");

        assert!(!results[0].quantile_diffs.is_empty());
    }
1364
    // --- Centroid and TDigest edge-case tests ---

    // Merging combines means weighted by their weights: (10*2 + 20*3)/5 = 16.
    #[test]
    fn test_centroid_new_and_merge() {
        let mut c1 = Centroid::new(10.0, 2.0);
        let c2 = Centroid::new(20.0, 3.0);
        c1.merge(&c2);
        assert!((c1.mean - 16.0).abs() < f64::EPSILON);
        assert_eq!(c1.weight, 5.0);
    }

    // Two zero-weight centroids leave the receiver unchanged.
    #[test]
    fn test_centroid_merge_zero_weights() {
        let mut c1 = Centroid::new(10.0, 0.0);
        let c2 = Centroid::new(20.0, 0.0);
        c1.merge(&c2);
        assert_eq!(c1.mean, 10.0);
        assert_eq!(c1.weight, 0.0);
    }

    // NaN and infinities are ignored by add_weighted.
    #[test]
    fn test_tdigest_add_weighted_non_finite() {
        let mut digest = TDigest::new(100.0);
        digest.add_weighted(f64::NAN, 1.0);
        digest.add_weighted(f64::INFINITY, 1.0);
        digest.add_weighted(f64::NEG_INFINITY, 1.0);
        assert!(digest.is_empty());
    }

    // Zero and negative weights are ignored by add_weighted.
    #[test]
    fn test_tdigest_add_weighted_zero_weight() {
        let mut digest = TDigest::new(100.0);
        digest.add_weighted(5.0, 0.0);
        digest.add_weighted(10.0, -1.0);
        assert!(digest.is_empty());
    }

    // Centroid count starts at zero and grows after an add.
    #[test]
    fn test_tdigest_num_centroids() {
        let mut digest = TDigest::new(100.0);
        assert_eq!(digest.num_centroids(), 0);

        digest.add(5.0);
        assert!(digest.num_centroids() > 0);
    }

    // Out-of-range q values clamp to [0, 1] -> min/max.
    #[test]
    fn test_tdigest_quantile_clamp() {
        let mut digest = TDigest::new(100.0);
        digest.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);

        let q_neg = digest.quantile(-0.5);
        let q_over = digest.quantile(1.5);
        assert_eq!(q_neg, digest.min());
        assert_eq!(q_over, digest.max());
    }

    // Merging an empty slice returns an empty digest with the default
    // compression of 100.
    #[test]
    fn test_tdigest_merge_empty() {
        let merged = TDigest::merge(&[]);
        assert!(merged.is_empty());
        assert_eq!(merged.compression, 100.0);
    }

    // CDF of an empty digest is 0 everywhere.
    #[test]
    fn test_tdigest_cdf_empty() {
        let digest = TDigest::new(100.0);
        assert_eq!(digest.cdf(5.0), 0.0);
    }

    // Clone preserves summary statistics.
    #[test]
    fn test_tdigest_clone() {
        let mut digest = TDigest::new(100.0);
        digest.add_batch(&[1.0, 2.0, 3.0]);

        let cloned = digest.clone();
        assert_eq!(cloned.count(), digest.count());
        assert_eq!(cloned.min(), digest.min());
        assert_eq!(cloned.max(), digest.max());
    }

    // Debug output names the type.
    #[test]
    fn test_tdigest_debug() {
        let digest = TDigest::new(100.0);
        let debug = format!("{:?}", digest);
        assert!(debug.contains("TDigest"));
    }
1455
1456 #[test]
1457 fn test_ddsketch_add_non_finite() {
1458 let mut sketch = DDSketch::new(0.01);
1459 sketch.add(f64::NAN);
1460 sketch.add(f64::INFINITY);
1461 sketch.add(f64::NEG_INFINITY);
1462 assert!(sketch.is_empty());
1463 }
1464
1465 #[test]
1466 fn test_ddsketch_add_zero() {
1467 let mut sketch = DDSketch::new(0.01);
1468 sketch.add(0.0);
1469 assert_eq!(sketch.count(), 1);
1470 assert_eq!(sketch.quantile(0.5), 0.0);
1471 }
1472
1473 #[test]
1474 fn test_ddsketch_quantile_clamp() {
1475 let mut sketch = DDSketch::new(0.01);
1476 sketch.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);
1477
1478 let q_neg = sketch.quantile(-0.5);
1479 let q_over = sketch.quantile(1.5);
1480 assert_eq!(q_neg, sketch.min());
1481 assert_eq!(q_over, sketch.max());
1482 }
1483
1484 #[test]
1485 fn test_ddsketch_merge_empty() {
1486 let merged = DDSketch::merge(&[]);
1487 assert!(merged.is_empty());
1488 }
1489
1490 #[test]
1491 fn test_ddsketch_alpha_clamp() {
1492 let sketch1 = DDSketch::new(0.00001);
1494 assert!(sketch1.alpha >= 0.0001);
1495
1496 let sketch2 = DDSketch::new(0.9);
1498 assert!(sketch2.alpha <= 0.5);
1499 }
1500
1501 #[test]
1502 fn test_ddsketch_clone() {
1503 let mut sketch = DDSketch::new(0.01);
1504 sketch.add_batch(&[1.0, 2.0, 3.0]);
1505
1506 let cloned = sketch.clone();
1507 assert_eq!(cloned.count(), sketch.count());
1508 assert_eq!(cloned.min(), sketch.min());
1509 }
1510
1511 #[test]
1512 fn test_ddsketch_debug() {
1513 let sketch = DDSketch::new(0.01);
1514 let debug = format!("{:?}", sketch);
1515 assert!(debug.contains("DDSketch"));
1516 }
1517
1518 #[test]
1519 fn test_sketch_type_equality() {
1520 assert_eq!(SketchType::TDigest, SketchType::TDigest);
1521 assert_ne!(SketchType::TDigest, SketchType::DDSketch);
1522 }
1523
1524 #[test]
1525 fn test_sketch_type_clone() {
1526 let st = SketchType::DDSketch;
1527 let cloned = st;
1528 assert_eq!(st, cloned);
1529 }
1530
1531 #[test]
1532 fn test_sketch_type_debug() {
1533 let st = SketchType::TDigest;
1534 let debug = format!("{:?}", st);
1535 assert!(debug.contains("TDigest"));
1536 }
1537
1538 #[test]
1539 fn test_data_sketch_new() {
1540 let sketch = DataSketch::new(SketchType::TDigest);
1541 assert_eq!(sketch.sketch_type, SketchType::TDigest);
1542 assert_eq!(sketch.row_count, 0);
1543 assert!(sketch.source.is_none());
1544 }
1545
1546 #[test]
1547 fn test_data_sketch_with_source() {
1548 let sketch = DataSketch::new(SketchType::TDigest).with_source("node1");
1549 assert_eq!(sketch.source, Some("node1".to_string()));
1550 }
1551
1552 #[test]
1553 fn test_data_sketch_merge_empty_error() {
1554 let result = DataSketch::merge(&[]);
1555 assert!(result.is_err());
1556 }
1557
1558 #[test]
1559 fn test_data_sketch_merge_different_types_error() {
1560 let sketch1 = DataSketch::new(SketchType::TDigest);
1561 let sketch2 = DataSketch::new(SketchType::DDSketch);
1562
1563 let result = DataSketch::merge(&[sketch1, sketch2]);
1564 assert!(result.is_err());
1565 }
1566
1567 #[test]
1568 fn test_data_sketch_quantile_not_found() {
1569 let sketch = DataSketch::new(SketchType::TDigest);
1570 assert!(sketch.quantile("nonexistent", 0.5).is_none());
1571 }
1572
1573 #[test]
1574 fn test_data_sketch_clone() {
1575 let values: Vec<f64> = (0..50).map(|i| i as f64).collect();
1576 let dataset = make_float_dataset(values);
1577 let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1578
1579 let cloned = sketch.clone();
1580 assert_eq!(cloned.row_count, sketch.row_count);
1581 assert_eq!(cloned.sketch_type, sketch.sketch_type);
1582 }
1583
1584 #[test]
1585 fn test_data_sketch_debug() {
1586 let sketch = DataSketch::new(SketchType::DDSketch);
1587 let debug = format!("{:?}", sketch);
1588 assert!(debug.contains("DataSketch"));
1589 }
1590
1591 #[test]
1592 fn test_sketch_drift_result_clone() {
1593 let result = SketchDriftResult {
1594 column: "test".to_string(),
1595 statistic: 0.5,
1596 severity: DriftSeverity::Medium,
1597 quantile_diffs: vec![(0.5, 0.1)],
1598 };
1599
1600 let cloned = result.clone();
1601 assert_eq!(cloned.column, result.column);
1602 assert_eq!(cloned.statistic, result.statistic);
1603 }
1604
1605 #[test]
1606 fn test_sketch_drift_result_debug() {
1607 let result = SketchDriftResult {
1608 column: "test".to_string(),
1609 statistic: 0.5,
1610 severity: DriftSeverity::None,
1611 quantile_diffs: vec![],
1612 };
1613
1614 let debug = format!("{:?}", result);
1615 assert!(debug.contains("SketchDriftResult"));
1616 }
1617
1618 #[test]
1619 fn test_distributed_detector_default() {
1620 let detector = DistributedDriftDetector::default();
1621 assert_eq!(detector.sketch_type, SketchType::TDigest);
1622 }
1623
1624 #[test]
1625 fn test_distributed_detector_compare_type_mismatch() {
1626 let sketch1 = DataSketch::new(SketchType::TDigest);
1627 let sketch2 = DataSketch::new(SketchType::DDSketch);
1628
1629 let detector = DistributedDriftDetector::new();
1630 let result = detector.compare(&sketch1, &sketch2);
1631 assert!(result.is_err());
1632 }
1633
1634 #[test]
1635 fn test_distributed_detector_num_quantiles_min() {
1636 let detector = DistributedDriftDetector::new().with_num_quantiles(1);
1637 assert!(detector.num_quantiles >= 5);
1638 }
1639
1640 #[test]
1641 fn test_tdigest_compression_triggers() {
1642 let mut digest = TDigest::new(10.0); for i in 0..1000 {
1645 digest.add(i as f64);
1646 }
1647 assert!(digest.num_centroids() < 1000);
1649 }
1650
1651 #[test]
1652 fn test_tdigest_serialization_invalid() {
1653 let result = TDigest::from_bytes(&[0, 1, 2, 3]);
1654 assert!(result.is_err());
1655 }
1656
1657 #[test]
1658 fn test_ddsketch_serialization_invalid() {
1659 let result = DDSketch::from_bytes(&[0, 1, 2, 3]);
1660 assert!(result.is_err());
1661 }
1662
1663 #[test]
1664 fn test_data_sketch_serialization_invalid() {
1665 let result = DataSketch::from_bytes(&[0, 1, 2, 3]);
1666 assert!(result.is_err());
1667 }
1668
1669 #[test]
1670 fn test_centroid_clone() {
1671 let c = Centroid::new(5.0, 2.0);
1672 let cloned = c.clone();
1673 assert_eq!(cloned.mean, c.mean);
1674 assert_eq!(cloned.weight, c.weight);
1675 }
1676
1677 #[test]
1678 fn test_centroid_debug() {
1679 let c = Centroid::new(5.0, 2.0);
1680 let debug = format!("{:?}", c);
1681 assert!(debug.contains("Centroid"));
1682 }
1683
1684 #[test]
1685 fn test_data_sketch_merge_ddsketch() {
1686 let values1: Vec<f64> = (1..=50).map(|i| i as f64).collect();
1687 let values2: Vec<f64> = (51..=100).map(|i| i as f64).collect();
1688
1689 let dataset1 = make_float_dataset(values1);
1690 let dataset2 = make_float_dataset(values2);
1691
1692 let sketch1 = DataSketch::from_dataset(&dataset1, SketchType::DDSketch).expect("sketch1");
1693 let sketch2 = DataSketch::from_dataset(&dataset2, SketchType::DDSketch).expect("sketch2");
1694
1695 let merged = DataSketch::merge(&[sketch1, sketch2]).expect("merge");
1696
1697 assert_eq!(merged.row_count, 100);
1698 assert_eq!(merged.sketch_type, SketchType::DDSketch);
1699 }
1700
1701 #[test]
1702 fn test_distributed_detector_severity_levels() {
1703 let values1: Vec<f64> = (0..100).map(|i| i as f64).collect();
1706 let values2: Vec<f64> = (0..100).map(|i| (i * 50) as f64).collect(); let dataset1 = make_float_dataset(values1);
1709 let dataset2 = make_float_dataset(values2);
1710
1711 let detector = DistributedDriftDetector::new().with_threshold(0.01);
1712 let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1713 let sketch2 = detector.create_sketch(&dataset2).expect("sketch2");
1714
1715 let results = detector.compare(&sketch1, &sketch2).expect("compare");
1716 assert!(!results.is_empty());
1717 assert!(results[0].statistic > 0.0);
1719 }
1720
1721 #[test]
1722 fn test_data_sketch_add_dataset_int_types() {
1723 use arrow::array::{Int32Array, Int64Array};
1725
1726 let schema = Arc::new(Schema::new(vec![
1727 Field::new("int32_col", DataType::Int32, false),
1728 Field::new("int64_col", DataType::Int64, false),
1729 ]));
1730
1731 let batch = RecordBatch::try_new(
1732 Arc::clone(&schema),
1733 vec![
1734 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
1735 Arc::new(Int64Array::from(vec![10i64, 20, 30, 40, 50])),
1736 ],
1737 )
1738 .expect("batch");
1739
1740 let dataset = ArrowDataset::from_batch(batch).expect("dataset");
1741 let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1742
1743 assert_eq!(sketch.row_count, 5);
1744 assert!(sketch.tdigests.contains_key("int32_col"));
1745 assert!(sketch.tdigests.contains_key("int64_col"));
1746 }
1747
1748 #[test]
1749 fn test_data_sketch_add_dataset_float32() {
1750 use arrow::array::Float32Array;
1751
1752 let schema = Arc::new(Schema::new(vec![Field::new(
1753 "float32_col",
1754 DataType::Float32,
1755 false,
1756 )]));
1757
1758 let batch = RecordBatch::try_new(
1759 Arc::clone(&schema),
1760 vec![Arc::new(Float32Array::from(vec![
1761 1.0f32, 2.0, 3.0, 4.0, 5.0,
1762 ]))],
1763 )
1764 .expect("batch");
1765
1766 let dataset = ArrowDataset::from_batch(batch).expect("dataset");
1767 let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1768
1769 assert_eq!(sketch.row_count, 5);
1770 assert!(sketch.tdigests.contains_key("float32_col"));
1771 }
1772
1773 #[test]
1774 fn test_data_sketch_non_numeric_columns_skipped() {
1775 use arrow::array::StringArray;
1776
1777 let schema = Arc::new(Schema::new(vec![
1778 Field::new("value", DataType::Float64, false),
1779 Field::new("name", DataType::Utf8, false),
1780 ]));
1781
1782 let batch = RecordBatch::try_new(
1783 Arc::clone(&schema),
1784 vec![
1785 Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
1786 Arc::new(StringArray::from(vec!["a", "b", "c"])),
1787 ],
1788 )
1789 .expect("batch");
1790
1791 let dataset = ArrowDataset::from_batch(batch).expect("dataset");
1792 let sketch = DataSketch::from_dataset(&dataset, SketchType::TDigest).expect("sketch");
1793
1794 assert!(sketch.tdigests.contains_key("value"));
1796 assert!(!sketch.tdigests.contains_key("name"));
1797 }
1798
1799 #[test]
1800 fn test_distributed_detector_compare_missing_column() {
1801 let values1: Vec<f64> = (0..100).map(|i| i as f64).collect();
1803 let dataset1 = make_float_dataset(values1);
1804
1805 let detector = DistributedDriftDetector::new();
1806 let sketch1 = detector.create_sketch(&dataset1).expect("sketch1");
1807
1808 let sketch2 = DataSketch::new(SketchType::TDigest);
1810
1811 let results = detector.compare(&sketch1, &sketch2).expect("compare");
1812 assert!(!results.is_empty());
1814 }
1815}