Skip to main content

sketches_ddsketch/
ddsketch.rs

1use std::{error, fmt};
2
3#[cfg(feature = "use_serde")]
4use serde::{Deserialize, Serialize};
5
6use crate::config::Config;
7use crate::store::Store;
8
9type Result<T> = std::result::Result<T, DDSketchError>;
10
11/// General error type for DDSketch, represents either an invalid quantile or an
12/// incompatible merge operation.
13#[derive(Debug, Clone)]
14pub enum DDSketchError {
15    Quantile,
16    Merge,
17}
18impl fmt::Display for DDSketchError {
19    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
20        match self {
21            DDSketchError::Quantile => {
22                write!(f, "Invalid quantile, must be between 0 and 1 (inclusive)")
23            }
24            DDSketchError::Merge => write!(f, "Can not merge sketches with different configs"),
25        }
26    }
27}
28impl error::Error for DDSketchError {
29    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
30        // Generic
31        None
32    }
33}
34
35/// This struct represents a [DDSketch](https://arxiv.org/pdf/1908.10693.pdf)
36#[derive(Clone)]
37#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
38pub struct DDSketch {
39    pub(crate) config: Config,
40    pub(crate) store: Store,
41    pub(crate) negative_store: Store,
42    pub(crate) min: f64,
43    pub(crate) max: f64,
44    pub(crate) sum: f64,
45    pub(crate) zero_count: u64,
46}
47
48impl Default for DDSketch {
49    fn default() -> Self {
50        Self::new(Default::default())
51    }
52}
53
54// XXX: functions should return Option<> in the case of empty
55impl DDSketch {
56    /// Construct a `DDSketch`. Requires a `Config` specifying the parameters of the sketch
57    pub fn new(config: Config) -> Self {
58        DDSketch {
59            config,
60            store: Store::new(config.max_num_bins as usize),
61            negative_store: Store::new(config.max_num_bins as usize),
62            min: f64::INFINITY,
63            max: f64::NEG_INFINITY,
64            sum: 0.0,
65            zero_count: 0,
66        }
67    }
68
69    /// Add the sample to the sketch
70    pub fn add(&mut self, v: f64) {
71        if v > self.config.min_possible() {
72            let key = self.config.key(v);
73            self.store.add(key);
74        } else if v < -self.config.min_possible() {
75            let key = self.config.key(-v);
76            self.negative_store.add(key);
77        } else {
78            self.zero_count += 1;
79        }
80
81        if v < self.min {
82            self.min = v;
83        }
84        if self.max < v {
85            self.max = v;
86        }
87        self.sum += v;
88    }
89
90    /// Return the quantile value for quantiles between 0.0 and 1.0. Result is an error, represented
91    /// as DDSketchError::Quantile if the requested quantile is outside of that range.
92    ///
93    /// If the sketch is empty the result is None, else Some(v) for the quantile value.
94    pub fn quantile(&self, q: f64) -> Result<Option<f64>> {
95        if !(0.0..=1.0).contains(&q) {
96            return Err(DDSketchError::Quantile);
97        }
98
99        if self.empty() {
100            return Ok(None);
101        }
102
103        if q == 0.0 {
104            return Ok(Some(self.min));
105        } else if q == 1.0 {
106            return Ok(Some(self.max));
107        }
108
109        let rank = (q * (self.count() as f64 - 1.0)) as u64;
110        let quantile;
111        if rank < self.negative_store.count() {
112            let reversed_rank = self.negative_store.count() - rank - 1;
113            let key = self.negative_store.key_at_rank(reversed_rank);
114            quantile = -self.config.value(key);
115        } else if rank < self.zero_count + self.negative_store.count() {
116            quantile = 0.0;
117        } else {
118            let key = self
119                .store
120                .key_at_rank(rank - self.zero_count - self.negative_store.count());
121            quantile = self.config.value(key);
122        }
123
124        Ok(Some(quantile))
125    }
126
127    /// Returns the minimum value seen, or None if sketch is empty
128    pub fn min(&self) -> Option<f64> {
129        if self.empty() {
130            None
131        } else {
132            Some(self.min)
133        }
134    }
135
136    /// Returns the maximum value seen, or None if sketch is empty
137    pub fn max(&self) -> Option<f64> {
138        if self.empty() {
139            None
140        } else {
141            Some(self.max)
142        }
143    }
144
145    /// Returns the sum of values seen, or None if sketch is empty
146    pub fn sum(&self) -> Option<f64> {
147        if self.empty() {
148            None
149        } else {
150            Some(self.sum)
151        }
152    }
153
154    /// Returns the number of values added to the sketch
155    pub fn count(&self) -> usize {
156        (self.store.count() + self.zero_count + self.negative_store.count()) as usize
157    }
158
159    /// Returns the length of the underlying `Store`. This is mainly only useful for understanding
160    /// how much the sketch has grown given the inserted values.
161    pub fn length(&self) -> usize {
162        self.store.length() as usize + self.negative_store.length() as usize
163    }
164
165    /// Merge the contents of another sketch into this one. The sketch that is merged into this one
166    /// is unchanged after the merge.
167    pub fn merge(&mut self, o: &DDSketch) -> Result<()> {
168        if self.config != o.config {
169            return Err(DDSketchError::Merge);
170        }
171
172        let was_empty = self.empty();
173
174        // Merge the stores
175        self.store.merge(&o.store);
176        self.negative_store.merge(&o.negative_store);
177        self.zero_count += o.zero_count;
178
179        // Need to ensure we don't override min/max with initializers
180        // if either store were empty
181        if was_empty {
182            self.min = o.min;
183            self.max = o.max;
184        } else if !o.empty() {
185            if o.min < self.min {
186                self.min = o.min
187            }
188            if o.max > self.max {
189                self.max = o.max;
190            }
191        }
192        self.sum += o.sum;
193
194        Ok(())
195    }
196
197    fn empty(&self) -> bool {
198        self.count() == 0
199    }
200
201    /// Encode this sketch into the Java-compatible binary format used by
202    /// `com.datadoghq.sketch.ddsketch.DDSketchWithExactSummaryStatistics`.
203    pub fn to_java_bytes(&self) -> Vec<u8> {
204        crate::encoding::encode_to_java_bytes(self)
205    }
206
207    /// Decode a sketch from the Java-compatible binary format.
208    /// Accepts bytes produced by Java's `DDSketchWithExactSummaryStatistics.encode()`
209    /// with or without the `0x02` version prefix.
210    pub fn from_java_bytes(
211        bytes: &[u8],
212    ) -> std::result::Result<Self, crate::encoding::DecodeError> {
213        crate::encoding::decode_from_java_bytes(bytes)
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use approx::assert_relative_eq;
220
221    use crate::{Config, DDSketch};
222
223    #[test]
224    fn test_add_zero() {
225        let alpha = 0.01;
226        let c = Config::new(alpha, 2048, 10e-9);
227        let mut dd = DDSketch::new(c);
228        dd.add(0.0);
229    }
230
231    #[test]
232    fn test_quartiles() {
233        let alpha = 0.01;
234        let c = Config::new(alpha, 2048, 10e-9);
235        let mut dd = DDSketch::new(c);
236
237        // Initialize sketch with {1.0, 2.0, 3.0, 4.0}
238        for i in 1..5 {
239            dd.add(i as f64);
240        }
241
242        // We expect the following mappings from quantile to value:
243        // [0,0.33]: 1.0, (0.34,0.66]: 2.0, (0.67,0.99]: 3.0, (0.99, 1.0]: 4.0
244        let test_cases = vec![
245            (0.0, 1.0),
246            (0.25, 1.0),
247            (0.33, 1.0),
248            (0.34, 2.0),
249            (0.5, 2.0),
250            (0.66, 2.0),
251            (0.67, 3.0),
252            (0.75, 3.0),
253            (0.99, 3.0),
254            (1.0, 4.0),
255        ];
256
257        for (q, val) in test_cases {
258            assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha);
259        }
260    }
261
262    #[test]
263    fn test_neg_quartiles() {
264        let alpha = 0.01;
265        let c = Config::new(alpha, 2048, 10e-9);
266        let mut dd = DDSketch::new(c);
267
268        // Initialize sketch with {1.0, 2.0, 3.0, 4.0}
269        for i in 1..5 {
270            dd.add(-i as f64);
271        }
272
273        let test_cases = vec![
274            (0.0, -4.0),
275            (0.25, -4.0),
276            (0.5, -3.0),
277            (0.75, -2.0),
278            (1.0, -1.0),
279        ];
280
281        for (q, val) in test_cases {
282            assert_relative_eq!(dd.quantile(q).unwrap().unwrap(), val, max_relative = alpha);
283        }
284    }
285
286    #[test]
287    fn test_simple_quantile() {
288        let c = Config::defaults();
289        let mut dd = DDSketch::new(c);
290
291        for i in 1..101 {
292            dd.add(i as f64);
293        }
294
295        assert_eq!(dd.quantile(0.95).unwrap().unwrap().ceil(), 95.0);
296
297        assert!(dd.quantile(-1.01).is_err());
298        assert!(dd.quantile(1.01).is_err());
299    }
300
301    #[test]
302    fn test_empty_sketch() {
303        let c = Config::defaults();
304        let dd = DDSketch::new(c);
305
306        assert_eq!(dd.quantile(0.98).unwrap(), None);
307        assert_eq!(dd.max(), None);
308        assert_eq!(dd.min(), None);
309        assert_eq!(dd.sum(), None);
310        assert_eq!(dd.count(), 0);
311
312        assert!(dd.quantile(1.01).is_err());
313    }
314
315    #[test]
316    fn test_basic_histogram_data() {
317        let values = &[
318            0.754225035,
319            0.752900282,
320            0.752812246,
321            0.752602367,
322            0.754310155,
323            0.753525981,
324            0.752981082,
325            0.752715536,
326            0.751667941,
327            0.755079054,
328            0.753528150,
329            0.755188464,
330            0.752508723,
331            0.750064549,
332            0.753960428,
333            0.751139298,
334            0.752523560,
335            0.753253428,
336            0.753498342,
337            0.751858358,
338            0.752104636,
339            0.753841300,
340            0.754467374,
341            0.753814334,
342            0.750881719,
343            0.753182556,
344            0.752576884,
345            0.753945708,
346            0.753571911,
347            0.752314573,
348            0.752586651,
349        ];
350
351        let c = Config::defaults();
352        let mut dd = DDSketch::new(c);
353
354        for value in values {
355            dd.add(*value);
356        }
357
358        assert_eq!(dd.max(), Some(0.755188464));
359        assert_eq!(dd.min(), Some(0.750064549));
360        assert_eq!(dd.count(), 31);
361        assert_eq!(dd.sum(), Some(23.343630625000003));
362
363        assert!(dd.quantile(0.25).unwrap().is_some());
364        assert!(dd.quantile(0.5).unwrap().is_some());
365        assert!(dd.quantile(0.75).unwrap().is_some());
366    }
367
368    #[test]
369    fn test_length() {
370        let mut dd = DDSketch::default();
371        assert_eq!(dd.length(), 0);
372
373        dd.add(1.0);
374        assert_eq!(dd.length(), 128);
375        dd.add(2.0);
376        dd.add(3.0);
377        assert_eq!(dd.length(), 128);
378
379        dd.add(-1.0);
380        assert_eq!(dd.length(), 256);
381        dd.add(-2.0);
382        dd.add(-3.0);
383        assert_eq!(dd.length(), 256);
384    }
385}