1use std::{
13 any::Any,
14 cmp::Ordering,
15 mem,
16 ops::AddAssign,
17 sync::{Arc, RwLock},
18};
19
20use crate::{
21 metrics::{Descriptor, MetricsError, Number, NumberKind, Result},
22 sdk::export::metrics::{Aggregator, Count, Max, Min, MinMaxSumCount, Sum},
23};
24
/// Bin budget used by the default aggregator and by `Store::default`.
const DEFAULT_MAX_NUM_BINS: i64 = 2048;
/// Accuracy parameter (`alpha`) used by the default aggregator.
const DEFAULT_ALPHA: f64 = 0.01;
/// Key epsilon used by the default aggregator.
const DEFAULT_MIN_BOUNDARY: f64 = 1.0e-9;

/// Upper bound on the number of bins a freshly created store allocates.
const INITIAL_NUM_BINS: usize = 128;
/// Step by which a store lowers its smallest key when it grows to the left.
const GROW_LEFT_BY: i64 = 128;
31
32pub fn ddsketch(config: &DdSketchConfig, kind: NumberKind) -> DdSketchAggregator {
34 DdSketchAggregator::new(config, kind)
35}
36
37#[derive(Debug)]
50pub struct DdSketchAggregator {
51 inner: RwLock<Inner>,
52}
53
54impl DdSketchAggregator {
55 pub fn new(config: &DdSketchConfig, kind: NumberKind) -> DdSketchAggregator {
60 DdSketchAggregator {
61 inner: RwLock::new(Inner::new(config, kind)),
62 }
63 }
64}
65
66impl Default for DdSketchAggregator {
67 fn default() -> Self {
68 DdSketchAggregator::new(
69 &DdSketchConfig::new(DEFAULT_ALPHA, DEFAULT_MAX_NUM_BINS, DEFAULT_MIN_BOUNDARY),
70 NumberKind::F64,
71 )
72 }
73}
74
75impl Sum for DdSketchAggregator {
76 fn sum(&self) -> Result<Number> {
77 self.inner
78 .read()
79 .map_err(From::from)
80 .map(|inner| inner.sum.clone())
81 }
82}
83
84impl Min for DdSketchAggregator {
85 fn min(&self) -> Result<Number> {
86 self.inner
87 .read()
88 .map_err(From::from)
89 .map(|inner| inner.min_value.clone())
90 }
91}
92
93impl Max for DdSketchAggregator {
94 fn max(&self) -> Result<Number> {
95 self.inner
96 .read()
97 .map_err(From::from)
98 .map(|inner| inner.max_value.clone())
99 }
100}
101
102impl Count for DdSketchAggregator {
103 fn count(&self) -> Result<u64> {
104 self.inner
105 .read()
106 .map_err(From::from)
107 .map(|inner| inner.count())
108 }
109}
110
111impl MinMaxSumCount for DdSketchAggregator {}
112
113impl Aggregator for DdSketchAggregator {
114 fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
115 self.inner
116 .write()
117 .map_err(From::from)
118 .map(|mut inner| inner.add(number, descriptor.number_kind()))
119 }
120
121 fn synchronized_move(
122 &self,
123 destination: &Arc<(dyn Aggregator + Send + Sync)>,
124 descriptor: &Descriptor,
125 ) -> Result<()> {
126 if let Some(other) = destination.as_any().downcast_ref::<Self>() {
127 other
128 .inner
129 .write()
130 .map_err(From::from)
131 .and_then(|mut other| {
132 self.inner.write().map_err(From::from).map(|mut inner| {
133 let kind = descriptor.number_kind();
134 other.max_value = mem::replace(&mut inner.max_value, kind.zero());
135 other.min_value = mem::replace(&mut inner.min_value, kind.zero());
136 other.key_epsilon = mem::take(&mut inner.key_epsilon);
137 other.offset = mem::take(&mut inner.offset);
138 other.gamma = mem::take(&mut inner.gamma);
139 other.gamma_ln = mem::take(&mut inner.gamma_ln);
140 other.positive_store = mem::take(&mut inner.positive_store);
141 other.negative_store = mem::take(&mut inner.negative_store);
142 other.sum = mem::replace(&mut inner.sum, kind.zero());
143 })
144 })
145 } else {
146 Err(MetricsError::InconsistentAggregator(format!(
147 "Expected {:?}, got: {:?}",
148 self, destination
149 )))
150 }
151 }
152
153 fn merge(
154 &self,
155 other: &(dyn Aggregator + Send + Sync),
156 _descriptor: &Descriptor,
157 ) -> Result<()> {
158 if let Some(other) = other.as_any().downcast_ref::<DdSketchAggregator>() {
159 self.inner.write()
160 .map_err(From::from)
161 .and_then(|mut inner| {
162 other.inner.read()
163 .map_err(From::from)
164 .and_then(|other| {
165 if inner.positive_store.max_num_bins != other.positive_store.max_num_bins {
167 return Err(MetricsError::InconsistentAggregator(format!(
168 "When merging two DDSKetchAggregators, their max number of bins must be the same. Expect max number of bins to be {:?}, but get {:?}", inner.positive_store.max_num_bins, other.positive_store.max_num_bins
169 )));
170 }
171 if inner.negative_store.max_num_bins != other.negative_store.max_num_bins {
172 return Err(MetricsError::InconsistentAggregator(format!(
173 "When merging two DDSKetchAggregators, their max number of bins must be the same. Expect max number of bins to be {:?}, but get {:?}", inner.negative_store.max_num_bins, other.negative_store.max_num_bins
174 )));
175 }
176
177
178 if (inner.gamma - other.gamma).abs() > std::f64::EPSILON {
179 return Err(MetricsError::InconsistentAggregator(format!(
180 "When merging two DDSKetchAggregators, their gamma must be the same. Expect max number of bins to be {:?}, but get {:?}", inner.gamma, other.gamma
181 )));
182 }
183
184 if other.count() == 0 {
185 return Ok(());
186 }
187
188 if inner.count() == 0 {
189 inner.positive_store.merge(&other.positive_store);
190 inner.negative_store.merge(&other.negative_store);
191 inner.sum = other.sum.clone();
192 inner.min_value = other.min_value.clone();
193 inner.max_value = other.max_value.clone();
194 return Ok(());
195 }
196
197 inner.positive_store.merge(&other.positive_store);
198 inner.negative_store.merge(&other.negative_store);
199
200 inner.sum = match inner.kind {
201 NumberKind::F64 =>
202 Number::from(inner.sum.to_f64(&inner.kind) + other.sum.to_f64(&other.kind)),
203 NumberKind::U64 => Number::from(inner.sum.to_u64(&inner.kind) + other.sum.to_u64(&other.kind)),
204 NumberKind::I64 => Number::from(inner.sum.to_i64(&inner.kind) + other.sum.to_i64(&other.kind))
205 };
206
207 if inner.min_value.partial_cmp(&inner.kind, &other.min_value) == Some(Ordering::Greater) {
208 inner.min_value = other.min_value.clone();
209 };
210
211 if inner.max_value.partial_cmp(&inner.kind, &other.max_value) == Some(Ordering::Less) {
212 inner.max_value = other.max_value.clone();
213 }
214
215 Ok(())
216 })
217 })
218 } else {
219 Err(MetricsError::InconsistentAggregator(format!(
220 "Expected {:?}, got: {:?}",
221 self, other
222 )))
223 }
224 }
225
226 fn as_any(&self) -> &dyn Any {
227 self
228 }
229}
230
/// Configuration used to build a DDSketch aggregator.
#[derive(Debug)]
pub struct DdSketchConfig {
    // Accuracy parameter; `gamma` is derived from it when the sketch is built.
    alpha: f64,
    // Total bin budget; the sketch gives half of it to each of its two stores.
    max_num_bins: i64,
    // Values whose magnitude does not exceed this are mapped to key 0.
    key_epsilon: f64,
}

