1#![cfg_attr(not(test), deny(clippy::panic))]
14#![cfg_attr(not(test), deny(clippy::unwrap_used))]
15#![cfg_attr(not(test), deny(clippy::expect_used))]
16#![cfg_attr(not(test), deny(clippy::todo))]
17#![cfg_attr(not(test), deny(clippy::unimplemented))]
18
19use std::collections::{HashMap, VecDeque};
20
21use prost::Message;
22
23#[rustfmt::skip]
25pub mod pb;
26
27#[derive(Debug, Default, Clone)]
38pub struct DDSketch {
39 store: LowCollapsingDenseStore, zero_count: f64, mapping: LogMapping, }
43
44impl DDSketch {
45 pub fn ordered_bins(&self) -> Vec<(f64, f64)> {
47 let mut bins: Vec<_> = std::iter::once((0.0, self.zero_count))
48 .chain(self.store.bins().map(|(b, v)| (self.mapping.value(b), v)))
49 .collect();
50 bins.sort_by(|a, b| a.0.total_cmp(&b.0));
51 bins
52 }
53
54 pub fn count(&self) -> f64 {
56 self.zero_count + self.store.bins.iter().sum::<f64>()
57 }
58
59 pub fn add(&mut self, point: f64) -> Result<(), Box<dyn std::error::Error>> {
62 self.add_with_count(point, 1.0)
63 }
64
65 pub fn add_with_count(
68 &mut self,
69 point: f64,
70 count: f64,
71 ) -> Result<(), Box<dyn std::error::Error>> {
72 if count.is_nan() || count.is_infinite() {
73 return Err("count is invalid".into());
74 }
75 if point < 0.0 || point.is_nan() || point.is_infinite() {
76 return Err("point is invalid".into());
77 } else if point < self.mapping.min_indexable_value {
78 self.zero_count += count;
79 } else {
80 let index = self.mapping.index(point);
81 *self.store.bin_mut(index) += count;
82 }
83 Ok(())
84 }
85
86 pub fn into_pb(self) -> pb::DdSketch {
88 let contiguous_bins: Vec<f64> = self.store.bins.into();
89 pb::DdSketch {
90 mapping: Some(pb::IndexMapping {
91 gamma: self.mapping.gamma,
92 index_offset: self.mapping.index_offset,
93 interpolation: pb::index_mapping::Interpolation::None.into(),
94 }),
95 positive_values: Some(pb::Store {
96 bin_counts: HashMap::new(),
97 contiguous_bin_counts: contiguous_bins,
98 contiguous_bin_index_offset: self.store.offset,
99 }),
100 zero_count: self.zero_count,
101 negative_values: Some(pb::Store {
102 bin_counts: HashMap::new(),
103 contiguous_bin_counts: Vec::new(),
104 contiguous_bin_index_offset: 0,
105 }),
106 }
107 }
108
109 pub fn encode_to_vec(self) -> Vec<u8> {
111 self.into_pb().encode_to_vec()
112 }
113}
114
115#[derive(Debug, Clone)]
127struct LowCollapsingDenseStore {
128 bins: VecDeque<f64>,
129 offset: i32,
130 max_size: i32,
131}
132
133impl LowCollapsingDenseStore {
134 fn new(max_size: i32) -> Option<Self> {
135 if max_size < 0 {
136 return None;
137 }
138 Some(Self {
139 bins: VecDeque::new(),
140 offset: 0,
141 max_size,
142 })
143 }
144
145 fn bins(&self) -> impl Iterator<Item = (i32, f64)> + '_ {
148 self.bins
149 .iter()
150 .enumerate()
151 .map(|(i, &v)| (i as i32 + self.offset, v))
152 }
153
154 fn bin_mut(&mut self, bin_index: i32) -> &mut f64 {
156 let store_index = self.bin_idx_to_store_idx(bin_index);
157 &mut self.bins[store_index]
158 }
159
160 fn bin_idx_to_store_idx(&mut self, bin_index: i32) -> usize {
162 if self.bins.is_empty() {
163 self.offset = bin_index;
165 self.bins.push_back(0.0);
166 return 0;
167 }
168
169 if bin_index < self.offset {
172 let additional_low_bins = self.offset - bin_index;
173 debug_assert!(additional_low_bins >= 0);
174
175 let additional_low_bins = std::cmp::min(
176 additional_low_bins as usize,
177 self.max_size as usize - self.bins.len(),
178 );
179
180 self.bins.reserve(additional_low_bins);
181 for _ in 0..additional_low_bins {
182 self.bins.push_front(0.0);
183 }
184
185 self.offset -= additional_low_bins as i32;
186 0
187 }
188 else if self.offset + self.bins.len() as i32 <= bin_index {
190 let bin_range_size = bin_index - self.offset + 1; if bin_range_size > self.max_size {
193 self.collapse_low_bins(bin_range_size - self.max_size);
194 }
195 debug_assert!(self.bins.len() as i32 <= self.max_size);
196
197 let store_index = bin_index - self.offset;
198 for _ in 0..(store_index - self.bins.len() as i32 + 1) {
199 self.bins.push_back(0.0);
200 }
201 store_index as usize
202 }
203 else {
205 (bin_index - self.offset) as usize
206 }
207 }
208
209 fn collapse_low_bins(&mut self, bin_number: i32) {
211 let mut count = 0.0;
212 for _ in 0..bin_number {
213 count += self.bins.pop_front().unwrap_or(0.0);
214 }
215 if let Some(lowest_bin) = self.bins.front_mut() {
216 *lowest_bin += count;
217 } else {
218 self.bins.push_front(count);
219 }
220 self.offset += bin_number;
221 }
222}
223
224impl Default for LowCollapsingDenseStore {
225 fn default() -> Self {
226 #[allow(clippy::unwrap_used)]
227 Self::new(2048).unwrap()
228 }
229}
230
231#[derive(Debug, Clone, Copy)]
233struct LogMapping {
234 gamma: f64,
235 multiplier: f64,
236 min_indexable_value: f64,
237 index_offset: f64,
238}
239
240impl LogMapping {
241 fn new(gamma: f64, offset: f64) -> Option<Self> {
242 if gamma <= 1.0 {
243 return None;
244 }
245 let multiplier = Self::multiplier_from_gamma(gamma);
246 Some(Self {
247 gamma,
248 multiplier,
249 min_indexable_value: max(
250 f64::MIN_POSITIVE * gamma,
252 ((i32::MIN as f64 - offset) / multiplier + 1.0).exp(),
254 )?,
255 index_offset: offset,
256 })
257 }
258
259 fn multiplier_from_gamma(gamma: f64) -> f64 {
261 1.0 / gamma.ln()
262 }
263
264 fn relative_accuracy(&self) -> f64 {
266 1.0 - 2.0 / (1.0 + self.gamma)
267 }
268
269 fn index(&self, value: f64) -> i32 {
271 (value.ln() * self.multiplier + self.index_offset).floor() as i32
272 }
273
274 fn value(&self, index: i32) -> f64 {
276 ((index as f64 - self.index_offset) / self.multiplier).exp()
277 * (1.0 + self.relative_accuracy())
278 }
279}
280
281impl Default for LogMapping {
282 fn default() -> Self {
283 const RELATIVE_ACCURACY: f64 = 0.007751937984496124;
284 const GAMMA: f64 = (1.0 + RELATIVE_ACCURACY) / (1.0 - RELATIVE_ACCURACY);
285
286 const BACKEND_SKETCH_MIN_VALUE: f64 = 1e-9;
287 let offset: f64 = (1.0 - (BACKEND_SKETCH_MIN_VALUE.ln() / GAMMA.ln()).floor()) + 0.5;
289
290 #[allow(clippy::unwrap_used)]
291 Self::new(GAMMA, offset).unwrap()
292 }
293}
294
295fn max(a: f64, b: f64) -> Option<f64> {
296 if a.is_nan() || b.is_nan() {
297 None
298 } else if a > b {
299 Some(a)
300 } else {
301 Some(b)
302 }
303}
304
305#[cfg(test)]
306mod test {
307 use prost::Message;
308
309 use super::*;
310
311 macro_rules! assert_within {
312 ($x:expr, $y:expr, $tolerance:expr) => {
313 let diff = $x - $y;
314 assert!(
315 -$tolerance < diff && diff < $tolerance,
316 "x: {} y: {}",
317 $x,
318 $y,
319 );
320 };
321 }
322
323 #[test]
324 fn test_exponential_mapping_within_tolerances() {
325 let mapping = LogMapping::default();
326
327 let values: &[f64] = &[1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
328 for &value in values {
329 let index = mapping.index(value);
330 let value_bucket = mapping.value(index);
331
332 assert_within!(value_bucket / value, 1.0, 0.01);
333 }
334 }
335
336 #[test]
337 fn test_exponential_mapping_relative_accuracy() {
338 let mapping = LogMapping::default();
339
340 assert_within!(
341 mapping.relative_accuracy(),
342 0.007751937984496138,
343 f64::EPSILON
344 );
345 }
346
347 #[test]
348 fn test_sketch_add() {
349 let mut sketch = DDSketch::default();
350 let points: &[f64] = &[0.0, 1e-5, 0.1, 2.0, 10.0, 25.0, 10000.0];
351 for (i, &point) in points.iter().enumerate() {
352 assert!(sketch.add_with_count(point, i as f64 + 1.0).is_ok());
353 }
354
355 dbg!(sketch.store.bins.len(), sketch.store.offset);
356
357 for (i, (value, count)) in sketch
358 .ordered_bins()
359 .into_iter()
360 .filter(|(_, p)| *p != 0.0)
361 .enumerate()
362 {
363 if points[i] == 0.0 {
364 assert_within!(value, 0.0, f64::EPSILON);
365 assert_within!(count, i as f64 + 1.0, f64::EPSILON);
366 } else {
367 assert_within!(value / points[i], 1.0, 0.01);
368 assert_within!(count, i as f64 + 1.0, f64::EPSILON);
369 }
370 }
371 }
372
373 #[test]
374 fn test_skecth_add_negative() {
375 let mut sketch = DDSketch::default();
376 assert!(sketch.add(-1.0).is_err());
377 }
378
379 #[test]
380 fn test_skecth_add_nan() {
381 let mut sketch = DDSketch::default();
382 assert!(sketch.add(f64::NAN).is_err());
383 }
384
385 #[test]
386 fn test_sketch_count_add_with_count() {
387 let mut sketch = DDSketch::default();
388 assert_within!(sketch.count(), 0.0, f64::EPSILON);
389
390 let points: &[f64] = &[0.0, 1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
391 for (i, &point) in points.iter().enumerate() {
392 assert!(sketch.add_with_count(point, i as f64).is_ok());
393 }
394
395 assert_within!(sketch.count(), 21.0, f64::EPSILON);
396 }
397
398 #[test]
399 fn test_sketch_count_add() {
400 let mut sketch = DDSketch::default();
401 assert_within!(sketch.count(), 0.0, f64::EPSILON);
402
403 let points: &[f64] = &[0.0, 1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
404 for &point in points.iter() {
405 assert!(sketch.add(point).is_ok());
406 }
407 assert_within!(sketch.count(), 7.0, f64::EPSILON);
408
409 assert!(sketch.add(1.0).is_ok());
410 assert_within!(sketch.count(), 8.0, f64::EPSILON);
411
412 for n in 0..100 {
413 assert!(sketch.add(n as f64).is_ok());
414 }
415 assert_within!(sketch.count(), 108.0, f64::EPSILON);
416 }
417
418 #[test]
419 fn test_skecth_encode() {
420 let mut sketch = DDSketch::default();
421 let points: &[f64] = &[0.0, 1e-30, 0.1, 2.0, 10.0, 25.0, 10000.0];
422 for (i, &point) in points.iter().enumerate() {
423 assert!(sketch.add_with_count(point, i as f64).is_ok());
424 }
425
426 let pb_sketch = sketch.into_pb().encode_to_vec();
427 assert!(!pb_sketch.is_empty());
428 }
429
430 #[test]
431 fn test_low_collapsing_store() {
432 let mut store = LowCollapsingDenseStore::new(5).unwrap();
433
434 for i in 0..5 {
436 *store.bin_mut(i + 10) = 1.0;
437 }
438 for (i, b) in store.bins().enumerate() {
439 assert_eq!(b.0, i as i32 + 10);
440 assert_eq!(b.1, 1.0)
441 }
442
443 for i in 0..5 {
445 *store.bin_mut(i + 10) += 1.0;
446 }
447 for (i, b) in store.bins().enumerate() {
448 assert_eq!(b.0, i as i32 + 10);
449 assert_eq!(b.1, 2.0)
450 }
451 }
452
453 #[test]
454 fn test_low_collapsing_store_low_bins_are_collapsed() {
455 let mut store = LowCollapsingDenseStore::new(5).unwrap();
456
457 for i in 0..5 {
459 *store.bin_mut(i + 10) = 1.0;
460 }
461
462 for i in 0..3 {
464 *store.bin_mut(i) += 1.0;
465 }
466 for (i, b) in store.bins().enumerate() {
467 assert_eq!(b.0, i as i32 + 10);
468 if i == 0 {
469 assert_eq!(b.1, 4.0)
470 } else {
471 assert_eq!(b.1, 1.0)
472 }
473 }
474
475 *store.bin_mut(15) = 1.0;
477 for (i, b) in store.bins().enumerate() {
478 assert_eq!(b.0, i as i32 + 11);
479 if i == 0 {
480 assert_eq!(b.1, 5.0)
481 } else {
482 assert_eq!(b.1, 1.0)
483 }
484 }
485 }
486
487 #[test]
488 fn test_low_collapsing_store_up_expansion() {
489 let mut store = LowCollapsingDenseStore::new(3).unwrap();
490
491 *store.bin_mut(1) = 1.0;
492 *store.bin_mut(3) = 1.0;
493 assert_eq!(
494 store.bins().collect::<Vec<_>>(),
495 &[(1, 1.0), (2, 0.0), (3, 1.0)]
496 )
497 }
498
499 #[test]
500 fn test_low_collapsing_store_down_expansion() {
501 let mut store = LowCollapsingDenseStore::new(3).unwrap();
502
503 *store.bin_mut(3) = 1.0;
504 *store.bin_mut(1) = 1.0;
505 assert_eq!(
506 store.bins().collect::<Vec<_>>(),
507 &[(1, 1.0), (2, 0.0), (3, 1.0)]
508 )
509 }
510}