// libdd_ddsketch — src/lib.rs
1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4//! This crate defines a minimal implementation of DDSketch.
5//!
6//! DDSketch is a data sketch used to generate percentiles over streaming data using constant
7//! memory. A DDSketch is essentially a histogram that partitions the range of positive values into
8//! an infinite number of indexed bins whose size grows exponentially. It keeps track of the number
9//! of values (or possibly floating-point weights) added to each bin. Negative values are
10//! partitioned like positive values, symmetrically to zero. The value zero as well as its close
11//! neighborhood that would be mapped to extreme bin indexes is mapped to a specific counter.
12
13#![cfg_attr(not(test), deny(clippy::panic))]
14#![cfg_attr(not(test), deny(clippy::unwrap_used))]
15#![cfg_attr(not(test), deny(clippy::expect_used))]
16#![cfg_attr(not(test), deny(clippy::todo))]
17#![cfg_attr(not(test), deny(clippy::unimplemented))]
18
19use std::collections::{HashMap, VecDeque};
20
21use prost::Message;
22
23/// Protobuf representation of DDSketch - generated by build.rs
24#[rustfmt::skip]
25pub mod pb;
26
/// This is a minimal DDSketch implementation
///
/// This implementation only supports a part of the standard (which is also only the parts dd
/// backend supports :shrug:)
/// - max length contiguous bin store, with lower bin collapse behavior.
/// - Positive or zero values
///
/// The default sketch has a 1% relative accuracy, and only accepts positive points
///
/// See <https://github.com/DataDog/sketches-go> for the reference implementation
#[derive(Debug, Default, Clone)]
pub struct DDSketch {
    store: LowCollapsingDenseStore, // Weight of each indexed bin (contiguous, low-collapsing)
    zero_count: f64,                // Weight of the dedicated bin for the value 0 (and its close neighborhood)
    mapping: LogMapping,            // Logarithmic bin-index <-> value mapping
}
43
44impl DDSketch {
45    /// Return an iterator over `(value, weight)` pair for each bin
46    pub fn ordered_bins(&self) -> Vec<(f64, f64)> {
47        let mut bins: Vec<_> = std::iter::once((0.0, self.zero_count))
48            .chain(self.store.bins().map(|(b, v)| (self.mapping.value(b), v)))
49            .collect();
50        bins.sort_by(|a, b| a.0.total_cmp(&b.0));
51        bins
52    }
53
54    // Return the number of points in the sketch
55    pub fn count(&self) -> f64 {
56        self.zero_count + self.store.bins.iter().sum::<f64>()
57    }
58
59    /// Add a point with value `point` to the sketch
60    /// `point` must be positive
61    pub fn add(&mut self, point: f64) -> Result<(), Box<dyn std::error::Error>> {
62        self.add_with_count(point, 1.0)
63    }
64
65    /// Add `count` point with value `point` to the sketch
66    /// `count` and `point` must be positive
67    pub fn add_with_count(
68        &mut self,
69        point: f64,
70        count: f64,
71    ) -> Result<(), Box<dyn std::error::Error>> {
72        if count.is_nan() || count.is_infinite() {
73            return Err("count is invalid".into());
74        }
75        if point < 0.0 || point.is_nan() || point.is_infinite() {
76            return Err("point is invalid".into());
77        } else if point < self.mapping.min_indexable_value {
78            self.zero_count += count;
79        } else {
80            let index = self.mapping.index(point);
81            *self.store.bin_mut(index) += count;
82        }
83        Ok(())
84    }
85
86    /// Return a protobuf of the sketch
87    pub fn into_pb(self) -> pb::DdSketch {
88        let contiguous_bins: Vec<f64> = self.store.bins.into();
89        pb::DdSketch {
90            mapping: Some(pb::IndexMapping {
91                gamma: self.mapping.gamma,
92                index_offset: self.mapping.index_offset,
93                interpolation: pb::index_mapping::Interpolation::None.into(),
94            }),
95            positive_values: Some(pb::Store {
96                bin_counts: HashMap::new(),
97                contiguous_bin_counts: contiguous_bins,
98                contiguous_bin_index_offset: self.store.offset,
99            }),
100            zero_count: self.zero_count,
101            negative_values: Some(pb::Store {
102                bin_counts: HashMap::new(),
103                contiguous_bin_counts: Vec::new(),
104                contiguous_bin_index_offset: 0,
105            }),
106        }
107    }
108
109    /// Return a serialized protobuf of the sketch
110    pub fn encode_to_vec(self) -> Vec<u8> {
111        self.into_pb().encode_to_vec()
112    }
113}
114
/// A store mapping the bin indexes to their respective weights
///
/// Stores the weights of a contiguous range of bins containing all non-empty bins. The range
/// starts at index `offset` and ends at index `offset + bins.len()`.
///
/// The range of stored bins is updated when accessing the index of a bin out of the range
/// with [`Self::bin_idx_to_store_idx()`]. If the `max_size` is reached the lower bins are
/// collapsed together to free space.
#[derive(Debug, Clone)]
struct LowCollapsingDenseStore {
    bins: VecDeque<f64>,
    offset: i32,
    max_size: i32,
}