impl DdSketchConfig {
    /// Bundles the three sketch parameters; no validation is performed.
    pub fn new(alpha: f64, max_num_bins: i64, key_epsilon: f64) -> Self {
        Self {
            alpha,
            max_num_bins,
            key_epsilon,
        }
    }
}
249
250#[derive(Debug)]
261struct Inner {
262 positive_store: Store,
263 negative_store: Store,
264 kind: NumberKind,
265 sum: Number,
267 gamma: f64,
269 gamma_ln: f64,
271 key_epsilon: f64,
274 offset: i64,
277
278 min_value: Number,
280 max_value: Number,
282}
283
284impl Inner {
285 fn new(config: &DdSketchConfig, kind: NumberKind) -> Inner {
286 let gamma: f64 = 1.0 + 2.0 * config.alpha / (1.0 - config.alpha);
287 let mut inner = Inner {
288 positive_store: Store::new(config.max_num_bins / 2),
289 negative_store: Store::new(config.max_num_bins / 2),
290 min_value: kind.max(),
291 max_value: kind.min(),
292 sum: kind.zero(),
293 gamma,
294 gamma_ln: gamma.ln(),
295 key_epsilon: config.key_epsilon,
296 offset: 0,
297 kind,
298 };
299 inner.offset = -(inner.log_gamma(inner.key_epsilon)).ceil() as i64 + 1i64;
301 inner
302 }
303
304 fn add(&mut self, v: &Number, kind: &NumberKind) {
305 let key = self.key(v, kind);
306 match v.partial_cmp(kind, &Number::from(0.0)) {
307 Some(Ordering::Greater) | Some(Ordering::Equal) => {
308 self.positive_store.add(key);
309 }
310 Some(Ordering::Less) => {
311 self.negative_store.add(key);
312 }
313 _ => {
314 return;
316 }
317 }
318
319 if self.min_value.partial_cmp(&self.kind, v) == Some(Ordering::Greater) {
321 self.min_value = v.clone();
322 }
323
324 if self.max_value.partial_cmp(&self.kind, v) == Some(Ordering::Less) {
325 self.max_value = v.clone();
326 }
327
328 match &self.kind {
329 NumberKind::I64 => {
330 self.sum = Number::from(self.sum.to_i64(&self.kind) + v.to_i64(kind));
331 }
332 NumberKind::U64 => {
333 self.sum = Number::from(self.sum.to_u64(&self.kind) + v.to_u64(kind));
334 }
335 NumberKind::F64 => {
336 self.sum = Number::from(self.sum.to_f64(&self.kind) + v.to_f64(kind));
337 }
338 }
339 }
340
341 fn key(&self, num: &Number, kind: &NumberKind) -> i64 {
342 if num.to_f64(kind) < -self.key_epsilon {
343 let positive_num = match kind {
344 NumberKind::F64 => Number::from(-num.to_f64(kind)),
345 NumberKind::U64 => Number::from(num.to_u64(kind)),
346 NumberKind::I64 => Number::from(-num.to_i64(kind)),
347 };
348 (-self.log_gamma(positive_num.to_f64(kind)).ceil()) as i64 - self.offset
349 } else if num.to_f64(kind) > self.key_epsilon {
350 self.log_gamma(num.to_f64(kind)).ceil() as i64 + self.offset
351 } else {
352 0i64
353 }
354 }
355
356 fn log_gamma(&self, num: f64) -> f64 {
358 num.ln() / self.gamma_ln
359 }
360
361 fn count(&self) -> u64 {
362 self.negative_store.count + self.positive_store.count
363 }
364}
365
366#[derive(Debug)]
367struct Store {
368 bins: Vec<u64>,
369 count: u64,
370 min_key: i64,
371 max_key: i64,
372 max_num_bins: i64,
376}
377
378impl Default for Store {
379 fn default() -> Self {
380 Store {
381 bins: vec![0; INITIAL_NUM_BINS],
382 count: 0,
383 min_key: 0,
384 max_key: 0,
385 max_num_bins: DEFAULT_MAX_NUM_BINS,
386 }
387 }
388}
389
390impl Store {
392 fn new(max_num_bins: i64) -> Store {
393 Store {
394 bins: vec![
395 0;
396 if max_num_bins as usize > INITIAL_NUM_BINS {
397 INITIAL_NUM_BINS
398 } else {
399 max_num_bins as usize
400 }
401 ],
402 count: 0u64,
403 min_key: 0i64,
404 max_key: 0i64,
405 max_num_bins,
406 }
407 }
408
409 fn add(&mut self, key: i64) {
418 if self.count == 0 {
419 self.max_key = key;
420 self.min_key = key - self.bins.len() as i64 + 1
421 }
422
423 if key < self.min_key {
424 self.grow_left(key)
425 } else if key > self.max_key {
426 self.grow_right(key)
427 }
428 let idx = if key - self.min_key < 0 {
429 0
430 } else {
431 key - self.min_key
432 };
433 let bin_count = self.bins.get_mut(idx as usize).unwrap();
435 *bin_count += 1;
436 self.count += 1;
437 }
438
439 fn grow_left(&mut self, key: i64) {
440 if self.min_key < key || self.bins.len() >= self.max_num_bins as usize {
441 return;
442 }
443
444 let min_key = if self.max_key - key >= self.max_num_bins {
445 self.max_key - self.max_num_bins + 1
446 } else {
447 let mut min_key = self.min_key;
448 while min_key > key {
449 min_key -= GROW_LEFT_BY;
450 }
451 min_key
452 };
453
454 let expected_len = (self.max_key - min_key + 1) as usize;
459 let mut new_bins = vec![0u64; expected_len];
460 let old_bin_slice = &mut new_bins[(self.min_key - min_key) as usize..];
461 old_bin_slice.copy_from_slice(&self.bins);
462
463 self.bins = new_bins;
464 self.min_key = min_key;
465 }
466
467 fn grow_right(&mut self, key: i64) {
468 if self.max_key > key {
469 return;
470 }
471
472 if key - self.max_key >= self.max_num_bins {
473 self.bins = vec![0; self.max_num_bins as usize];
476 self.max_key = key;
477 self.min_key = key - self.max_num_bins + 1;
478 self.bins.get_mut(0).unwrap().add_assign(self.count);
479 } else if key - self.min_key >= self.max_num_bins {
480 let min_key = key - self.max_num_bins + 1;
481 let upper_bound = if min_key < self.max_key + 1 {
482 min_key
483 } else {
484 self.max_key + 1
485 } - self.min_key;
486 let n = self.bins.iter().take(upper_bound as usize).sum::<u64>();
487
488 if self.bins.len() < self.max_num_bins as usize {
489 let mut new_bins = vec![0; self.max_num_bins as usize];
490 new_bins[0..self.bins.len() - (min_key - self.min_key) as usize]
491 .as_mut()
492 .copy_from_slice(&self.bins[(min_key - self.min_key) as usize..]);
493 self.bins = new_bins;
494 } else {
495 self.bins.drain(0..(min_key - self.min_key) as usize);
497 if self.max_num_bins > self.max_key - min_key + 1 {
498 self.bins.resize(
499 self.bins.len()
500 + (self.max_num_bins - (self.max_key - min_key + 1)) as usize,
501 0,
502 )
503 }
504 }
505 self.max_key = key;
506 self.min_key = min_key;
507 self.bins.get_mut(0).unwrap().add_assign(n);
508 } else {
509 let mut new_bin = vec![0; (key - self.min_key + 1) as usize];
510 new_bin[0..self.bins.len()]
511 .as_mut()
512 .copy_from_slice(&self.bins);
513 self.bins = new_bin;
514 self.max_key = key;
515 }
516 }
517
518 fn merge(&mut self, other: &Store) {
520 if self.count == 0 {
521 return;
522 }
523 if other.count == 0 {
524 self.bins = other.bins.clone();
525 self.min_key = other.min_key;
526 self.max_key = other.max_key;
527 self.count = other.count;
528 }
529
530 if self.max_key > other.max_key {
531 if other.min_key < self.min_key {
532 self.grow_left(other.min_key);
533 }
534 let start = if other.min_key > self.min_key {
535 other.min_key
536 } else {
537 self.min_key
538 } as usize;
539 for i in start..other.max_key as usize {
540 self.bins[i - self.min_key as usize] = other.bins[i - other.min_key as usize];
541 }
542 let mut n = 0;
543 for i in other.min_key as usize..self.min_key as usize {
544 n += other.bins[i - other.min_key as usize]
545 }
546 self.bins[0] += n;
547 } else if other.min_key < self.min_key {
548 let mut tmp_bins = vec![0u64; other.bins.len()];
549 tmp_bins.as_mut_slice().copy_from_slice(&other.bins);
550
551 for i in self.min_key as usize..self.max_key as usize {
552 tmp_bins[i - other.min_key as usize] += self.bins[i - self.min_key as usize];
553 }
554
555 self.bins = tmp_bins;
556 self.max_key = other.max_key;
557 self.min_key = other.min_key;
558 } else {
559 self.grow_right(other.max_key);
560 for i in other.min_key as usize..(other.max_key + 1) as usize {
561 self.bins[i - self.min_key as usize] += other.bins[i - other.min_key as usize];
562 }
563 }
564
565 self.count += other.count;
566 }
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::{Descriptor, InstrumentKind, Number, NumberKind};
    use crate::sdk::export::metrics::{Aggregator, Count, Max, Min, Sum};
    use rand_distr::{Distribution, Exp, LogNormal, Normal};
    use std::cmp::Ordering;
    use std::sync::Arc;

    const TEST_MAX_BINS: i64 = 1024;
    const TEST_ALPHA: f64 = 0.01;
    const TEST_KEY_EPSILON: f64 = 1.0e-9;

    /// Test input: values sorted in ascending order plus the kind they are
    /// encoded with.
    struct Dataset {
        data: Vec<Number>,
        kind: NumberKind,
    }

    impl Dataset {
        fn from_f64_vec(data: Vec<f64>) -> Dataset {
            Dataset {
                data: data.into_iter().map(Number::from).collect(),
                kind: NumberKind::F64,
            }
        }

        fn from_u64_vec(data: Vec<u64>) -> Dataset {
            Dataset {
                data: data.into_iter().map(Number::from).collect(),
                kind: NumberKind::U64,
            }
        }

        fn from_i64_vec(data: Vec<i64>) -> Dataset {
            Dataset {
                data: data.into_iter().map(Number::from).collect(),
                kind: NumberKind::I64,
            }
        }

        /// Sum of all values, accumulated in the dataset's own kind.
        fn sum(&self) -> Number {
            match self.kind {
                NumberKind::F64 => {
                    Number::from(self.data.iter().map(|e| e.to_f64(&self.kind)).sum::<f64>())
                }
                NumberKind::U64 => {
                    Number::from(self.data.iter().map(|e| e.to_u64(&self.kind)).sum::<u64>())
                }
                NumberKind::I64 => {
                    Number::from(self.data.iter().map(|e| e.to_i64(&self.kind)).sum::<i64>())
                }
            }
        }
    }

    fn generate_linear_dataset_f64(start: f64, step: f64, num: usize) -> Vec<f64> {
        (0..num).map(|i| start + i as f64 * step).collect()
    }

    fn generate_linear_dataset_u64(start: u64, step: u64, num: usize) -> Vec<u64> {
        (0..num).map(|i| start + i as u64 * step).collect()
    }

    fn generate_linear_dataset_i64(start: i64, step: i64, num: usize) -> Vec<i64> {
        (0..num).map(|i| start + i as i64 * step).collect()
    }

    /// Draws `num` samples from `distribution` and returns them in ascending
    /// order. The thread-local RNG handle is fetched once for all samples.
    fn sorted_samples<D: Distribution<f64>>(distribution: D, num: usize) -> Vec<f64> {
        let mut rng = rand::thread_rng();
        let mut data: Vec<f64> = (0..num).map(|_| distribution.sample(&mut rng)).collect();
        data.sort_by(|a, b| a.partial_cmp(b).unwrap());
        data
    }

    fn generate_normal_dataset(mean: f64, stddev: f64, num: usize) -> Vec<f64> {
        sorted_samples(Normal::new(mean, stddev).unwrap(), num)
    }

    fn generate_log_normal_dataset(mean: f64, stddev: f64, num: usize) -> Vec<f64> {
        sorted_samples(LogNormal::new(mean, stddev).unwrap(), num)
    }

    fn generate_exponential_dataset(rate: f64, num: usize) -> Vec<f64> {
        sorted_samples(Exp::new(rate).unwrap(), num)
    }

    /// Aggregator built from the test configuration.
    fn new_test_sketch(kind: &NumberKind) -> DdSketchAggregator {
        DdSketchAggregator::new(
            &DdSketchConfig::new(TEST_ALPHA, TEST_MAX_BINS, TEST_KEY_EPSILON),
            kind.clone(),
        )
    }

    /// Value-recorder descriptor for the given kind.
    fn new_test_descriptor(kind: &NumberKind) -> Descriptor {
        Descriptor::new(
            "test".to_string(),
            "test",
            None,
            InstrumentKind::ValueRecorder,
            kind.clone(),
        )
    }

    /// Feeds the (sorted) dataset into a sketch and checks min, max, sum and
    /// count against the dataset itself.
    fn evaluate_sketch(dataset: Dataset) {
        let kind = &dataset.kind;
        let ddsketch = new_test_sketch(kind);
        let descriptor = new_test_descriptor(kind);

        for value in &dataset.data {
            // A failed update must fail the test instead of being ignored.
            ddsketch
                .update(value, &descriptor)
                .expect("update should succeed");
        }

        assert_eq!(
            ddsketch
                .min()
                .unwrap()
                .partial_cmp(kind, dataset.data.first().unwrap()),
            Some(Ordering::Equal)
        );
        assert_eq!(
            ddsketch
                .max()
                .unwrap()
                .partial_cmp(kind, dataset.data.last().unwrap()),
            Some(Ordering::Equal)
        );
        assert_eq!(
            ddsketch.sum().unwrap().partial_cmp(kind, &dataset.sum()),
            Some(Ordering::Equal)
        );
        assert_eq!(ddsketch.count().unwrap(), dataset.data.len() as u64);
    }

    /// Inserting far more keys than the budget keeps the bin count at the budget.
    #[test]
    fn test_insert_into_store() {
        let mut store = Store::new(200);
        for i in -100..1300 {
            store.add(i)
        }
        assert_eq!(store.count, 1400);
        assert_eq!(store.bins.len(), 200);
    }

    /// Mixed inserts that force the store to grow to the right.
    #[test]
    fn test_grow_right() {
        let mut store = Store::new(150);
        for i in &[-100, -50, 150, -20, 10] {
            store.add(*i)
        }
        assert_eq!(store.count, 5);
    }

    /// Descending inserts that force the store to grow to the left.
    #[test]
    fn test_grow_left() {
        let mut store = Store::new(150);
        for i in &[500, 150, 10] {
            store.add(*i)
        }
        assert_eq!(store.count, 3);
    }

    /// Merging two stores with different budgets that saw the same keys.
    #[test]
    fn test_merge_stores() {
        let mut store1 = Store::new(300);
        let mut store2 = Store::new(200);
        for i in 500..1000 {
            store1.add(i);
            store2.add(i);
        }
        store1.merge(&store2);
        assert_eq!(store1.bins.first(), Some(&201));
        assert_eq!(&store1.bins[1..100], vec![1u64; 99].as_slice());
        assert_eq!(store1.bins[100], 302);
        assert_eq!(&store1.bins[101..], vec![2u64; 199].as_slice());
        assert_eq!(store1.count, 1000);
    }

    /// Linear datasets for each supported number kind.
    #[test]
    fn test_linear_distribution() {
        let mut dataset = Dataset::from_u64_vec(generate_linear_dataset_u64(12, 3, 5000));
        evaluate_sketch(dataset);

        dataset = Dataset::from_i64_vec(generate_linear_dataset_i64(-12, 3, 5000));
        evaluate_sketch(dataset);

        dataset = Dataset::from_f64_vec(generate_linear_dataset_f64(-12.0, 3.0, 5000));
        evaluate_sketch(dataset);
    }

    #[test]
    fn test_normal_distribution() {
        let mut dataset = Dataset::from_f64_vec(generate_normal_dataset(150.0, 1.2, 100));
        evaluate_sketch(dataset);

        dataset = Dataset::from_f64_vec(generate_normal_dataset(-30.0, 4.4, 100));
        evaluate_sketch(dataset);
    }

    #[test]
    fn test_log_normal_distribution() {
        let dataset = Dataset::from_f64_vec(generate_log_normal_dataset(120.0, 0.5, 100));
        evaluate_sketch(dataset);
    }

    #[test]
    fn test_exponential_distribution() {
        let dataset = Dataset::from_f64_vec(generate_exponential_dataset(2.0, 500));
        evaluate_sketch(dataset);
    }

    /// After a synchronized move the destination reports the statistics the
    /// source had, and the source no longer holds any values.
    #[test]
    fn test_synchronized_move() {
        let dataset = Dataset::from_f64_vec(generate_normal_dataset(1.0, 3.5, 100));
        let kind = &dataset.kind;
        let ddsketch = new_test_sketch(kind);
        let descriptor = new_test_descriptor(kind);
        for value in &dataset.data {
            ddsketch
                .update(value, &descriptor)
                .expect("update should succeed");
        }
        let expected_sum = ddsketch.sum().unwrap().to_f64(&NumberKind::F64);
        let expected_count = ddsketch.count().unwrap();
        let expected_min = ddsketch.min().unwrap().to_f64(&NumberKind::F64);
        let expected_max = ddsketch.max().unwrap().to_f64(&NumberKind::F64);

        let moved_ddsketch: Arc<(dyn Aggregator + Send + Sync)> =
            Arc::new(new_test_sketch(&NumberKind::F64));
        ddsketch
            .synchronized_move(&moved_ddsketch, &descriptor)
            .expect("Fail to sync move");
        let moved_ddsketch = moved_ddsketch
            .as_any()
            .downcast_ref::<DdSketchAggregator>()
            .expect("Fail to cast dyn Aggregator down to DDSketchAggregator");

        assert!(
            (moved_ddsketch.max().unwrap().to_f64(&NumberKind::F64) - expected_max).abs()
                < std::f64::EPSILON
        );
        assert!(
            (moved_ddsketch.min().unwrap().to_f64(&NumberKind::F64) - expected_min).abs()
                < std::f64::EPSILON
        );
        assert!(
            (moved_ddsketch.sum().unwrap().to_f64(&NumberKind::F64) - expected_sum).abs()
                < std::f64::EPSILON
        );
        assert_eq!(moved_ddsketch.count().unwrap(), expected_count);
        // The source gave its buckets away.
        assert_eq!(ddsketch.count().unwrap(), 0);
    }
}