sketches_rust/
sketch.rs

1use crate::error::Error;
2use crate::index_mapping::IndexMappingLayout::{LogCubic, LOG};
3use crate::index_mapping::{IndexMapping, IndexMappingLayout};
4use crate::input::Input;
5use crate::output::Output;
6use crate::serde;
7use crate::store::{
8    BinEncodingMode, CollapsingHighestDenseStore, CollapsingLowestDenseStore, Store,
9    UnboundedSizeDenseStore,
10};
11
12pub struct DDSketch {
13    index_mapping: IndexMapping,
14    min_indexed_value: f64,
15    max_indexed_value: f64,
16    negative_value_store: Box<dyn Store>,
17    positive_value_store: Box<dyn Store>,
18    zero_count: f64,
19}
20
/// One-byte marker that prefixes each section of an encoded sketch.
///
/// The two low-order bits select a [`FlagType`]; the remaining six bits hold a
/// type-specific sub-flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Flag {
    marker: u8,
}
25
/// Section kind stored in the two low-order bits of a [`Flag`] marker.
///
/// The explicit discriminants are the on-the-wire bit patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlagType {
    /// Sketch-level features such as the zero count.
    SketchFeatures = 0b00,
    /// Bins of the positive-value store.
    PositiveStore = 0b01,
    /// Parameters of the index mapping.
    IndexMapping = 0b10,
    /// Bins of the negative-value store.
    NegativeStore = 0b11,
}
32
33impl DDSketch {
34    pub fn accept(&mut self, value: f64) {
35        self.accept_with_count(value, 1.0);
36    }
37
38    pub fn accept_with_count(&mut self, value: f64, count: f64) {
39        if count < 0.0 {
40            return;
41        }
42
43        if value < -self.max_indexed_value || value > self.max_indexed_value {
44            return;
45        }
46
47        if value > self.min_indexed_value {
48            self.positive_value_store
49                .add(self.index_mapping.index(value), 1.0);
50        } else if value < -self.min_indexed_value {
51            self.negative_value_store
52                .add(self.index_mapping.index(-value), 1.0);
53        } else {
54            self.zero_count += 1.0;
55        }
56    }
57
58    pub fn is_empty(&self) -> bool {
59        self.zero_count == 0.0
60            && self.negative_value_store.is_empty()
61            && self.positive_value_store.is_empty()
62    }
63
64    pub fn clear(&mut self) {
65        self.negative_value_store.clear();
66        self.positive_value_store.clear();
67        self.zero_count = 0.0;
68    }
69
70    pub fn get_count(&mut self) -> f64 {
71        self.zero_count
72            + self.negative_value_store.get_total_count()
73            + self.positive_value_store.get_total_count()
74    }
75
76    pub fn get_sum(&mut self) -> Option<f64> {
77        let count = self.get_count();
78        if count <= 0.0 {
79            return None;
80        }
81
82        let mut sum = 0.0;
83        sum -= self.negative_value_store.get_sum(&self.index_mapping);
84        sum += self.positive_value_store.get_sum(&self.index_mapping);
85
86        Some(sum)
87    }
88
89    pub fn get_max(&mut self) -> Option<f64> {
90        if !self.positive_value_store.is_empty() {
91            Some(
92                self.index_mapping
93                    .value(self.positive_value_store.get_max_index()),
94            )
95        } else if self.zero_count > 0.0 {
96            Some(0.0)
97        } else if !self.negative_value_store.is_empty() {
98            Some(
99                -self
100                    .index_mapping
101                    .value(self.negative_value_store.get_min_index()),
102            )
103        } else {
104            None
105        }
106    }
107
108    pub fn get_min(&mut self) -> Option<f64> {
109        if !self.negative_value_store.is_empty() {
110            Some(
111                -self
112                    .index_mapping
113                    .value(self.negative_value_store.get_max_index()),
114            )
115        } else if self.zero_count > 0.0 {
116            Some(0.0)
117        } else if !self.positive_value_store.is_empty() {
118            Some(
119                self.index_mapping
120                    .value(self.positive_value_store.get_min_index()),
121            )
122        } else {
123            None
124        }
125    }
126
127    pub fn get_average(&mut self) -> Option<f64> {
128        let count = self.get_count();
129        if count <= 0.0 {
130            return None;
131        }
132        Some(self.get_sum()? / count)
133    }
134
135    pub fn get_value_at_quantile(self: &mut DDSketch, quantile: f64) -> Option<f64> {
136        if !(0.0..=1.0).contains(&quantile) {
137            return None;
138        }
139
140        let count = self.get_count();
141        if count <= 0.0 {
142            return None;
143        }
144
145        let rank = quantile * (count - 1.0);
146
147        let mut n: f64 = 0.0;
148
149        let negative_bin_iterator = self.negative_value_store.get_descending_iter();
150        for bin in negative_bin_iterator {
151            n += bin.1;
152            if n > rank {
153                return Some(-self.index_mapping.value(bin.0));
154            }
155        }
156
157        n += self.zero_count;
158        if n > rank {
159            return Some(0.0);
160        }
161
162        let positive_bin_iterator = self.positive_value_store.get_ascending_iter();
163        for bin in positive_bin_iterator {
164            n += bin.1;
165            if n > rank {
166                return Some(self.index_mapping.value(bin.0));
167            }
168        }
169
170        None
171    }
172
173    pub fn decode_and_merge_with(&mut self, bytes: &Vec<u8>) -> Result<(), Error> {
174        let mut input = Input::wrap(bytes);
175        while input.has_remaining() {
176            let flag = Flag::decode(&mut input)?;
177            let flag_type = flag.get_type()?;
178            match flag_type {
179                FlagType::PositiveStore => {
180                    let mode = BinEncodingMode::of_flag(flag.get_marker())?;
181                    self.positive_value_store
182                        .decode_and_merge_with(&mut input, mode)?;
183                }
184                FlagType::NegativeStore => {
185                    let mode = BinEncodingMode::of_flag(flag.get_marker())?;
186                    self.negative_value_store
187                        .decode_and_merge_with(&mut input, mode)?;
188                }
189                FlagType::IndexMapping => {
190                    let layout = IndexMappingLayout::of_flag(&flag)?;
191                    let gamma = input.read_double_le()?;
192                    let index_offset = input.read_double_le()?;
193                    let decoded_index_mapping =
194                        IndexMapping::with_gamma_offset(layout, gamma, index_offset)?;
195                    if self.index_mapping != decoded_index_mapping {
196                        return Err(Error::InvalidArgument("Unmatched IndexMapping"));
197                    }
198                }
199                FlagType::SketchFeatures => {
200                    if Flag::ZERO_COUNT == flag {
201                        self.zero_count += serde::decode_var_double(&mut input)?;
202                    } else {
203                        serde::ignore_exact_summary_statistic_flags(&mut input, flag)?;
204                    }
205                }
206            }
207        }
208        Ok(())
209    }
210
211    pub fn merge_with(&mut self, other: &DDSketch) -> Result<(), Error> {
212        if self.index_mapping != other.index_mapping {
213            return Err(Error::InvalidArgument("Unmatched indexMapping."));
214        }
215        self.negative_value_store
216            .merge_with(other.negative_value_store.get_descending_stream());
217        self.positive_value_store
218            .merge_with(other.positive_value_store.get_descending_stream());
219        self.zero_count += other.zero_count;
220        Ok(())
221    }
222
223    pub fn encode(&self) -> Result<Vec<u8>, Error> {
224        let mut output = Output::with_capacity(64);
225        self.index_mapping.encode(&mut output)?;
226
227        if self.zero_count != 0.0 {
228            Flag::ZERO_COUNT.encode(&mut output)?;
229            serde::encode_var_double(&mut output, self.zero_count)?;
230        }
231
232        self.positive_value_store
233            .encode(&mut output, FlagType::PositiveStore)?;
234        self.negative_value_store
235            .encode(&mut output, FlagType::NegativeStore)?;
236
237        Ok(output.trim())
238    }
239
240    pub fn decode(bytes: &Vec<u8>) -> Result<DDSketch, Error> {
241        let mut input = Input::wrap(bytes);
242        let mut positive_value_store = UnboundedSizeDenseStore::new();
243        let mut negative_value_store = UnboundedSizeDenseStore::new();
244        let mut index_mapping = None;
245        let mut zero_count = 0.0;
246        while input.has_remaining() {
247            let flag = Flag::decode(&mut input)?;
248            let flag_type = flag.get_type()?;
249            match flag_type {
250                FlagType::PositiveStore => {
251                    let mode = BinEncodingMode::of_flag(flag.get_marker())?;
252                    positive_value_store.decode_and_merge_with(&mut input, mode)?;
253                }
254                FlagType::NegativeStore => {
255                    let mode = BinEncodingMode::of_flag(flag.get_marker())?;
256                    negative_value_store.decode_and_merge_with(&mut input, mode)?;
257                }
258                FlagType::IndexMapping => {
259                    let layout = IndexMappingLayout::of_flag(&flag)?;
260                    let gamma = input.read_double_le()?;
261                    let index_offset = input.read_double_le()?;
262                    index_mapping = Some(IndexMapping::with_gamma_offset(
263                        layout,
264                        gamma,
265                        index_offset,
266                    )?);
267                }
268                FlagType::SketchFeatures => {
269                    if Flag::ZERO_COUNT == flag {
270                        zero_count += serde::decode_var_double(&mut input)?;
271                    } else {
272                        serde::ignore_exact_summary_statistic_flags(&mut input, flag)?;
273                    }
274                }
275            }
276        }
277
278        match index_mapping {
279            Some(mapping) => {
280                let min_indexed_value = f64::max(0.0, mapping.min_indexable_value());
281                let max_indexed_value = mapping.max_indexable_value();
282                Ok(DDSketch {
283                    index_mapping: mapping,
284                    negative_value_store: Box::new(negative_value_store),
285                    positive_value_store: Box::new(positive_value_store),
286                    min_indexed_value,
287                    max_indexed_value,
288                    zero_count,
289                })
290            }
291            None => Err(Error::InvalidArgument("No IndexMapping decoded")),
292        }
293    }
294}
295
296// factory methods
297impl DDSketch {
298    pub fn collapsing_lowest_dense(
299        relative_accuracy: f64,
300        max_num_bins: usize,
301    ) -> Result<DDSketch, Error> {
302        let index_mapping = IndexMapping::with_relative_accuracy(LogCubic, relative_accuracy)?;
303        let negative_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
304        let positive_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
305        let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
306        let max_indexed_value = index_mapping.max_indexable_value();
307        let zero_count = 0.0;
308
309        Ok(DDSketch {
310            index_mapping,
311            negative_value_store: Box::new(negative_value_store),
312            positive_value_store: Box::new(positive_value_store),
313            min_indexed_value,
314            max_indexed_value,
315            zero_count,
316        })
317    }
318
319    pub fn collapsing_highest_dense(
320        relative_accuracy: f64,
321        max_num_bins: usize,
322    ) -> Result<DDSketch, Error> {
323        let index_mapping = IndexMapping::with_relative_accuracy(LogCubic, relative_accuracy)?;
324        let negative_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
325        let positive_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
326        let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
327        let max_indexed_value = index_mapping.max_indexable_value();
328        let zero_count = 0.0;
329        Ok(DDSketch {
330            index_mapping,
331            negative_value_store: Box::new(negative_value_store),
332            positive_value_store: Box::new(positive_value_store),
333            min_indexed_value,
334            max_indexed_value,
335            zero_count,
336        })
337    }
338
339    pub fn unbounded_dense(relative_accuracy: f64) -> Result<DDSketch, Error> {
340        let index_mapping = IndexMapping::with_relative_accuracy(LogCubic, relative_accuracy)?;
341        let negative_value_store = UnboundedSizeDenseStore::new();
342        let positive_value_store = UnboundedSizeDenseStore::new();
343        let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
344        let max_indexed_value = index_mapping.max_indexable_value();
345        let zero_count = 0.0;
346        Ok(DDSketch {
347            index_mapping,
348            negative_value_store: Box::new(negative_value_store),
349            positive_value_store: Box::new(positive_value_store),
350            min_indexed_value,
351            max_indexed_value,
352            zero_count,
353        })
354    }
355
356    pub fn logarithmic_collapsing_lowest_dense(
357        relative_accuracy: f64,
358        max_num_bins: usize,
359    ) -> Result<DDSketch, Error> {
360        let index_mapping = IndexMapping::with_relative_accuracy(LOG, relative_accuracy)?;
361        let negative_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
362        let positive_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
363        let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
364        let max_indexed_value = index_mapping.max_indexable_value();
365        let zero_count = 0.0;
366        Ok(DDSketch {
367            index_mapping,
368            negative_value_store: Box::new(negative_value_store),
369            positive_value_store: Box::new(positive_value_store),
370            min_indexed_value,
371            max_indexed_value,
372            zero_count,
373        })
374    }
375
376    pub fn logarithmic_collapsing_highest_dense(
377        relative_accuracy: f64,
378        max_num_bins: usize,
379    ) -> Result<DDSketch, Error> {
380        let index_mapping = IndexMapping::with_relative_accuracy(LOG, relative_accuracy)?;
381        let negative_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
382        let positive_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
383        let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
384        let max_indexed_value = index_mapping.max_indexable_value();
385        let zero_count = 0.0;
386        Ok(DDSketch {
387            index_mapping,
388            negative_value_store: Box::new(negative_value_store),
389            positive_value_store: Box::new(positive_value_store),
390            min_indexed_value,
391            max_indexed_value,
392            zero_count,
393        })
394    }
395
396    pub fn logarithmic_unbounded_size_dense_store(
397        relative_accuracy: f64,
398    ) -> Result<DDSketch, Error> {
399        let index_mapping = IndexMapping::with_relative_accuracy(LOG, relative_accuracy)?;
400        let negative_value_store = UnboundedSizeDenseStore::new();
401        let positive_value_store = UnboundedSizeDenseStore::new();
402        let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
403        let max_indexed_value = index_mapping.max_indexable_value();
404        let zero_count = 0.0;
405        Ok(DDSketch {
406            index_mapping,
407            negative_value_store: Box::new(negative_value_store),
408            positive_value_store: Box::new(positive_value_store),
409            min_indexed_value,
410            max_indexed_value,
411            zero_count,
412        })
413    }
414}
415
416impl Flag {
417    pub const ZERO_COUNT: Flag = Flag::with_type(FlagType::SketchFeatures, 1);
418    pub const COUNT: Flag = Flag::with_type(FlagType::SketchFeatures, 0x28);
419    pub const SUM: Flag = Flag::with_type(FlagType::SketchFeatures, 0x21);
420    pub const MIN: Flag = Flag::with_type(FlagType::SketchFeatures, 0x22);
421    pub const MAX: Flag = Flag::with_type(FlagType::SketchFeatures, 0x23);
422
423    pub const fn new(marker: u8) -> Flag {
424        Flag { marker }
425    }
426
427    pub fn decode(input: &mut Input) -> Result<Flag, Error> {
428        let marker = input.read_byte()?;
429        Ok(Flag::new(marker))
430    }
431
432    pub fn get_type(&self) -> Result<FlagType, Error> {
433        FlagType::value_of(self.marker & 3)
434    }
435
436    pub fn get_marker(&self) -> u8 {
437        self.marker
438    }
439
440    pub fn encode(&self, output: &mut Output) -> Result<(), Error> {
441        output.write_byte(self.marker)
442    }
443
444    pub const fn with_type(flag_type: FlagType, sub_flag: u8) -> Flag {
445        let t = flag_type as u8;
446        Flag::new(t | (sub_flag << 2))
447    }
448}
449
450impl FlagType {
451    pub fn value_of(t: u8) -> Result<FlagType, Error> {
452        match t {
453            0b00 => Ok(FlagType::SketchFeatures),
454            0b01 => Ok(FlagType::PositiveStore),
455            0b10 => Ok(FlagType::IndexMapping),
456            0b11 => Ok(FlagType::NegativeStore),
457            _ => Err(Error::InvalidArgument("Unknown FlagType.")),
458        }
459    }
460}