impl LowCollapsingDenseStore {
    /// Create a store holding at most `max_size` contiguous bins
    ///
    /// Returns `None` unless `max_size` is strictly positive: the store needs room for at
    /// least one bin, and a zero capacity would underflow the free-capacity computation in
    /// [`Self::bin_idx_to_store_idx()`].
    fn new(max_size: i32) -> Option<Self> {
        if max_size <= 0 {
            return None;
        }
        Some(Self {
            bins: VecDeque::new(),
            offset: 0,
            max_size,
        })
    }

    /// Return an iterator over the bins
    /// The iterator yields pairs `(bin_index, count)`
    fn bins(&self) -> impl Iterator<Item = (i32, f64)> + '_ {
        self.bins
            .iter()
            .enumerate()
            .map(|(i, &v)| (i as i32 + self.offset, v))
    }

    /// Return a mutable reference to the bin at index `bin_index`
    fn bin_mut(&mut self, bin_index: i32) -> &mut f64 {
        let store_index = self.bin_idx_to_store_idx(bin_index);
        &mut self.bins[store_index]
    }

    /// Return the `store_index` of the bin at index `bin_index`, growing or collapsing the
    /// stored range as needed
    fn bin_idx_to_store_idx(&mut self, bin_index: i32) -> usize {
        if self.bins.is_empty() {
            // If the bins are empty, start them at the index
            self.offset = bin_index;
            self.bins.push_back(0.0);
            return 0;
        }

        // General case
        // Bucket lower than the stored range
        if bin_index < self.offset {
            let additional_low_bins = self.offset - bin_index;
            debug_assert!(additional_low_bins >= 0);

            // Only grow downward as far as the capacity allows; anything lower than the
            // grown range collapses into the lowest stored bin (store index 0).
            let additional_low_bins = std::cmp::min(
                additional_low_bins as usize,
                (self.max_size as usize).saturating_sub(self.bins.len()),
            );

            self.bins.reserve(additional_low_bins);
            for _ in 0..additional_low_bins {
                self.bins.push_front(0.0);
            }

            self.offset -= additional_low_bins as i32;
            0
        }
        // Bucket higher than the stored range
        else if self.offset + self.bins.len() as i32 <= bin_index {
            let bin_range_size = bin_index - self.offset + 1; // Number of buckets to store

            // Growing upward past max_size frees space by collapsing the lowest bins
            if bin_range_size > self.max_size {
                self.collapse_low_bins(bin_range_size - self.max_size);
            }
            debug_assert!(self.bins.len() as i32 <= self.max_size);

            let store_index = bin_index - self.offset;
            for _ in 0..(store_index - self.bins.len() as i32 + 1) {
                self.bins.push_back(0.0);
            }
            store_index as usize
        }
        // Bucket within the stored range
        else {
            (bin_index - self.offset) as usize
        }
    }

    /// Collapse the `bin_number` lowest bins
    ///
    /// Their summed weight is folded into the bin that becomes the lowest stored bin, and
    /// `offset` is shifted up accordingly.
    fn collapse_low_bins(&mut self, bin_number: i32) {
        let mut count = 0.0;
        for _ in 0..bin_number {
            count += self.bins.pop_front().unwrap_or(0.0);
        }
        if let Some(lowest_bin) = self.bins.front_mut() {
            *lowest_bin += count;
        } else {
            self.bins.push_front(count);
        }
        self.offset += bin_number;
    }
}

impl Default for LowCollapsingDenseStore {
    /// Default store with a capacity of 2048 contiguous bins
    fn default() -> Self {
        #[allow(clippy::unwrap_used)]
        Self::new(2048).unwrap()
    }
}
230
/// Logarithmic mapping of bucket index to value
#[derive(Debug, Clone, Copy)]
struct LogMapping {
    gamma: f64,
    multiplier: f64,
    min_indexable_value: f64,
    index_offset: f64,
}

