1use std::cmp::Ordering;
45
/// One entry of a quantile summary: a sample value together with lower and
/// upper bounds (`rmin`, `rmax`) on its rank in the underlying stream.
///
/// Note: no `T: Clone` bound on the struct itself — trait bounds belong on
/// the impl blocks that need them, and the derives are already conditional.
#[derive(Debug, Clone)]
struct RankInfo<T> {
    val: T,
    // Lower bound on the rank of `val`.
    rmin: i64,
    // Upper bound on the rank of `val`.
    rmax: i64,
}
55
56impl<T> RankInfo<T>
57where
58 T: Clone,
59{
60 fn new(val: T, rmin: i64, rmax: i64) -> Self {
61 RankInfo { val, rmin, rmax }
62 }
63}
64
65impl<T: Clone + Ord> Ord for RankInfo<T> {
66 fn cmp(&self, other: &Self) -> Ordering {
67 self.val.cmp(&other.val)
68 }
69}
70
71impl<T: Clone + Ord> PartialOrd for RankInfo<T> {
72 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73 self.val.partial_cmp(&other.val)
74 }
75}
76
77impl<T: Clone + PartialEq> PartialEq for RankInfo<T> {
78 fn eq(&self, other: &Self) -> bool {
79 self.val == other.val
80 }
81}
82
83impl<T: Clone + PartialEq> Eq for RankInfo<T> {}
84
/// An epsilon-approximate quantile summary for a stream of known, bounded
/// length, kept as multiple levels of merged-and-compressed blocks.
#[derive(Clone)]
pub struct FixedSizeEpsilonSummary<T>
where
    T: Clone + Ord,
{
    // Target rank error, relative to the number of elements seen.
    epsilon: f64,
    // Block size: how many raw elements level 0 buffers before a
    // compress-and-promote cycle runs.
    b: usize,
    // Number of levels in `s`.
    level: usize,
    // Total number of elements observed so far.
    cnt: usize,
    // s[0] is the raw (unsorted between updates) level-0 buffer;
    // s[k] for k >= 1 holds already-compressed summaries.
    s: Vec<Vec<RankInfo<T>>>,
    // Memoized merge of all levels, built lazily by `query` and
    // invalidated on every `update`.
    cached_s_m: Option<Vec<RankInfo<T>>>,
}
97
98impl<T> FixedSizeEpsilonSummary<T>
99where
100 T: Clone + Ord,
101{
102 pub fn new(n: usize, epsilon: f64) -> Self {
103 if n == 0 {
104 panic!("n should be greater than 0")
105 }
106
107 if epsilon <= 0.0 {
108 panic!("epsilon should be greater than 0");
109 }
110
111 let epsilon_n: f64 = (n as f64) * epsilon;
114 let number_of_levels = if epsilon_n > 1.0 {
115 (epsilon_n as f64).log2().ceil() as usize
116 } else {
117 1
118 };
119 let block_size = if number_of_levels > 1 {
120 (epsilon_n.log2() / epsilon).floor() as usize
121 } else {
122 n + 1
123 };
124
125 let mut s = vec![vec![]; number_of_levels];
126 s[0].reserve_exact(block_size);
127
128 FixedSizeEpsilonSummary {
129 epsilon,
130 b: block_size,
131 level: number_of_levels,
132 cnt: 0,
133 s,
134 cached_s_m: None,
135 }
136 }
137
138 pub fn update(&mut self, e: T) {
139 self.cached_s_m = None;
140 let rank_info = RankInfo::new(e, 0, 0);
141 self.s[0].push(rank_info);
142
143 self.cnt += 1;
144 if self.s[0].len() < self.b {
145 return;
146 }
147
148 self.s[0].sort();
149 for (i, r) in self.s[0].iter_mut().enumerate() {
150 r.rmin = i as i64;
151 r.rmax = i as i64;
152 }
153
154 let compressed_size = self.b / 2;
155 let mut s_c = compress(self.s[0].clone(), compressed_size, self.epsilon);
156
157 self.s[0].clear();
158 for k in 1..self.level {
159 if self.s[k].is_empty() {
160 self.s[k] = s_c;
161 break;
162 } else {
163 let t = merge(s_c, &self.s[k]);
164 s_c = compress(t, compressed_size, self.epsilon);
165 self.s[k].clear();
166 }
167 }
168 }
169
170 pub fn query(&mut self, r: f64) -> T {
171 if self.cached_s_m.is_none() {
172 self.s[0].sort();
173 for (i, r) in self.s[0].iter_mut().enumerate() {
174 r.rmin = i as i64;
175 r.rmax = i as i64;
176 }
177
178 let mut s_m = self.s[0].clone();
179 for i in 1..self.level {
180 s_m = merge(s_m, &self.s[i])
181 }
182
183 self.cached_s_m = Some(s_m);
184 }
185
186 let rank: i64 = ((self.cnt as f64) * r).floor() as i64;
187 let epsilon_n: i64 = ((self.cnt as f64) * self.epsilon).floor() as i64;
188 if let Some(e) = find_idx(&self.cached_s_m.as_ref().unwrap(), rank, epsilon_n) {
189 e
190 } else {
191 self.cached_s_m.as_ref().unwrap().last().unwrap().val.clone()
192 }
193 }
194
195 fn calc_s_m(&mut self, epsilon: f64) -> Vec<RankInfo<T>> {
196 self.s[0].sort();
197 for (i, r) in self.s[0].iter_mut().enumerate() {
198 r.rmin = i as i64;
199 r.rmax = i as i64;
200 }
201
202 let mut s_m = self.s[0].clone();
203 for i in 1..self.level {
204 s_m = merge(s_m, &self.s[i])
205 }
206
207 compress(s_m, self.b, epsilon)
208 }
209
210 fn finalize(&mut self, epsilon: f64) {
211 let s_m = self.calc_s_m(epsilon);
212 self.s.clear();
213 self.s.push(s_m);
214 }
215
216 #[inline]
217 pub fn size(&self) -> usize {
218 self.cnt
219 }
220}
221
/// Merges two quantile summaries into one, combining each element's rank
/// bounds (`rmin`, `rmax`) so the result still bounds ranks over the union
/// of both inputs — the standard mergeable-summary rank combination (GK-style).
///
/// Both inputs are assumed sorted by `val` with consistent rank bounds;
/// ties go to `s_b`.
fn merge<T: Clone + Ord>(s_a: Vec<RankInfo<T>>, s_b: &[RankInfo<T>]) -> Vec<RankInfo<T>> {
    // Merging with an empty summary leaves the other unchanged.
    if s_a.is_empty() {
        return s_b.to_vec();
    }

    if s_b.is_empty() {
        return s_a;
    }

    let mut s_m = Vec::with_capacity(s_a.len() + s_b.len());

    let mut i1 = 0;
    let mut i2 = 0;
    // Which side the current output element was taken from: 1 = s_a, 2 = s_b.
    let mut from;

    while i1 < s_a.len() || i2 < s_b.len() {
        let val;
        let rmin;
        let rmax;

        // Pick the smaller head (standard two-way merge); when one side is
        // exhausted, drain the other.
        if i1 < s_a.len() && i2 < s_b.len() {
            if s_a[i1].val < s_b[i2].val {
                val = s_a[i1].val.clone();
                from = 1;
            } else {
                val = s_b[i2].val.clone();
                from = 2;
            }
        } else if i1 < s_a.len() && i2 >= s_b.len() {
            val = s_a[i1].val.clone();
            from = 1;
        } else {
            val = s_b[i2].val.clone();
            from = 2;
        }

        if from == 1 {
            // Element comes from s_a. Its combined lower bound adds the rmin
            // of the largest s_b element known to be below it (s_b[i2 - 1]);
            // its upper bound adds the rmax of the next s_b element minus 1
            // (that element is strictly above, so at most rmax-1 of s_b's
            // elements can precede it).
            if 0 < i2 && i2 < s_b.len() {
                rmin = s_a[i1].rmin + s_b[i2 - 1].rmin;
                rmax = s_a[i1].rmax + s_b[i2].rmax - 1;
            } else if i2 == 0 {
                // Nothing in s_b is below this element yet.
                rmin = s_a[i1].rmin;
                rmax = s_a[i1].rmax + s_b[i2].rmax - 1;
            } else {
                // s_b is exhausted: all of s_b lies below this element.
                rmin = s_a[i1].rmin + s_b[i2 - 1].rmin;
                rmax = s_a[i1].rmax + s_b[i2 - 1].rmax;
            }

            i1 += 1;
        } else {
            // Symmetric case: element comes from s_b.
            if 0 < i1 && i1 < s_a.len() {
                rmin = s_a[i1 - 1].rmin + s_b[i2].rmin;
                rmax = s_a[i1].rmax + s_b[i2].rmax - 1;
            } else if i1 == 0 {
                rmin = s_b[i2].rmin;
                rmax = s_a[i1].rmax + s_b[i2].rmax - 1;
            } else {
                rmin = s_a[i1 - 1].rmin + s_b[i2].rmin;
                rmax = s_a[i1 - 1].rmax + s_b[i2].rmax;
            }

            i2 += 1;
        }

        let rank_info = RankInfo::new(val, rmin, rmax);
        s_m.push(rank_info);
    }

    s_m
}
292
/// Compresses summary `s0` in place down to at most `block_size + 1`
/// entries: for each of `block_size + 1` evenly spaced target ranks, the
/// first entry whose `rmax` reaches that rank is kept.
///
/// # Panics
/// Panics when the precision invariant `2 * epsilon * range >= max gap`
/// does not hold, or when no entry covers a target rank.
fn compress<T: Clone>(mut s0: Vec<RankInfo<T>>, block_size: usize, epsilon: f64) -> Vec<RankInfo<T>> {
    // s0_range: the largest rmax, i.e. the rank span this summary covers.
    // e: the widest rank-uncertainty interval (rmax - rmin) of any entry.
    let mut s0_range = 0;
    let mut e: f64 = 0.0;

    for r in &s0 {
        if s0_range < r.rmax {
            s0_range = r.rmax;
        }

        if (r.rmax - r.rmin) as f64 > e {
            e = (r.rmax - r.rmin) as f64;
        }
    }

    // Compression only preserves the epsilon guarantee while every
    // uncertainty interval fits inside twice the epsilon budget.
    let epsilon_n: f64 = epsilon * (s0_range as f64);
    assert!(2.0 * epsilon_n >= e, "precision condition violated.");

    // i: target-rank index; j: read cursor into s0; k: write cursor used to
    // compact the selected entries at the front of s0.
    let mut i = 0;
    let mut j = 0;
    let mut k = 0;
    let n = s0.len();
    while i <= block_size && j < n {
        // Target rank for this slot, evenly spaced over [0, s0_range].
        let r = ((i as f64) * (s0_range as f64) / (block_size as f64)).floor() as i64;

        // Advance to the first remaining entry whose rmax reaches r.
        while j < n {
            if s0[j].rmax >= r {
                break;
            }

            j += 1;
        }

        // Expected to be unreachable while the precision invariant holds:
        // the entry with rmax == s0_range always satisfies the last target.
        assert!(j < n, "unable to find the summary with precision given.");
        s0[k] = s0[j].clone();
        k += 1;
        j += 1;
        i += 1;
    }

    // Drop the tail that was not selected.
    s0.truncate(k);
    s0
}
335
/// Returns `Some(i)` when `x` equals `boundaries[i]` (the first such index),
/// `None` otherwise. `boundaries` must be sorted ascending.
#[inline]
fn is_boundary(x: usize, boundaries: &[usize; 32]) -> Option<usize> {
    // Lower bound: index of the first boundary >= x.
    let idx = boundaries.partition_point(|&b| b < x);

    // NOTE(review): index 31 is never reported as a boundary, mirroring the
    // original `l < 31` check — confirm that excluding the last slot is
    // intentional.
    if idx < 31 && boundaries[idx] == x {
        Some(idx)
    } else {
        None
    }
}
358
359fn find_idx<T: Clone + Ord>(s_m: &[RankInfo<T>], rank: i64, epsilon_n: i64) -> Option<T> {
360 let mut l = 0usize;
361 let mut r = s_m.len() - 1;
362 while l < r {
363 let m = l + (r - l) / 2;
364 if s_m[m].rmin < rank {
365 l = m + 1;
366 } else if s_m[m].rmax > rank {
367 r = m;
368 } else {
369 r = m;
370 }
371 }
372
373 while l < s_m.len() && s_m[l].rmin >= rank - epsilon_n {
374 if s_m[l].rmax <= rank + epsilon_n {
375 return Some(s_m[l].val.clone());
376 }
377
378 l += 1;
379 }
380
381 None
382}
383
/// An epsilon-approximate quantile summary for a stream of unknown length.
/// The stream is cut into segments of doubling size; each sealed segment
/// keeps its own fixed-size summary (this matches the multi-level scheme of
/// Zhang & Wang — worth confirming against the paper).
pub struct UnboundEpsilonSummary<T>
where
    T: Clone + Ord,
{
    // Target rank error for queries, relative to elements seen.
    epsilon: f64,
    // Total number of elements observed so far.
    cnt: usize,
    // Summaries of already-sealed segments.
    s: Vec<FixedSizeEpsilonSummary<T>>,
    // Summary of the segment currently receiving updates.
    s_c: FixedSizeEpsilonSummary<T>,
    // boundaries[i] = floor((2^i - 1) / epsilon): stream positions at which
    // the current segment is sealed and a new one begins.
    boundaries: [usize; 32],
    // Memoized merge of all segments; invalidated on every update.
    cached_s_m: Option<Vec<RankInfo<T>>>,
}
395
396impl<T> UnboundEpsilonSummary<T>
397where
398 T: Clone + Ord + std::fmt::Debug,
399{
400 pub fn new(epsilon: f64) -> Self {
401 if epsilon <= 0.0 {
402 panic!("epsilon should be greater than 0");
403 }
404
405 let s = vec![];
406
407 let n = (1.0_f64 / epsilon).floor() as usize;
408 let s_c = FixedSizeEpsilonSummary::new(n, epsilon / 2.0);
409 let mut boundaries: [usize; 32] = [0; 32];
410 for i in 0..32usize {
411 let boundary = (((usize::pow(2, i as u32) - 1) as f64) / epsilon).floor() as usize;
412 boundaries[i] = boundary
413 }
414
415 UnboundEpsilonSummary {
416 epsilon,
417 cnt: 0,
418 s,
419 s_c,
420 boundaries,
421 cached_s_m: None,
422 }
423 }
424
425 pub fn update(&mut self, e: T) {
426 self.cached_s_m = None;
427 self.s_c.update(e);
428
429 if let Some(x) = is_boundary(self.cnt + 1, &self.boundaries) {
430 self.s_c.finalize(self.epsilon / 2.0);
431
432 let upper_bound = self.boundaries[x + x];
433 let n = upper_bound - self.cnt - 1;
434 let mut summary = FixedSizeEpsilonSummary::new(n, self.epsilon / 2.0);
435 std::mem::swap(&mut self.s_c, &mut summary);
436
437 self.s.push(summary);
438 }
439 self.cnt += 1;
440 }
441
442 pub fn query(&mut self, r: f64) -> T {
443 if self.cached_s_m.is_none() {
444 let mut s_m = self.s_c.calc_s_m(self.epsilon / 2.0);
445 for i in 0..self.s.len() {
446 for j in 0..self.s[i].s.len() {
447 s_m = merge(s_m, &self.s[i].s[j])
448 }
449 }
450
451 self.cached_s_m = Some(s_m);
452 }
453
454 let rank: i64 = ((self.cnt as f64) * r).floor() as i64;
455 let epsilon_n: i64 = ((self.cnt as f64) * self.epsilon).floor() as i64;
456 if let Some(e) = find_idx(&self.cached_s_m.as_ref().unwrap(), rank, epsilon_n) {
457 e
458 } else {
459 self.cached_s_m.as_ref().unwrap().last().unwrap().val.clone()
460 }
461 }
462
463 #[inline]
464 pub fn size(&self) -> usize {
465 self.cnt
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;
    use rand_distr::Distribution;

    // Instrumented allocator so tests can report how much memory the
    // summaries allocate compared to holding the raw sorted data.
    #[global_allocator]
    static GLOBAL: &stats_alloc::StatsAlloc<std::alloc::System> = &stats_alloc::INSTRUMENTED_SYSTEM;

    // Pins the rank-bound arithmetic of `merge` and the selection made by
    // `compress` on a small hand-computed example.
    #[test]
    fn test_merge_and_compress() {
        let mut s0 = Vec::with_capacity(4);
        let mut s1 = Vec::with_capacity(4);

        s0.push(RankInfo::new(2, 1, 1));
        s0.push(RankInfo::new(4, 3, 4));
        s0.push(RankInfo::new(8, 5, 6));
        s0.push(RankInfo::new(17, 8, 8));

        s1.push(RankInfo::new(1, 1, 1));
        s1.push(RankInfo::new(7, 3, 3));
        s1.push(RankInfo::new(12, 5, 6));
        s1.push(RankInfo::new(15, 8, 8));

        let merged = merge(s0, &s1);

        assert_eq!(merged.len(), 8);
        let merged_vals: Vec<i32> = merged.iter().map(|x| x.val).collect();
        let merged_rmins: Vec<i64> = merged.iter().map(|x| x.rmin).collect();
        let merged_rmaxs: Vec<i64> = merged.iter().map(|x| x.rmax).collect();
        assert_eq!(merged_vals, vec![1, 2, 4, 7, 8, 12, 15, 17]);
        assert_eq!(merged_rmins, vec![1, 2, 4, 6, 8, 10, 13, 16]);
        assert_eq!(merged_rmaxs, vec![1, 3, 6, 8, 11, 13, 15, 16]);

        let epsilon: f64 = 0.2;
        let compressed = compress(merged, 4, epsilon);
        let compressed_vals: Vec<i32> = compressed.iter().map(|x| x.val).collect();
        assert_eq!(compressed_vals, vec![1, 4, 7, 12, 17]);
    }

    #[test]
    #[should_panic]
    fn test_panic_on_negative_epsilon_when_constructing_fixedsize_summary() {
        let _ = FixedSizeEpsilonSummary::<usize>::new(100, -0.1);
    }

    #[test]
    #[should_panic]
    fn test_panic_on_negative_epsilon_when_constructing_unbound_summary() {
        let _ = UnboundEpsilonSummary::<usize>::new(-0.1);
    }

    // Renamed from `test_query_unbound_summary_with_insufficient_values`:
    // it exercises FixedSizeEpsilonSummary, not the unbound summary.
    #[test]
    fn test_query_fixedsize_summary_with_insufficient_values() {
        let epsilon = 0.1;
        let n = 3;
        let mut s = FixedSizeEpsilonSummary::new(n, epsilon);
        for i in 1..=n {
            s.update(i);
        }

        let rank: f64 = 1.0;
        let ans = s.query(rank);
        assert!(n == ans);
    }

    #[test]
    fn test_query_with_small_n_on_fixedsize_summary() {
        let epsilon = 0.1;
        let n = 10;
        let mut s = FixedSizeEpsilonSummary::new(n, epsilon);
        for i in 1..=n {
            s.update(i);
        }

        for i in 1..=n {
            let rank: f64 = ((i - 1) as f64) / (n as f64);
            let ans = s.query(rank);
            assert!(i == ans);
        }
    }

    #[test]
    fn test_query_with_small_n_on_unbound_summary() {
        let epsilon = 0.1;
        let n = 10;
        let mut s = UnboundEpsilonSummary::new(epsilon);
        for i in 1..=n {
            s.update(i);
        }

        for i in 1..=n {
            let rank: f64 = ((i - 1) as f64) / (n as f64);
            let ans = s.query(rank);
            assert!(i == ans);
        }
    }

    #[test]
    fn test_normal_distribution_generated_seq_on_fixed_summary() {
        let n = 1000000;
        let epsilon: f64 = 0.01;
        let mut s = FixedSizeEpsilonSummary::new(n, epsilon);

        let dn = rand_distr::Normal::new(0.5f64, 0.2f64).unwrap();
        let mut normal_rng = rand_pcg::Pcg64::new(0xcafef00dd15ea5e5, 0xa02bdbf7bb3c0a7ac28fa16a64abf96);

        // Memory cost of the raw data, for comparison with the summary.
        let startmem = stats_alloc::Region::new(&GLOBAL);
        let mut records = Vec::with_capacity(n);
        for _ in 0..n {
            let x = dn.sample(&mut normal_rng);
            records.push(unsafe { ordered_float::NotNan::new_unchecked(x) });
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        records.sort_by(|a, b| a.partial_cmp(b).unwrap());

        // Memory cost of feeding everything into the summary.
        let startmem = stats_alloc::Region::new(&GLOBAL);
        for i in 0..n {
            s.update(records[i]);
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        // Estimated quantiles must land within epsilon of the true ones.
        let quantile_estimated = s.query(0.5);
        assert!((quantile_estimated - records[n / 2]).abs() < 0.01);

        let quantile_estimated = s.query(0.0);
        assert!((quantile_estimated - records[0]).abs() < 0.1);

        let quantile_estimated = s.query(0.99);
        assert!((quantile_estimated - records[n * 99 / 100]).abs() < 0.01);

        let quantile_estimated = s.query(1.0);
        assert!((quantile_estimated - records[n - 1]).abs() < 0.01);
    }

    #[test]
    fn test_pareto_distribution_generated_seq_on_fixed_summary() {
        let n = 1000000;
        let epsilon: f64 = 0.001;
        let mut s = FixedSizeEpsilonSummary::new(n, epsilon);

        let dn = rand_distr::Pareto::new(5f64, 10f64).unwrap();
        let mut normal_rng = rand_pcg::Pcg64::new(0xcafef00dd15ea5e5, 0xa02bdbf7bb3c0a7ac28fa16a64abf96);

        let startmem = stats_alloc::Region::new(&GLOBAL);
        let mut records = Vec::with_capacity(n);
        for _ in 0..n {
            let x = dn.sample(&mut normal_rng);
            records.push(unsafe { ordered_float::NotNan::new_unchecked(x) });
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        records.sort_by(|a, b| a.partial_cmp(b).unwrap());

        let startmem = stats_alloc::Region::new(&GLOBAL);
        for i in 0..n {
            s.update(records[i]);
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        let quantile_estimated = s.query(0.5);
        assert!((quantile_estimated - records[n / 2]).abs() < 0.01);

        let quantile_estimated = s.query(0.0);
        assert!((quantile_estimated - records[0]).abs() < 0.01);

        let quantile_estimated = s.query(0.99);
        assert!((quantile_estimated - records[n * 99 / 100]).abs() < 0.01);

        let quantile_estimated = s.query(1.0);
        assert!((quantile_estimated - records[n - 1]).abs() < 0.01);
    }

    #[test]
    fn test_normal_distribution_generated_seq_on_unbound_summary() {
        let n = 1000000;
        let epsilon: f64 = 0.01;
        let mut s = UnboundEpsilonSummary::new(epsilon);

        let dn = rand_distr::Normal::new(0.5f64, 0.2f64).unwrap();
        let mut normal_rng = rand_pcg::Pcg64::new(0xcafef00dd15ea5e5, 0xa02bdbf7bb3c0a7ac28fa16a64abf96);

        let startmem = stats_alloc::Region::new(&GLOBAL);
        let mut records = Vec::with_capacity(n);
        for _ in 0..n {
            let x = dn.sample(&mut normal_rng);
            records.push(unsafe { ordered_float::NotNan::new_unchecked(x) });
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        records.sort_by(|a, b| a.partial_cmp(b).unwrap());

        let startmem = stats_alloc::Region::new(&GLOBAL);
        for i in 0..n {
            s.update(records[i]);
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        let quantile_estimated = s.query(0.5);
        assert!((quantile_estimated - records[n / 2]).abs() < 0.01);

        let quantile_estimated = s.query(0.0);
        assert!((quantile_estimated - records[0]).abs() < 0.01);

        let quantile_estimated = s.query(0.99);
        assert!((quantile_estimated - records[n * 99 / 100]).abs() < 0.01);

        let quantile_estimated = s.query(1.0);
        assert!((quantile_estimated - records[n - 1]).abs() < 0.01);
    }

    #[test]
    fn test_pareto_distribution_generated_seq_on_unbound_summary() {
        let n = 1000000;
        let epsilon: f64 = 0.001;
        let mut s = UnboundEpsilonSummary::new(epsilon);

        let dn = rand_distr::Pareto::new(5f64, 10f64).unwrap();
        let mut normal_rng = rand_pcg::Pcg64::new(0xcafef00dd15ea5e5, 0xa02bdbf7bb3c0a7ac28fa16a64abf96);

        let startmem = stats_alloc::Region::new(&GLOBAL);
        let mut records = Vec::with_capacity(n);
        for _ in 0..n {
            let x = dn.sample(&mut normal_rng);
            records.push(unsafe { ordered_float::NotNan::new_unchecked(x) });
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        records.sort_by(|a, b| a.partial_cmp(b).unwrap());

        let startmem = stats_alloc::Region::new(&GLOBAL);
        for i in 0..n {
            s.update(records[i]);
        }
        let mem = startmem.change();
        let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;

        println!("{}k", bytes_change / 1024);

        let quantile_estimated = s.query(0.5);
        assert!((quantile_estimated - records[n / 2]).abs() < 0.01);

        let quantile_estimated = s.query(0.0);
        assert!((quantile_estimated - records[0]).abs() < 0.01);

        let quantile_estimated = s.query(0.99);
        assert!((quantile_estimated - records[n * 99 / 100]).abs() < 0.01);

        let quantile_estimated = s.query(1.0);
        assert!((quantile_estimated - records[n - 1]).abs() < 0.01);
    }
}