1use crate::error::Error;
2use crate::index_mapping::IndexMappingLayout::{LogCubic, LOG};
3use crate::index_mapping::{IndexMapping, IndexMappingLayout};
4use crate::input::Input;
5use crate::output::Output;
6use crate::serde;
7use crate::store::{
8 BinEncodingMode, CollapsingHighestDenseStore, CollapsingLowestDenseStore, Store,
9 UnboundedSizeDenseStore,
10};
11
12pub struct DDSketch {
13 index_mapping: IndexMapping,
14 min_indexed_value: f64,
15 max_indexed_value: f64,
16 negative_value_store: Box<dyn Store>,
17 positive_value_store: Box<dyn Store>,
18 zero_count: f64,
19}
20
/// A single marker byte that introduces each section of an encoded sketch.
///
/// The two lowest bits of `marker` carry the [`FlagType`]; the remaining bits
/// carry a type-specific sub-flag.
#[derive(PartialEq)]
pub struct Flag {
    /// Raw byte as it is read from or written to the encoded stream.
    marker: u8,
}
25
/// Kind of section announced by a [`Flag`]; the discriminant is the value
/// stored in the two lowest bits of the flag's marker byte.
pub enum FlagType {
    /// Sketch-level features such as the zero count (bits `00`).
    SketchFeatures = 0,
    /// Bins of the positive value store (bits `01`).
    PositiveStore = 1,
    /// Index mapping parameters (bits `10`).
    IndexMapping = 2,
    /// Bins of the negative value store (bits `11`).
    NegativeStore = 3,
}
32
33impl DDSketch {
34 pub fn accept(&mut self, value: f64) {
35 self.accept_with_count(value, 1.0);
36 }
37
38 pub fn accept_with_count(&mut self, value: f64, count: f64) {
39 if count < 0.0 {
40 return;
41 }
42
43 if value < -self.max_indexed_value || value > self.max_indexed_value {
44 return;
45 }
46
47 if value > self.min_indexed_value {
48 self.positive_value_store
49 .add(self.index_mapping.index(value), 1.0);
50 } else if value < -self.min_indexed_value {
51 self.negative_value_store
52 .add(self.index_mapping.index(-value), 1.0);
53 } else {
54 self.zero_count += 1.0;
55 }
56 }
57
58 pub fn is_empty(&self) -> bool {
59 self.zero_count == 0.0
60 && self.negative_value_store.is_empty()
61 && self.positive_value_store.is_empty()
62 }
63
64 pub fn clear(&mut self) {
65 self.negative_value_store.clear();
66 self.positive_value_store.clear();
67 self.zero_count = 0.0;
68 }
69
70 pub fn get_count(&mut self) -> f64 {
71 self.zero_count
72 + self.negative_value_store.get_total_count()
73 + self.positive_value_store.get_total_count()
74 }
75
76 pub fn get_sum(&mut self) -> Option<f64> {
77 let count = self.get_count();
78 if count <= 0.0 {
79 return None;
80 }
81
82 let mut sum = 0.0;
83 sum -= self.negative_value_store.get_sum(&self.index_mapping);
84 sum += self.positive_value_store.get_sum(&self.index_mapping);
85
86 Some(sum)
87 }
88
89 pub fn get_max(&mut self) -> Option<f64> {
90 if !self.positive_value_store.is_empty() {
91 Some(
92 self.index_mapping
93 .value(self.positive_value_store.get_max_index()),
94 )
95 } else if self.zero_count > 0.0 {
96 Some(0.0)
97 } else if !self.negative_value_store.is_empty() {
98 Some(
99 -self
100 .index_mapping
101 .value(self.negative_value_store.get_min_index()),
102 )
103 } else {
104 None
105 }
106 }
107
108 pub fn get_min(&mut self) -> Option<f64> {
109 if !self.negative_value_store.is_empty() {
110 Some(
111 -self
112 .index_mapping
113 .value(self.negative_value_store.get_max_index()),
114 )
115 } else if self.zero_count > 0.0 {
116 Some(0.0)
117 } else if !self.positive_value_store.is_empty() {
118 Some(
119 self.index_mapping
120 .value(self.positive_value_store.get_min_index()),
121 )
122 } else {
123 None
124 }
125 }
126
127 pub fn get_average(&mut self) -> Option<f64> {
128 let count = self.get_count();
129 if count <= 0.0 {
130 return None;
131 }
132 Some(self.get_sum()? / count)
133 }
134
135 pub fn get_value_at_quantile(self: &mut DDSketch, quantile: f64) -> Option<f64> {
136 if !(0.0..=1.0).contains(&quantile) {
137 return None;
138 }
139
140 let count = self.get_count();
141 if count <= 0.0 {
142 return None;
143 }
144
145 let rank = quantile * (count - 1.0);
146
147 let mut n: f64 = 0.0;
148
149 let negative_bin_iterator = self.negative_value_store.get_descending_iter();
150 for bin in negative_bin_iterator {
151 n += bin.1;
152 if n > rank {
153 return Some(-self.index_mapping.value(bin.0));
154 }
155 }
156
157 n += self.zero_count;
158 if n > rank {
159 return Some(0.0);
160 }
161
162 let positive_bin_iterator = self.positive_value_store.get_ascending_iter();
163 for bin in positive_bin_iterator {
164 n += bin.1;
165 if n > rank {
166 return Some(self.index_mapping.value(bin.0));
167 }
168 }
169
170 None
171 }
172
173 pub fn decode_and_merge_with(&mut self, bytes: &Vec<u8>) -> Result<(), Error> {
174 let mut input = Input::wrap(bytes);
175 while input.has_remaining() {
176 let flag = Flag::decode(&mut input)?;
177 let flag_type = flag.get_type()?;
178 match flag_type {
179 FlagType::PositiveStore => {
180 let mode = BinEncodingMode::of_flag(flag.get_marker())?;
181 self.positive_value_store
182 .decode_and_merge_with(&mut input, mode)?;
183 }
184 FlagType::NegativeStore => {
185 let mode = BinEncodingMode::of_flag(flag.get_marker())?;
186 self.negative_value_store
187 .decode_and_merge_with(&mut input, mode)?;
188 }
189 FlagType::IndexMapping => {
190 let layout = IndexMappingLayout::of_flag(&flag)?;
191 let gamma = input.read_double_le()?;
192 let index_offset = input.read_double_le()?;
193 let decoded_index_mapping =
194 IndexMapping::with_gamma_offset(layout, gamma, index_offset)?;
195 if self.index_mapping != decoded_index_mapping {
196 return Err(Error::InvalidArgument("Unmatched IndexMapping"));
197 }
198 }
199 FlagType::SketchFeatures => {
200 if Flag::ZERO_COUNT == flag {
201 self.zero_count += serde::decode_var_double(&mut input)?;
202 } else {
203 serde::ignore_exact_summary_statistic_flags(&mut input, flag)?;
204 }
205 }
206 }
207 }
208 Ok(())
209 }
210
211 pub fn merge_with(&mut self, other: &DDSketch) -> Result<(), Error> {
212 if self.index_mapping != other.index_mapping {
213 return Err(Error::InvalidArgument("Unmatched indexMapping."));
214 }
215 self.negative_value_store
216 .merge_with(other.negative_value_store.get_descending_stream());
217 self.positive_value_store
218 .merge_with(other.positive_value_store.get_descending_stream());
219 self.zero_count += other.zero_count;
220 Ok(())
221 }
222
223 pub fn encode(&self) -> Result<Vec<u8>, Error> {
224 let mut output = Output::with_capacity(64);
225 self.index_mapping.encode(&mut output)?;
226
227 if self.zero_count != 0.0 {
228 Flag::ZERO_COUNT.encode(&mut output)?;
229 serde::encode_var_double(&mut output, self.zero_count)?;
230 }
231
232 self.positive_value_store
233 .encode(&mut output, FlagType::PositiveStore)?;
234 self.negative_value_store
235 .encode(&mut output, FlagType::NegativeStore)?;
236
237 Ok(output.trim())
238 }
239
240 pub fn decode(bytes: &Vec<u8>) -> Result<DDSketch, Error> {
241 let mut input = Input::wrap(bytes);
242 let mut positive_value_store = UnboundedSizeDenseStore::new();
243 let mut negative_value_store = UnboundedSizeDenseStore::new();
244 let mut index_mapping = None;
245 let mut zero_count = 0.0;
246 while input.has_remaining() {
247 let flag = Flag::decode(&mut input)?;
248 let flag_type = flag.get_type()?;
249 match flag_type {
250 FlagType::PositiveStore => {
251 let mode = BinEncodingMode::of_flag(flag.get_marker())?;
252 positive_value_store.decode_and_merge_with(&mut input, mode)?;
253 }
254 FlagType::NegativeStore => {
255 let mode = BinEncodingMode::of_flag(flag.get_marker())?;
256 negative_value_store.decode_and_merge_with(&mut input, mode)?;
257 }
258 FlagType::IndexMapping => {
259 let layout = IndexMappingLayout::of_flag(&flag)?;
260 let gamma = input.read_double_le()?;
261 let index_offset = input.read_double_le()?;
262 index_mapping = Some(IndexMapping::with_gamma_offset(
263 layout,
264 gamma,
265 index_offset,
266 )?);
267 }
268 FlagType::SketchFeatures => {
269 if Flag::ZERO_COUNT == flag {
270 zero_count += serde::decode_var_double(&mut input)?;
271 } else {
272 serde::ignore_exact_summary_statistic_flags(&mut input, flag)?;
273 }
274 }
275 }
276 }
277
278 match index_mapping {
279 Some(mapping) => {
280 let min_indexed_value = f64::max(0.0, mapping.min_indexable_value());
281 let max_indexed_value = mapping.max_indexable_value();
282 Ok(DDSketch {
283 index_mapping: mapping,
284 negative_value_store: Box::new(negative_value_store),
285 positive_value_store: Box::new(positive_value_store),
286 min_indexed_value,
287 max_indexed_value,
288 zero_count,
289 })
290 }
291 None => Err(Error::InvalidArgument("No IndexMapping decoded")),
292 }
293 }
294}
295
296impl DDSketch {
298 pub fn collapsing_lowest_dense(
299 relative_accuracy: f64,
300 max_num_bins: usize,
301 ) -> Result<DDSketch, Error> {
302 let index_mapping = IndexMapping::with_relative_accuracy(LogCubic, relative_accuracy)?;
303 let negative_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
304 let positive_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
305 let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
306 let max_indexed_value = index_mapping.max_indexable_value();
307 let zero_count = 0.0;
308
309 Ok(DDSketch {
310 index_mapping,
311 negative_value_store: Box::new(negative_value_store),
312 positive_value_store: Box::new(positive_value_store),
313 min_indexed_value,
314 max_indexed_value,
315 zero_count,
316 })
317 }
318
319 pub fn collapsing_highest_dense(
320 relative_accuracy: f64,
321 max_num_bins: usize,
322 ) -> Result<DDSketch, Error> {
323 let index_mapping = IndexMapping::with_relative_accuracy(LogCubic, relative_accuracy)?;
324 let negative_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
325 let positive_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
326 let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
327 let max_indexed_value = index_mapping.max_indexable_value();
328 let zero_count = 0.0;
329 Ok(DDSketch {
330 index_mapping,
331 negative_value_store: Box::new(negative_value_store),
332 positive_value_store: Box::new(positive_value_store),
333 min_indexed_value,
334 max_indexed_value,
335 zero_count,
336 })
337 }
338
339 pub fn unbounded_dense(relative_accuracy: f64) -> Result<DDSketch, Error> {
340 let index_mapping = IndexMapping::with_relative_accuracy(LogCubic, relative_accuracy)?;
341 let negative_value_store = UnboundedSizeDenseStore::new();
342 let positive_value_store = UnboundedSizeDenseStore::new();
343 let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
344 let max_indexed_value = index_mapping.max_indexable_value();
345 let zero_count = 0.0;
346 Ok(DDSketch {
347 index_mapping,
348 negative_value_store: Box::new(negative_value_store),
349 positive_value_store: Box::new(positive_value_store),
350 min_indexed_value,
351 max_indexed_value,
352 zero_count,
353 })
354 }
355
356 pub fn logarithmic_collapsing_lowest_dense(
357 relative_accuracy: f64,
358 max_num_bins: usize,
359 ) -> Result<DDSketch, Error> {
360 let index_mapping = IndexMapping::with_relative_accuracy(LOG, relative_accuracy)?;
361 let negative_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
362 let positive_value_store = CollapsingLowestDenseStore::with_capacity(max_num_bins)?;
363 let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
364 let max_indexed_value = index_mapping.max_indexable_value();
365 let zero_count = 0.0;
366 Ok(DDSketch {
367 index_mapping,
368 negative_value_store: Box::new(negative_value_store),
369 positive_value_store: Box::new(positive_value_store),
370 min_indexed_value,
371 max_indexed_value,
372 zero_count,
373 })
374 }
375
376 pub fn logarithmic_collapsing_highest_dense(
377 relative_accuracy: f64,
378 max_num_bins: usize,
379 ) -> Result<DDSketch, Error> {
380 let index_mapping = IndexMapping::with_relative_accuracy(LOG, relative_accuracy)?;
381 let negative_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
382 let positive_value_store = CollapsingHighestDenseStore::with_capacity(max_num_bins)?;
383 let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
384 let max_indexed_value = index_mapping.max_indexable_value();
385 let zero_count = 0.0;
386 Ok(DDSketch {
387 index_mapping,
388 negative_value_store: Box::new(negative_value_store),
389 positive_value_store: Box::new(positive_value_store),
390 min_indexed_value,
391 max_indexed_value,
392 zero_count,
393 })
394 }
395
396 pub fn logarithmic_unbounded_size_dense_store(
397 relative_accuracy: f64,
398 ) -> Result<DDSketch, Error> {
399 let index_mapping = IndexMapping::with_relative_accuracy(LOG, relative_accuracy)?;
400 let negative_value_store = UnboundedSizeDenseStore::new();
401 let positive_value_store = UnboundedSizeDenseStore::new();
402 let min_indexed_value = f64::max(0.0, index_mapping.min_indexable_value());
403 let max_indexed_value = index_mapping.max_indexable_value();
404 let zero_count = 0.0;
405 Ok(DDSketch {
406 index_mapping,
407 negative_value_store: Box::new(negative_value_store),
408 positive_value_store: Box::new(positive_value_store),
409 min_indexed_value,
410 max_indexed_value,
411 zero_count,
412 })
413 }
414}
415
416impl Flag {
417 pub const ZERO_COUNT: Flag = Flag::with_type(FlagType::SketchFeatures, 1);
418 pub const COUNT: Flag = Flag::with_type(FlagType::SketchFeatures, 0x28);
419 pub const SUM: Flag = Flag::with_type(FlagType::SketchFeatures, 0x21);
420 pub const MIN: Flag = Flag::with_type(FlagType::SketchFeatures, 0x22);
421 pub const MAX: Flag = Flag::with_type(FlagType::SketchFeatures, 0x23);
422
423 pub const fn new(marker: u8) -> Flag {
424 Flag { marker }
425 }
426
427 pub fn decode(input: &mut Input) -> Result<Flag, Error> {
428 let marker = input.read_byte()?;
429 Ok(Flag::new(marker))
430 }
431
432 pub fn get_type(&self) -> Result<FlagType, Error> {
433 FlagType::value_of(self.marker & 3)
434 }
435
436 pub fn get_marker(&self) -> u8 {
437 self.marker
438 }
439
440 pub fn encode(&self, output: &mut Output) -> Result<(), Error> {
441 output.write_byte(self.marker)
442 }
443
444 pub const fn with_type(flag_type: FlagType, sub_flag: u8) -> Flag {
445 let t = flag_type as u8;
446 Flag::new(t | (sub_flag << 2))
447 }
448}
449
450impl FlagType {
451 pub fn value_of(t: u8) -> Result<FlagType, Error> {
452 match t {
453 0b00 => Ok(FlagType::SketchFeatures),
454 0b01 => Ok(FlagType::PositiveStore),
455 0b10 => Ok(FlagType::IndexMapping),
456 0b11 => Ok(FlagType::NegativeStore),
457 _ => Err(Error::InvalidArgument("Unknown FlagType.")),
458 }
459 }
460}