opentelemetry_spanprocessor_any/sdk/metrics/aggregators/
ddsketch.rs

//! DDSketch quantile sketch with relative-error guarantees.
//! DDSketch is a fast and fully-mergeable quantile sketch with relative-error guarantees.
//!
//! The main difference between this approach and previous art is that DDSketch employs a new
//! method to compute the error. Traditionally, the error rate of a sketch is evaluated by rank
//! accuracy, which can still produce a relatively large variance if the dataset has a long tail.
//!
//! DDSketch, by contrast, uses a relative error rate, which works well on long-tailed datasets.
//!
//! The details of this algorithm can be found in <https://arxiv.org/pdf/1908.10693>
11
12use std::{
13    any::Any,
14    cmp::Ordering,
15    mem,
16    ops::AddAssign,
17    sync::{Arc, RwLock},
18};
19
20use crate::{
21    metrics::{Descriptor, MetricsError, Number, NumberKind, Result},
22    sdk::export::metrics::{Aggregator, Count, Max, Min, MinMaxSumCount, Sum},
23};
24
/// Number of bins a store allocates up front (never more than its `max_num_bins`).
const INITIAL_NUM_BINS: usize = 128;
/// Step, in keys, by which a store extends its window towards lower keys.
const GROW_LEFT_BY: i64 = 128;