impl LogMapping {
    /// Build a mapping from `gamma` (must be > 1.0) and an `offset` added to every index
    ///
    /// Returns `None` when `gamma` is not strictly greater than 1.0, or when the derived
    /// bounds cannot be computed (NaN inputs).
    fn new(gamma: f64, offset: f64) -> Option<Self> {
        if gamma <= 1.0 {
            return None;
        }
        let multiplier = Self::multiplier_from_gamma(gamma);
        // Lower bound on indexable values:
        // - at least f64::MIN_POSITIVE * gamma, so the value representing the lowest
        //   bucket stays >= f64::MIN_POSITIVE
        // - at least the value mapping to i32::MIN, so indexes fit in an i32
        let lowest_for_index = ((i32::MIN as f64 - offset) / multiplier + 1.0).exp();
        let min_indexable_value = max(f64::MIN_POSITIVE * gamma, lowest_for_index)?;
        Some(Self {
            gamma,
            multiplier,
            min_indexable_value,
            index_offset: offset,
        })
    }

    /// Returns the multiplier used to convert ln to base-gamma logarithm
    fn multiplier_from_gamma(gamma: f64) -> f64 {
        gamma.ln().recip()
    }

    /// Returns the relative accuracy guaranteed by the mapping
    fn relative_accuracy(&self) -> f64 {
        1.0 - 2.0 / (1.0 + self.gamma)
    }

    /// Returns the index of the bucket containing `value`
    fn index(&self, value: f64) -> i32 {
        let log_gamma = value.ln() * self.multiplier;
        (log_gamma + self.index_offset).floor() as i32
    }

    /// Returns the value representing the bucket at `index`
    fn value(&self, index: i32) -> f64 {
        let bucket_low = ((index as f64 - self.index_offset) / self.multiplier).exp();
        bucket_low * (1.0 + self.relative_accuracy())
    }
}

impl Default for LogMapping {
    /// Default mapping with the relative accuracy and index offset used by datadog's backend
    fn default() -> Self {
        const RELATIVE_ACCURACY: f64 = 0.007751937984496124;
        const GAMMA: f64 = (1.0 + RELATIVE_ACCURACY) / (1.0 - RELATIVE_ACCURACY);

        const BACKEND_SKETCH_MIN_VALUE: f64 = 1e-9;
        // offset used in datadog's backend for sketches
        let offset: f64 = (1.0 - (BACKEND_SKETCH_MIN_VALUE.ln() / GAMMA.ln()).floor()) + 0.5;

        #[allow(clippy::unwrap_used)]
        Self::new(GAMMA, offset).unwrap()
    }
}

/// NaN-propagating maximum: `None` when either input is NaN, otherwise the larger value
fn max(a: f64, b: f64) -> Option<f64> {
    if a.is_nan() || b.is_nan() {
        return None;
    }
    Some(if a > b { a } else { b })
}
304
#[cfg(test)]
mod test {
    use prost::Message;

    use super::*;

    macro_rules! assert_within {
        ($x:expr, $y:expr, $tolerance:expr) => {
            let diff = $x - $y;
            assert!(
                -$tolerance < diff && diff < $tolerance,
                "x: {} y: {}",
                $x,
                $y,
            );
        };
    }