/// Default bin budget for a whole sketch; `Inner::new` splits it between the two stores.
const DEFAULT_MAX_NUM_BINS: i64 = 2048;
/// Default relative accuracy α.
const DEFAULT_ALPHA: f64 = 0.01;
/// Default `key_epsilon`: values within this distance of zero share key 0.
const DEFAULT_MIN_BOUNDARY: f64 = 1.0e-9;
31
32/// An aggregator to calculate quantile
33pub fn ddsketch(config: &DdSketchConfig, kind: NumberKind) -> DdSketchAggregator {
34    DdSketchAggregator::new(config, kind)
35}
36
37/// DDSKetch quantile sketch algorithm
38///
39/// It can give q-quantiles with α-accurate for any 0<=q<=1.
40///
41/// Here the accurate is calculated based on relative-error rate. Thus, the error guarantee adapts the scale of the output data. With relative error guarantee, the histogram can be more accurate in the area of low data density. For example, the long tail of response time data.
42///
43/// For example, if the actual percentile is 1 second, and relative-error guarantee
44/// is 2%, then the value should within the range of 0.98 to 1.02
45/// second. But if the actual percentile is 1 millisecond, with the same relative-error
46/// guarantee, the value returned should within the range of 0.98 to 1.02 millisecond.
47///
48/// In order to support both negative and positive inputs, DDSketchAggregator has two DDSketch store within itself to store the negative and positive inputs.
49#[derive(Debug)]
50pub struct DdSketchAggregator {
51    inner: RwLock<Inner>,
52}
53
54impl DdSketchAggregator {
55    /// Create a new DDSKetchAggregator that would yield a quantile with relative error rate less
56    /// than `alpha`
57    ///
58    /// The input should have a granularity larger than `key_epsilon`
59    pub fn new(config: &DdSketchConfig, kind: NumberKind) -> DdSketchAggregator {
60        DdSketchAggregator {
61            inner: RwLock::new(Inner::new(config, kind)),
62        }
63    }
64}
65
66impl Default for DdSketchAggregator {
67    fn default() -> Self {
68        DdSketchAggregator::new(
69            &DdSketchConfig::new(DEFAULT_ALPHA, DEFAULT_MAX_NUM_BINS, DEFAULT_MIN_BOUNDARY),
70            NumberKind::F64,
71        )
72    }
73}
74
75impl Sum for DdSketchAggregator {
76    fn sum(&self) -> Result<Number> {
77        self.inner
78            .read()
79            .map_err(From::from)
80            .map(|inner| inner.sum.clone())
81    }
82}
83
84impl Min for DdSketchAggregator {
85    fn min(&self) -> Result<Number> {
86        self.inner
87            .read()
88            .map_err(From::from)
89            .map(|inner| inner.min_value.clone())
90    }
91}
92
93impl Max for DdSketchAggregator {
94    fn max(&self) -> Result<Number> {
95        self.inner
96            .read()
97            .map_err(From::from)
98            .map(|inner| inner.max_value.clone())
99    }
100}
101
102impl Count for DdSketchAggregator {
103    fn count(&self) -> Result<u64> {
104        self.inner
105            .read()
106            .map_err(From::from)
107            .map(|inner| inner.count())
108    }
109}
110
111impl MinMaxSumCount for DdSketchAggregator {}
112
113impl Aggregator for DdSketchAggregator {
114    fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
115        self.inner
116            .write()
117            .map_err(From::from)
118            .map(|mut inner| inner.add(number, descriptor.number_kind()))
119    }
120
121    fn synchronized_move(
122        &self,
123        destination: &Arc<(dyn Aggregator + Send + Sync)>,
124        descriptor: &Descriptor,
125    ) -> Result<()> {
126        if let Some(other) = destination.as_any().downcast_ref::<Self>() {
127            other
128                .inner
129                .write()
130                .map_err(From::from)
131                .and_then(|mut other| {
132                    self.inner.write().map_err(From::from).map(|mut inner| {
133                        let kind = descriptor.number_kind();
134                        other.max_value = mem::replace(&mut inner.max_value, kind.zero());
135                        other.min_value = mem::replace(&mut inner.min_value, kind.zero());
136                        other.key_epsilon = mem::take(&mut inner.key_epsilon);
137                        other.offset = mem::take(&mut inner.offset);
138                        other.gamma = mem::take(&mut inner.gamma);
139                        other.gamma_ln = mem::take(&mut inner.gamma_ln);
140                        other.positive_store = mem::take(&mut inner.positive_store);
141                        other.negative_store = mem::take(&mut inner.negative_store);
142                        other.sum = mem::replace(&mut inner.sum, kind.zero());
143                    })
144                })
145        } else {
146            Err(MetricsError::InconsistentAggregator(format!(
147                "Expected {:?}, got: {:?}",
148                self, destination
149            )))
150        }
151    }
152
153    fn merge(
154        &self,
155        other: &(dyn Aggregator + Send + Sync),
156        _descriptor: &Descriptor,
157    ) -> Result<()> {
158        if let Some(other) = other.as_any().downcast_ref::<DdSketchAggregator>() {
159            self.inner.write()
160                .map_err(From::from)
161                .and_then(|mut inner| {
162                    other.inner.read()
163                        .map_err(From::from)
164                        .and_then(|other| {
165                            // assert that it can merge
166                            if inner.positive_store.max_num_bins != other.positive_store.max_num_bins {
167                                return Err(MetricsError::InconsistentAggregator(format!(
168                                    "When merging two DDSKetchAggregators, their max number of bins must be the same. Expect max number of bins to be {:?}, but get {:?}", inner.positive_store.max_num_bins, other.positive_store.max_num_bins
169                                )));
170                            }
171                            if inner.negative_store.max_num_bins != other.negative_store.max_num_bins {
172                                return Err(MetricsError::InconsistentAggregator(format!(
173                                    "When merging two DDSKetchAggregators, their max number of bins must be the same. Expect max number of bins to be {:?}, but get {:?}", inner.negative_store.max_num_bins, other.negative_store.max_num_bins
174                                )));
175                            }
176
177
178                            if (inner.gamma - other.gamma).abs() > std::f64::EPSILON {
179                                return Err(MetricsError::InconsistentAggregator(format!(
180                                    "When merging two DDSKetchAggregators, their gamma must be the same. Expect max number of bins to be {:?}, but get {:?}", inner.gamma, other.gamma
181                                )));
182                            }
183
184                            if other.count() == 0 {
185                                return Ok(());
186                            }
187
188                            if inner.count() == 0 {
189                                inner.positive_store.merge(&other.positive_store);
190                                inner.negative_store.merge(&other.negative_store);
191                                inner.sum = other.sum.clone();
192                                inner.min_value = other.min_value.clone();
193                                inner.max_value = other.max_value.clone();
194                                return Ok(());
195                            }
196
197                            inner.positive_store.merge(&other.positive_store);
198                            inner.negative_store.merge(&other.negative_store);
199
200                            inner.sum = match inner.kind {
201                                NumberKind::F64 =>
202                                    Number::from(inner.sum.to_f64(&inner.kind) + other.sum.to_f64(&other.kind)),
203                                NumberKind::U64 => Number::from(inner.sum.to_u64(&inner.kind) + other.sum.to_u64(&other.kind)),
204                                NumberKind::I64 => Number::from(inner.sum.to_i64(&inner.kind) + other.sum.to_i64(&other.kind))
205                            };
206
207                            if inner.min_value.partial_cmp(&inner.kind, &other.min_value) == Some(Ordering::Greater) {
208                                inner.min_value = other.min_value.clone();
209                            };
210
211                            if inner.max_value.partial_cmp(&inner.kind, &other.max_value) == Some(Ordering::Less) {
212                                inner.max_value = other.max_value.clone();
213                            }
214
215                            Ok(())
216                        })
217                })
218        } else {
219            Err(MetricsError::InconsistentAggregator(format!(
220                "Expected {:?}, got: {:?}",
221                self, other
222            )))
223        }
224    }
225
226    fn as_any(&self) -> &dyn Any {
227        self
228    }
229}
230
/// Configuration for a [`DdSketchAggregator`].
#[derive(Debug)]
pub struct DdSketchConfig {
    /// Target relative accuracy α of the sketch.
    alpha: f64,
    /// Bin budget for the whole sketch; each of the two stores receives half of it.
    max_num_bins: i64,
    /// Values whose magnitude is at most this threshold are mapped to key 0.
    key_epsilon: f64,
}