    #[test]
    fn test_exponential_mapping_within_tolerances() {
        let mapping = LogMapping::default();

        let values: &[f64] = &[1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
        for &value in values {
            let index = mapping.index(value);
            let value_bucket = mapping.value(index);

            assert_within!(value_bucket / value, 1.0, 0.01);
        }
    }

    #[test]
    fn test_exponential_mapping_relative_accuracy() {
        let mapping = LogMapping::default();

        assert_within!(
            mapping.relative_accuracy(),
            0.007751937984496138,
            f64::EPSILON
        );
    }

    #[test]
    fn test_sketch_add() {
        let mut sketch = DDSketch::default();
        let points: &[f64] = &[0.0, 1e-5, 0.1, 2.0, 10.0, 25.0, 10000.0];
        for (i, &point) in points.iter().enumerate() {
            assert!(sketch.add_with_count(point, i as f64 + 1.0).is_ok());
        }

        for (i, (value, count)) in sketch
            .ordered_bins()
            .into_iter()
            .filter(|(_, p)| *p != 0.0)
            .enumerate()
        {
            if points[i] == 0.0 {
                assert_within!(value, 0.0, f64::EPSILON);
                assert_within!(count, i as f64 + 1.0, f64::EPSILON);
            } else {
                assert_within!(value / points[i], 1.0, 0.01);
                assert_within!(count, i as f64 + 1.0, f64::EPSILON);
            }
        }
    }

    #[test]
    fn test_sketch_add_negative() {
        let mut sketch = DDSketch::default();
        assert!(sketch.add(-1.0).is_err());
    }

    #[test]
    fn test_sketch_add_nan() {
        let mut sketch = DDSketch::default();
        assert!(sketch.add(f64::NAN).is_err());
    }

    #[test]
    fn test_sketch_count_add_with_count() {
        let mut sketch = DDSketch::default();
        assert_within!(sketch.count(), 0.0, f64::EPSILON);

        let points: &[f64] = &[0.0, 1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
        for (i, &point) in points.iter().enumerate() {
            assert!(sketch.add_with_count(point, i as f64).is_ok());
        }

        assert_within!(sketch.count(), 21.0, f64::EPSILON);
    }

    #[test]
    fn test_sketch_count_add() {
        let mut sketch = DDSketch::default();
        assert_within!(sketch.count(), 0.0, f64::EPSILON);

        let points: &[f64] = &[0.0, 1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
        for &point in points.iter() {
            assert!(sketch.add(point).is_ok());
        }
        assert_within!(sketch.count(), 7.0, f64::EPSILON);

        assert!(sketch.add(1.0).is_ok());
        assert_within!(sketch.count(), 8.0, f64::EPSILON);

        for n in 0..100 {
            assert!(sketch.add(n as f64).is_ok());
        }
        assert_within!(sketch.count(), 108.0, f64::EPSILON);
    }

    #[test]
    fn test_sketch_encode() {
        let mut sketch = DDSketch::default();
        let points: &[f64] = &[0.0, 1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
        for (i, &point) in points.iter().enumerate() {
            assert!(sketch.add_with_count(point, i as f64).is_ok());
        }

        let pb_sketch = sketch.into_pb().encode_to_vec();
        assert!(!pb_sketch.is_empty());
    }

    #[test]
    fn test_low_collapsing_store() {
        let mut store = LowCollapsingDenseStore::new(5).unwrap();

        // Test initial push up to capacity
        for i in 0..5 {
            *store.bin_mut(i + 10) = 1.0;
        }
        for (i, b) in store.bins().enumerate() {
            assert_eq!(b.0, i as i32 + 10);
            assert_eq!(b.1, 1.0)
        }

        // Indexing existing bins
        for i in 0..5 {
            *store.bin_mut(i + 10) += 1.0;
        }
        for (i, b) in store.bins().enumerate() {
            assert_eq!(b.0, i as i32 + 10);
            assert_eq!(b.1, 2.0)
        }
    }

    #[test]
    fn test_low_collapsing_store_low_bins_are_collapsed() {
        let mut store = LowCollapsingDenseStore::new(5).unwrap();

        // Fill the store up to max capacity
        for i in 0..5 {
            *store.bin_mut(i + 10) = 1.0;
        }

        // Indexing low bins at max capacity
        for i in 0..3 {
            *store.bin_mut(i) += 1.0;
        }
        for (i, b) in store.bins().enumerate() {
            assert_eq!(b.0, i as i32 + 10);
            if i == 0 {
                assert_eq!(b.1, 4.0)
            } else {
                assert_eq!(b.1, 1.0)
            }
        }

        // Indexing higher bins collapses lower bins
        *store.bin_mut(15) = 1.0;
        for (i, b) in store.bins().enumerate() {
            assert_eq!(b.0, i as i32 + 11);
            if i == 0 {
                assert_eq!(b.1, 5.0)
            } else {
                assert_eq!(b.1, 1.0)
            }
        }
    }

    #[test]
    fn test_low_collapsing_store_up_expansion() {
        let mut store = LowCollapsingDenseStore::new(3).unwrap();

        *store.bin_mut(1) = 1.0;
        *store.bin_mut(3) = 1.0;
        assert_eq!(
            store.bins().collect::<Vec<_>>(),
            &[(1, 1.0), (2, 0.0), (3, 1.0)]
        )
    }

    #[test]
    fn test_low_collapsing_store_down_expansion() {
        let mut store = LowCollapsingDenseStore::new(3).unwrap();

        *store.bin_mut(3) = 1.0;
        *store.bin_mut(1) = 1.0;
        assert_eq!(
            store.bins().collect::<Vec<_>>(),
            &[(1, 1.0), (2, 0.0), (3, 1.0)]
        )
    }
}
510}