impl DdSketchConfig {
    /// Builds a configuration from the relative accuracy `alpha`, the total bin budget
    /// `max_num_bins`, and the zero threshold `key_epsilon`.
    pub fn new(alpha: f64, max_num_bins: i64, key_epsilon: f64) -> Self {
        Self {
            alpha,
            max_num_bins,
            key_epsilon,
        }
    }
}
249
250/// DDSKetch implementation.
251///
252/// Note that Inner is not thread-safe. All operation should be protected by a lock or other
253/// synchronization.
254///
255/// Inner will also convert all Number into actual primitive type and back.
256///
257/// According to the paper, the DDSKetch only support positive number. Inner support
258/// either positive or negative number. But cannot yield actual result when input has
259/// both positive and negative number.
260#[derive(Debug)]
261struct Inner {
262    positive_store: Store,
263    negative_store: Store,
264    kind: NumberKind,
265    // sum of all value within store
266    sum: Number,
267    // γ = (1 + α)/(1 - α)
268    gamma: f64,
269    // ln(γ)
270    gamma_ln: f64,
271    // The epsilon when map value to bin key. Any value between [-key_epsilon, key_epsilon] will
272    // be mapped to bin key 0. Must be a positive number.
273    key_epsilon: f64,
274    // offset is here to ensure that keys for positive numbers that are larger than min_value are
275    // greater than or equal to 1 while the keys for negative numbers are less than or equal to -1.
276    offset: i64,
277
278    // minimum number that in store.
279    min_value: Number,
280    // maximum number that in store.
281    max_value: Number,
282}
283
284impl Inner {
285    fn new(config: &DdSketchConfig, kind: NumberKind) -> Inner {
286        let gamma: f64 = 1.0 + 2.0 * config.alpha / (1.0 - config.alpha);
287        let mut inner = Inner {
288            positive_store: Store::new(config.max_num_bins / 2),
289            negative_store: Store::new(config.max_num_bins / 2),
290            min_value: kind.max(),
291            max_value: kind.min(),
292            sum: kind.zero(),
293            gamma,
294            gamma_ln: gamma.ln(),
295            key_epsilon: config.key_epsilon,
296            offset: 0,
297            kind,
298        };
299        // reset offset based on key_epsilon
300        inner.offset = -(inner.log_gamma(inner.key_epsilon)).ceil() as i64 + 1i64;
301        inner
302    }
303
304    fn add(&mut self, v: &Number, kind: &NumberKind) {
305        let key = self.key(v, kind);
306        match v.partial_cmp(kind, &Number::from(0.0)) {
307            Some(Ordering::Greater) | Some(Ordering::Equal) => {
308                self.positive_store.add(key);
309            }
310            Some(Ordering::Less) => {
311                self.negative_store.add(key);
312            }
313            _ => {
314                // if return none. Do nothing and return
315                return;
316            }
317        }
318
319        // update min and max
320        if self.min_value.partial_cmp(&self.kind, v) == Some(Ordering::Greater) {
321            self.min_value = v.clone();
322        }
323
324        if self.max_value.partial_cmp(&self.kind, v) == Some(Ordering::Less) {
325            self.max_value = v.clone();
326        }
327
328        match &self.kind {
329            NumberKind::I64 => {
330                self.sum = Number::from(self.sum.to_i64(&self.kind) + v.to_i64(kind));
331            }
332            NumberKind::U64 => {
333                self.sum = Number::from(self.sum.to_u64(&self.kind) + v.to_u64(kind));
334            }
335            NumberKind::F64 => {
336                self.sum = Number::from(self.sum.to_f64(&self.kind) + v.to_f64(kind));
337            }
338        }
339    }
340
341    fn key(&self, num: &Number, kind: &NumberKind) -> i64 {
342        if num.to_f64(kind) < -self.key_epsilon {
343            let positive_num = match kind {
344                NumberKind::F64 => Number::from(-num.to_f64(kind)),
345                NumberKind::U64 => Number::from(num.to_u64(kind)),
346                NumberKind::I64 => Number::from(-num.to_i64(kind)),
347            };
348            (-self.log_gamma(positive_num.to_f64(kind)).ceil()) as i64 - self.offset
349        } else if num.to_f64(kind) > self.key_epsilon {
350            self.log_gamma(num.to_f64(kind)).ceil() as i64 + self.offset
351        } else {
352            0i64
353        }
354    }
355
356    /// get the index of the bucket based on num
357    fn log_gamma(&self, num: f64) -> f64 {
358        num.ln() / self.gamma_ln
359    }
360
361    fn count(&self) -> u64 {
362        self.negative_store.count + self.positive_store.count
363    }
364}
365
/// Bin counts for one sign of the sketch, laid out as a contiguous window of keys.
#[derive(Debug)]
struct Store {
    /// `bins[i]` holds the number of values whose key is `min_key + i`.
    bins: Vec<u64>,
    /// Total number of values added to the store.
    count: u64,
    /// Key represented by `bins[0]`.
    min_key: i64,
    /// Highest key currently represented by `bins`.
    max_key: i64,
    /// Upper bound on the number of bins. Without it the store could need one bin per distinct
    /// key inserted; keys that do not fit are collapsed into the lowest bin instead.
    max_num_bins: i64,
}
377
378impl Default for Store {
379    fn default() -> Self {
380        Store {
381            bins: vec![0; INITIAL_NUM_BINS],
382            count: 0,
383            min_key: 0,
384            max_key: 0,
385            max_num_bins: DEFAULT_MAX_NUM_BINS,
386        }
387    }
388}
389
390/// DDSKetchInner stores the data
391impl Store {
392    fn new(max_num_bins: i64) -> Store {
393        Store {
394            bins: vec![
395                0;
396                if max_num_bins as usize > INITIAL_NUM_BINS {
397                    INITIAL_NUM_BINS
398                } else {
399                    max_num_bins as usize
400                }
401            ],
402            count: 0u64,
403            min_key: 0i64,
404            max_key: 0i64,
405            max_num_bins,
406        }
407    }
408
409    /// Add count based on key.
410    ///
411    /// If key is not in [min_key, max_key], we will expand to left or right
412    ///
413    ///
414    /// The bins are essentially working in a round-robin fashion where we can use all space in bins
415    /// to represent any continuous space within length. That's why we need to offset the key
416    /// with `min_key` so that we get the actual bin index.
417    fn add(&mut self, key: i64) {
418        if self.count == 0 {
419            self.max_key = key;
420            self.min_key = key - self.bins.len() as i64 + 1
421        }
422
423        if key < self.min_key {
424            self.grow_left(key)
425        } else if key > self.max_key {
426            self.grow_right(key)
427        }
428        let idx = if key - self.min_key < 0 {
429            0
430        } else {
431            key - self.min_key
432        };
433        // we unwrap here because grow_left or grow_right will make sure the idx is less than vector size
434        let bin_count = self.bins.get_mut(idx as usize).unwrap();
435        *bin_count += 1;
436        self.count += 1;
437    }
438
439    fn grow_left(&mut self, key: i64) {
440        if self.min_key < key || self.bins.len() >= self.max_num_bins as usize {
441            return;
442        }
443
444        let min_key = if self.max_key - key >= self.max_num_bins {
445            self.max_key - self.max_num_bins + 1
446        } else {
447            let mut min_key = self.min_key;
448            while min_key > key {
449                min_key -= GROW_LEFT_BY;
450            }
451            min_key
452        };
453
454        // The new vector will contain three parts.
455        // First part is all 0, which is the part expended
456        // Second part is from existing bins.
457        // Third part is what's left.
458        let expected_len = (self.max_key - min_key + 1) as usize;
459        let mut new_bins = vec![0u64; expected_len];
460        let old_bin_slice = &mut new_bins[(self.min_key - min_key) as usize..];
461        old_bin_slice.copy_from_slice(&self.bins);
462
463        self.bins = new_bins;
464        self.min_key = min_key;
465    }
466
467    fn grow_right(&mut self, key: i64) {
468        if self.max_key > key {
469            return;
470        }
471
472        if key - self.max_key >= self.max_num_bins {
473            // if currently key minus currently max key is larger than maximum number of bins.
474            // Move all elements in current bins into the first bin
475            self.bins = vec![0; self.max_num_bins as usize];
476            self.max_key = key;
477            self.min_key = key - self.max_num_bins + 1;
478            self.bins.get_mut(0).unwrap().add_assign(self.count);
479        } else if key - self.min_key >= self.max_num_bins {
480            let min_key = key - self.max_num_bins + 1;
481            let upper_bound = if min_key < self.max_key + 1 {
482                min_key
483            } else {
484                self.max_key + 1
485            } - self.min_key;
486            let n = self.bins.iter().take(upper_bound as usize).sum::<u64>();
487
488            if self.bins.len() < self.max_num_bins as usize {
489                let mut new_bins = vec![0; self.max_num_bins as usize];
490                new_bins[0..self.bins.len() - (min_key - self.min_key) as usize]
491                    .as_mut()
492                    .copy_from_slice(&self.bins[(min_key - self.min_key) as usize..]);
493                self.bins = new_bins;
494            } else {
495                // bins length is equal to max number of bins
496                self.bins.drain(0..(min_key - self.min_key) as usize);
497                if self.max_num_bins > self.max_key - min_key + 1 {
498                    self.bins.resize(
499                        self.bins.len()
500                            + (self.max_num_bins - (self.max_key - min_key + 1)) as usize,
501                        0,
502                    )
503                }
504            }
505            self.max_key = key;
506            self.min_key = min_key;
507            self.bins.get_mut(0).unwrap().add_assign(n);
508        } else {
509            let mut new_bin = vec![0; (key - self.min_key + 1) as usize];
510            new_bin[0..self.bins.len()]
511                .as_mut()
512                .copy_from_slice(&self.bins);
513            self.bins = new_bin;
514            self.max_key = key;
515        }
516    }
517
518    /// Merge two stores
519    fn merge(&mut self, other: &Store) {
520        if self.count == 0 {
521            return;
522        }
523        if other.count == 0 {
524            self.bins = other.bins.clone();
525            self.min_key = other.min_key;
526            self.max_key = other.max_key;
527            self.count = other.count;
528        }
529
530        if self.max_key > other.max_key {
531            if other.min_key < self.min_key {
532                self.grow_left(other.min_key);
533            }
534            let start = if other.min_key > self.min_key {
535                other.min_key
536            } else {
537                self.min_key
538            } as usize;
539            for i in start..other.max_key as usize {
540                self.bins[i - self.min_key as usize] = other.bins[i - other.min_key as usize];
541            }
542            let mut n = 0;
543            for i in other.min_key as usize..self.min_key as usize {
544                n += other.bins[i - other.min_key as usize]
545            }
546            self.bins[0] += n;
547        } else if other.min_key < self.min_key {
548            let mut tmp_bins = vec![0u64; other.bins.len()];
549            tmp_bins.as_mut_slice().copy_from_slice(&other.bins);
550
551            for i in self.min_key as usize..self.max_key as usize {
552                tmp_bins[i - other.min_key as usize] += self.bins[i - self.min_key as usize];
553            }
554
555            self.bins = tmp_bins;
556            self.max_key = other.max_key;
557            self.min_key = other.min_key;
558        } else {
559            self.grow_right(other.max_key);
560            for i in other.min_key as usize..(other.max_key + 1) as usize {
561                self.bins[i - self.min_key as usize] += other.bins[i - other.min_key as usize];
562            }
563        }
564
565        self.count += other.count;
566    }
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::{Descriptor, InstrumentKind, Number, NumberKind};
    use crate::sdk::export::metrics::{Aggregator, Count, Max, Min, Sum};
    use rand_distr::{Distribution, Exp, LogNormal, Normal};
    use std::cmp::Ordering;
    use std::sync::Arc;

    const TEST_MAX_BINS: i64 = 1024;
    const TEST_ALPHA: f64 = 0.01;
    const TEST_KEY_EPSILON: f64 = 1.0e-9;

    // Test utils

    /// Values to feed into a sketch, all encoded as `kind`.
    struct Dataset {
        data: Vec<Number>,
        kind: NumberKind,
    }

    impl Dataset {
        fn from_f64_vec(data: Vec<f64>) -> Dataset {
            Dataset {
                data: data.into_iter().map(Number::from).collect::<Vec<Number>>(),
                kind: NumberKind::F64,
            }
        }

        fn from_u64_vec(data: Vec<u64>) -> Dataset {
            Dataset {
                data: data.into_iter().map(Number::from).collect::<Vec<Number>>(),
                kind: NumberKind::U64,
            }
        }

        fn from_i64_vec(data: Vec<i64>) -> Dataset {
            Dataset {
                data: data.into_iter().map(Number::from).collect::<Vec<Number>>(),
                kind: NumberKind::I64,
            }
        }

        /// Sum of the dataset, accumulated in `data` order (the same order the sketch sees).
        fn sum(&self) -> Number {
            match self.kind {
                NumberKind::F64 => {
                    Number::from(self.data.iter().map(|e| e.to_f64(&self.kind)).sum::<f64>())
                }
                NumberKind::U64 => {
                    Number::from(self.data.iter().map(|e| e.to_u64(&self.kind)).sum::<u64>())
                }
                NumberKind::I64 => {
                    Number::from(self.data.iter().map(|e| e.to_i64(&self.kind)).sum::<i64>())
                }
            }
        }
    }

    /// `num` evenly spaced values: `start`, `start + step`, `start + 2 * step`, ...
    fn generate_linear_dataset_f64(start: f64, step: f64, num: usize) -> Vec<f64> {
        (0..num).map(|i| start + i as f64 * step).collect()
    }

    fn generate_linear_dataset_u64(start: u64, step: u64, num: usize) -> Vec<u64> {
        (0..num).map(|i| start + i as u64 * step).collect()
    }

    fn generate_linear_dataset_i64(start: i64, step: i64, num: usize) -> Vec<i64> {
        (0..num).map(|i| start + i as i64 * step).collect()
    }

    /// Draws `num` samples from `distribution` and returns them sorted ascending.
    fn sample_sorted<D: Distribution<f64>>(distribution: D, num: usize) -> Vec<f64> {
        let mut rng = rand::thread_rng();
        let mut data: Vec<f64> = (0..num).map(|_| distribution.sample(&mut rng)).collect();
        data.sort_by(|a, b| a.partial_cmp(b).unwrap());
        data
    }

    /// Generates a sorted dataset drawn from a normal distribution.
    fn generate_normal_dataset(mean: f64, stddev: f64, num: usize) -> Vec<f64> {
        sample_sorted(Normal::new(mean, stddev).unwrap(), num)
    }

    /// Generates a sorted dataset drawn from a log-normal distribution.
    fn generate_log_normal_dataset(mean: f64, stddev: f64, num: usize) -> Vec<f64> {
        sample_sorted(LogNormal::new(mean, stddev).unwrap(), num)
    }

    /// Generates a sorted dataset drawn from an exponential distribution.
    fn generate_exponential_dataset(rate: f64, num: usize) -> Vec<f64> {
        sample_sorted(Exp::new(rate).unwrap(), num)
    }

    /// A sketch built with the test configuration.
    fn test_sketch(kind: &NumberKind) -> DdSketchAggregator {
        DdSketchAggregator::new(
            &DdSketchConfig::new(TEST_ALPHA, TEST_MAX_BINS, TEST_KEY_EPSILON),
            kind.clone(),
        )
    }

    /// A value-recorder descriptor for `kind`.
    fn test_descriptor(kind: &NumberKind) -> Descriptor {
        Descriptor::new(
            "test".to_string(),
            "test",
            None,
            InstrumentKind::ValueRecorder,
            kind.clone(),
        )
    }

    /// Inserts every element of `dataset` into a fresh sketch and asserts that the sketch's
    /// min, max, sum and count match the dataset exactly.
    ///
    /// `dataset.data` must be sorted ascending: its first element is the expected min and its
    /// last the expected max.
    fn evaluate_sketch(dataset: Dataset) {
        let kind = &dataset.kind;
        let ddsketch = test_sketch(kind);
        let descriptor = test_descriptor(kind);

        for i in &dataset.data {
            ddsketch
                .update(i, &descriptor)
                .expect("update should not fail");
        }

        assert_eq!(
            ddsketch
                .min()
                .unwrap()
                .partial_cmp(kind, dataset.data.get(0).unwrap()),
            Some(Ordering::Equal)
        );
        assert_eq!(
            ddsketch
                .max()
                .unwrap()
                .partial_cmp(kind, dataset.data.last().unwrap()),
            Some(Ordering::Equal)
        );
        assert_eq!(
            ddsketch.sum().unwrap().partial_cmp(kind, &dataset.sum()),
            Some(Ordering::Equal)
        );
        assert_eq!(ddsketch.count().unwrap(), dataset.data.len() as u64);
    }

    // Test basic operation of Store

    /// First set max_num_bins < number of keys, test to see if the store will collapse to left
    /// most bin instead of expanding beyond the max_num_bins
    #[test]
    fn test_insert_into_store() {
        let mut store = Store::new(200);
        for i in -100..1300 {
            store.add(i)
        }
        assert_eq!(store.count, 1400);
        assert_eq!(store.bins.len(), 200);
    }

    /// Test to see if copy_from_slice will panic because the range size is different in left and right
    #[test]
    fn test_grow_right() {
        let mut store = Store::new(150);
        for i in &[-100, -50, 150, -20, 10] {
            store.add(*i)
        }
        assert_eq!(store.count, 5);
    }

    /// Test to see if copy_from_slice will panic because the range size is different in left and right
    #[test]
    fn test_grow_left() {
        let mut store = Store::new(150);
        for i in &[500, 150, 10] {
            store.add(*i)
        }
        assert_eq!(store.count, 3);
    }

    /// Before merge, store1 should hold 300 bins that looks like [201,1,1,1,...],
    /// store 2 should hold 200 bins looks like [301,1,1,...]
    /// After merge, store 1 should still hold 300 bins with following distribution
    ///
    /// index [0,0] -> 201
    ///
    /// index [1,99] -> 1
    ///
    /// index [100, 100] -> 302
    ///
    /// index [101, 299] -> 2
    #[test]
    fn test_merge_stores() {
        let mut store1 = Store::new(300);
        let mut store2 = Store::new(200);
        for i in 500..1000 {
            store1.add(i);
            store2.add(i);
        }
        store1.merge(&store2);
        assert_eq!(store1.bins.get(0), Some(&201));
        assert_eq!(&store1.bins[1..100], vec![1u64; 99].as_slice());
        assert_eq!(store1.bins[100], 302);
        assert_eq!(&store1.bins[101..], vec![2u64; 199].as_slice());
        assert_eq!(store1.count, 1000);
    }

    // Test ddsketch with different distribution

    #[test]
    fn test_linear_distribution() {
        // test u64
        let mut dataset = Dataset::from_u64_vec(generate_linear_dataset_u64(12, 3, 5000));
        evaluate_sketch(dataset);

        // test i64
        dataset = Dataset::from_i64_vec(generate_linear_dataset_i64(-12, 3, 5000));
        evaluate_sketch(dataset);

        // test f64
        dataset = Dataset::from_f64_vec(generate_linear_dataset_f64(-12.0, 3.0, 5000));
        evaluate_sketch(dataset);
    }

    #[test]
    fn test_normal_distribution() {
        let mut dataset = Dataset::from_f64_vec(generate_normal_dataset(150.0, 1.2, 100));
        evaluate_sketch(dataset);

        dataset = Dataset::from_f64_vec(generate_normal_dataset(-30.0, 4.4, 100));
        evaluate_sketch(dataset);
    }

    #[test]
    fn test_log_normal_distribution() {
        let dataset = Dataset::from_f64_vec(generate_log_normal_dataset(120.0, 0.5, 100));
        evaluate_sketch(dataset);
    }

    #[test]
    fn test_exponential_distribution() {
        let dataset = Dataset::from_f64_vec(generate_exponential_dataset(2.0, 500));
        evaluate_sketch(dataset);
    }

    // Test Aggregator operation of DDSketch
    #[test]
    fn test_synchronized_move() {
        let dataset = Dataset::from_f64_vec(generate_normal_dataset(1.0, 3.5, 100));
        let kind = &dataset.kind;
        let ddsketch = test_sketch(kind);
        let descriptor = test_descriptor(kind);
        for i in &dataset.data {
            ddsketch
                .update(i, &descriptor)
                .expect("update should not fail");
        }
        let expected_sum = ddsketch.sum().unwrap().to_f64(&NumberKind::F64);
        let expected_count = ddsketch.count().unwrap();
        let expected_min = ddsketch.min().unwrap().to_f64(&NumberKind::F64);
        let expected_max = ddsketch.max().unwrap().to_f64(&NumberKind::F64);

        let moved_ddsketch: Arc<(dyn Aggregator + Send + Sync)> =
            Arc::new(test_sketch(&NumberKind::F64));
        ddsketch
            .synchronized_move(&moved_ddsketch, &descriptor)
            .expect("Fail to sync move");
        let moved_ddsketch = moved_ddsketch
            .as_any()
            .downcast_ref::<DdSketchAggregator>()
            .expect("Fail to cast dyn Aggregator down to DDSketchAggregator");

        // assert sum, max, min and count
        assert!(
            (moved_ddsketch.max().unwrap().to_f64(&NumberKind::F64) - expected_max).abs()
                < std::f64::EPSILON
        );
        assert!(
            (moved_ddsketch.min().unwrap().to_f64(&NumberKind::F64) - expected_min).abs()
                < std::f64::EPSILON
        );
        assert!(
            (moved_ddsketch.sum().unwrap().to_f64(&NumberKind::F64) - expected_sum).abs()
                < std::f64::EPSILON
        );
        assert_eq!(moved_ddsketch.count().unwrap(), expected_count);

        // The source sketch must be left empty after the move.
        assert_eq!(ddsketch.count().unwrap(), 0);
    }
}