reddb_server/storage/query/engine/
aggregates_extra.rs1use std::collections::VecDeque;
24
25use crate::storage::primitives::{HyperLogLog, TDigest};
26
27#[derive(Debug, Clone)]
30pub struct UniqAggregator {
31 hll: HyperLogLog,
32}
33
34impl UniqAggregator {
35 pub fn new(_precision: u8) -> Self {
40 Self {
41 hll: HyperLogLog::new(),
42 }
43 }
44
45 pub fn add(&mut self, bytes: &[u8]) {
46 self.hll.add(bytes);
47 }
48
49 pub fn add_str(&mut self, s: &str) {
50 self.hll.add(s.as_bytes());
51 }
52
53 pub fn add_i64(&mut self, v: i64) {
54 self.hll.add(&v.to_le_bytes());
55 }
56
57 pub fn merge(&mut self, other: &Self) {
58 self.hll.merge(&other.hll);
59 }
60
61 pub fn estimate(&self) -> u64 {
63 self.hll.count()
64 }
65}
66
67#[derive(Debug, Clone, Default)]
70pub struct QuantileTDigestAggregator {
71 digest: TDigest,
72}
73
74impl QuantileTDigestAggregator {
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 pub fn add(&mut self, value: f64) {
80 self.digest.add(value);
81 }
82
83 pub fn merge(&mut self, other: &Self) {
84 self.digest.merge(&other.digest);
85 }
86
87 pub fn quantile(&self, q: f64) -> f64 {
89 self.digest.quantile(q)
90 }
91}
92
/// Streaming covariance / Pearson-correlation accumulator over `(x, y)` pairs.
///
/// The state is a pair count, the running means of both variables, and the
/// centred second-order sums. [`add`](Self::add) updates them one pair at a
/// time and [`merge`](Self::merge) combines two partial states.
#[derive(Debug, Clone, Default)]
pub struct CovarianceAggregator {
    /// Number of accepted (finite) pairs.
    n: u64,
    /// Running mean of the `x` values.
    mean_x: f64,
    /// Running mean of the `y` values.
    mean_y: f64,
    /// Co-moment: running sum of `(x - mean_x) * (y - mean_y)`.
    c2: f64,
    /// Running sum of squared deviations of `x` from its mean.
    m2_x: f64,
    /// Running sum of squared deviations of `y` from its mean.
    m2_y: f64,
}

impl CovarianceAggregator {
    /// Creates an empty accumulator (all fields zero).
    pub fn new() -> Self {
        Self::default()
    }

    /// Folds one `(x, y)` pair into the running state.
    ///
    /// Pairs where either component is NaN or infinite are ignored entirely.
    pub fn add(&mut self, x: f64, y: f64) {
        if !x.is_finite() || !y.is_finite() {
            return;
        }
        self.n += 1;
        let count = self.n as f64;

        // Deviation from the mean *before* and *after* the mean is updated;
        // their product is the increment of the squared-deviation sum.
        let dx = x - self.mean_x;
        self.mean_x += dx / count;
        let dx2 = x - self.mean_x;
        self.m2_x += dx * dx2;

        let dy = y - self.mean_y;
        self.mean_y += dy / count;
        let dy2 = y - self.mean_y;
        self.m2_y += dy * dy2;

        // Co-moment uses the old-mean deviation of x with the new-mean
        // deviation of y.
        self.c2 += dx * dy2;
    }

    /// Population covariance (`c2 / n`); `0.0` when no pair was accepted.
    pub fn covar_pop(&self) -> f64 {
        if self.n == 0 {
            return 0.0;
        }
        self.c2 / self.n as f64
    }

    /// Sample covariance (`c2 / (n - 1)`); `0.0` with fewer than two pairs.
    pub fn covar_samp(&self) -> f64 {
        if self.n < 2 {
            return 0.0;
        }
        self.c2 / (self.n - 1) as f64
    }

    /// Pearson correlation coefficient.
    ///
    /// Returns NaN with fewer than two pairs or when either variable has zero
    /// spread (the denominator would be zero).
    pub fn corr(&self) -> f64 {
        if self.n < 2 {
            return f64::NAN;
        }
        // Take the square roots separately: the product `m2_x * m2_y` can
        // overflow to infinity or underflow to zero even when each factor is
        // perfectly representable, which would turn a valid correlation into
        // 0.0 or NaN.
        let denom = self.m2_x.sqrt() * self.m2_y.sqrt();
        if denom <= 0.0 {
            return f64::NAN;
        }
        // Rounding can push the ratio a hair outside the mathematical range.
        (self.c2 / denom).clamp(-1.0, 1.0)
    }

    /// Combines the state of `other` into `self`, as if every pair seen by
    /// `other` had been added to `self`.
    pub fn merge(&mut self, other: &Self) {
        if other.n == 0 {
            return;
        }
        if self.n == 0 {
            *self = other.clone();
            return;
        }
        let n = self.n + other.n;
        let delta_x = other.mean_x - self.mean_x;
        let delta_y = other.mean_y - self.mean_y;

        // Share of the combined sample contributed by `other`, and the
        // cross-partition weight applied to the mean differences.
        let other_share = other.n as f64 / n as f64;
        let cross_weight = self.n as f64 * other.n as f64 / n as f64;

        let new_mean_x = self.mean_x + delta_x * other_share;
        let new_mean_y = self.mean_y + delta_y * other_share;
        self.m2_x += other.m2_x + delta_x * delta_x * cross_weight;
        self.m2_y += other.m2_y + delta_y * delta_y * cross_weight;
        self.c2 += other.c2 + delta_x * delta_y * cross_weight;
        self.mean_x = new_mean_x;
        self.mean_y = new_mean_y;
        self.n = n;
    }
}
181
/// Counts how many rows satisfied a condition.
#[derive(Debug, Clone, Default)]
pub struct CountIfAggregator {
    /// Number of `true` conditions seen so far.
    count: u64,
}

impl CountIfAggregator {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        Self { count: 0 }
    }

    /// Bumps the counter by one when `cond` is `true`; otherwise does nothing.
    pub fn add(&mut self, cond: bool) {
        // `true` converts to 1 and `false` to 0.
        self.count += u64::from(cond);
    }

    /// Adds the tally of `other` to this one.
    pub fn merge(&mut self, other: &Self) {
        let Self { count: theirs } = *other;
        self.count += theirs;
    }

    /// Returns the number of matching rows seen so far.
    pub fn finalize(&self) -> u64 {
        let Self { count } = *self;
        count
    }
}
207
/// Conditional sum / average: accumulates only values whose condition holds.
#[derive(Debug, Clone, Default)]
pub struct SumAvgIfAggregator {
    /// Running total of accepted values.
    sum: f64,
    /// Number of accepted values.
    count: u64,
}

impl SumAvgIfAggregator {
    /// Creates an aggregator with a zero sum and zero count.
    pub fn new() -> Self {
        Self { sum: 0.0, count: 0 }
    }

    /// Accepts `value` only when `cond` is `true` and the value is finite;
    /// NaN and infinities are dropped without touching the state.
    pub fn add(&mut self, value: f64, cond: bool) {
        if !cond || !value.is_finite() {
            return;
        }
        self.sum += value;
        self.count += 1;
    }

    /// Adds the sum and count of `other` to this aggregator.
    pub fn merge(&mut self, other: &Self) {
        let Self {
            sum: their_sum,
            count: their_count,
        } = *other;
        self.sum += their_sum;
        self.count += their_count;
    }

    /// Total of all accepted values.
    pub fn sum(&self) -> f64 {
        self.sum
    }

    /// Mean of the accepted values, or `0.0` when nothing was accepted.
    pub fn avg(&self) -> f64 {
        match self.count {
            0 => 0.0,
            rows => self.sum / rows as f64,
        }
    }

    /// Number of accepted values.
    pub fn count(&self) -> u64 {
        self.count
    }
}
248
/// Keeps the first value it is given and ignores every later one.
#[derive(Debug, Clone)]
pub struct AnyAggregator<T: Clone> {
    /// The retained value; `None` until the first `add` (or a merge that
    /// supplies one).
    first: Option<T>,
}

// Written by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound even though an empty aggregator never needs a `T`.
impl<T: Clone> Default for AnyAggregator<T> {
    /// Returns an empty aggregator, identical to [`AnyAggregator::new`].
    fn default() -> Self {
        Self { first: None }
    }
}

impl<T: Clone> AnyAggregator<T> {
    /// Creates an aggregator holding no value.
    pub fn new() -> Self {
        Self { first: None }
    }

    /// Stores `value` if nothing has been stored yet; otherwise drops it.
    pub fn add(&mut self, value: T) {
        if self.first.is_none() {
            self.first = Some(value);
        }
    }

    /// Adopts a clone of the value held by `other` only when `self` is still
    /// empty; a value already held by `self` always wins.
    pub fn merge(&mut self, other: &Self) {
        if self.first.is_none() {
            self.first = other.first.clone();
        }
    }

    /// Returns a clone of the retained value, or `None` if nothing was added.
    pub fn finalize(&self) -> Option<T> {
        self.first.clone()
    }
}
280
/// Keeps the most recent value it was given.
#[derive(Debug, Clone)]
pub struct AnyLastAggregator<T: Clone> {
    /// The latest value seen; `None` until the first `add` (or a merge that
    /// supplies one).
    last: Option<T>,
}

// Written by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound even though an empty aggregator never needs a `T`.
impl<T: Clone> Default for AnyLastAggregator<T> {
    /// Returns an empty aggregator, identical to [`AnyLastAggregator::new`].
    fn default() -> Self {
        Self { last: None }
    }
}

impl<T: Clone> AnyLastAggregator<T> {
    /// Creates an aggregator holding no value.
    pub fn new() -> Self {
        Self { last: None }
    }

    /// Replaces whatever was stored with `value`.
    pub fn add(&mut self, value: T) {
        self.last = Some(value);
    }

    /// Overwrites the stored value with a clone of the one held by `other`,
    /// if `other` holds one; an empty `other` leaves `self` untouched.
    pub fn merge(&mut self, other: &Self) {
        if other.last.is_some() {
            self.last = other.last.clone();
        }
    }

    /// Returns a clone of the latest value, or `None` if nothing was added.
    pub fn finalize(&self) -> Option<T> {
        self.last.clone()
    }
}
306
/// Collects values in insertion order, optionally capped at `limit` entries.
#[derive(Debug, Clone)]
pub struct GroupArrayAggregator<T: Clone> {
    /// Maximum number of retained values; `0` means no cap.
    limit: usize,
    /// Retained values, oldest first.
    buffer: VecDeque<T>,
}

impl<T: Clone> GroupArrayAggregator<T> {
    /// Creates an empty collector. A `limit` of `0` disables the cap.
    pub fn new(limit: usize) -> Self {
        let buffer = VecDeque::new();
        Self { limit, buffer }
    }

    /// Appends `value` unless the cap has already been reached, in which case
    /// the value is dropped.
    pub fn add(&mut self, value: T) {
        let at_capacity = self.limit != 0 && self.buffer.len() >= self.limit;
        if !at_capacity {
            self.buffer.push_back(value);
        }
    }

    /// Appends clones of the values held by `other`, in order, stopping once
    /// this collector's own cap is reached. The limit of `other` is not
    /// consulted.
    pub fn merge(&mut self, other: &Self) {
        // How many more entries fit; unlimited when the cap is disabled.
        let room = match self.limit {
            0 => usize::MAX,
            cap => cap.saturating_sub(self.buffer.len()),
        };
        self.buffer.extend(other.buffer.iter().take(room).cloned());
    }

    /// Returns the collected values as a freshly allocated `Vec`.
    pub fn finalize(&self) -> Vec<T> {
        let mut out = Vec::with_capacity(self.buffer.len());
        out.extend(self.buffer.iter().cloned());
        out
    }

    /// Number of values currently held.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// `true` when no value is held.
    pub fn is_empty(&self) -> bool {
        self.buffer.len() == 0
    }
}
352
/// Yields an owned clone of each element of `arr`, in slice order.
///
/// The returned iterator borrows `arr` for as long as it is alive.
pub fn array_join<T: Clone>(arr: &[T]) -> impl Iterator<Item = T> + '_ {
    arr.iter().map(T::clone)
}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Relative error of `est` against `truth`, expressed in percent.
    fn percent_error(est: u64, truth: f64) -> f64 {
        ((est as f64 - truth).abs() / truth) * 100.0
    }

    /// Builds a covariance aggregator fed with `(i, f(i))` for `i` in `range`.
    fn covariance_over(
        range: std::ops::Range<i32>,
        f: impl Fn(f64) -> f64,
    ) -> CovarianceAggregator {
        let mut agg = CovarianceAggregator::new();
        for i in range {
            let x = f64::from(i);
            agg.add(x, f(x));
        }
        agg
    }

    /// 10 000 distinct integers must be estimated within 5 %.
    #[test]
    fn uniq_estimates_roughly_distinct_count() {
        let mut u = UniqAggregator::new(12);
        (0..10_000_i64).for_each(|i| u.add_i64(i));
        let est = u.estimate();
        let err = percent_error(est, 10_000.0);
        assert!(err < 5.0, "uniq error {err}% est={est}");
    }

    /// Two overlapping ranges merge to the size of their union (7 500).
    #[test]
    fn uniq_merges_two_sets_without_double_counting_overlap() {
        let mut a = UniqAggregator::new(12);
        let mut b = UniqAggregator::new(12);
        (0..5000_i64).for_each(|i| a.add_i64(i));
        (2500..7500_i64).for_each(|i| b.add_i64(i));
        a.merge(&b);
        let est = a.estimate();
        let err = percent_error(est, 7500.0);
        assert!(err < 5.0, "uniq merge error {err}% est={est}");
    }

    /// The median of 0..10 000 must land inside a deliberately wide window.
    #[test]
    fn quantile_tdigest_agrees_with_sorted_median_within_mvp_tolerance() {
        let mut q = QuantileTDigestAggregator::new();
        (0..10_000).map(f64::from).for_each(|sample| q.add(sample));
        let m = q.quantile(0.5);
        assert!(m > 2000.0 && m < 8000.0, "median was {m}");
    }

    /// A rising straight line correlates at +1.
    #[test]
    fn corr_is_one_on_perfect_positive_line() {
        let c = covariance_over(0..100, |x| 2.0 * x + 3.0);
        let r = c.corr();
        assert!((r - 1.0).abs() < 1e-9, "corr = {r}");
    }

    /// A falling straight line correlates at -1.
    #[test]
    fn corr_is_negative_on_inverse_line() {
        let c = covariance_over(0..100, |x| -3.0 * x + 5.0);
        let r = c.corr();
        assert!((r + 1.0).abs() < 1e-9, "corr = {r}");
    }

    /// With four points, sample covariance is population covariance * 4/3.
    #[test]
    fn covar_pop_vs_covar_samp_relationship() {
        let mut c = CovarianceAggregator::new();
        let points = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)];
        for &(x, y) in &points {
            c.add(x, y);
        }
        let (pop, samp) = (c.covar_pop(), c.covar_samp());
        assert!(samp > pop);
        assert!((samp - pop * 4.0 / 3.0).abs() < 1e-9);
    }

    /// Merging two halves must match a single pass over the whole range.
    #[test]
    fn merge_combines_parallel_aggregators() {
        let doubled = |i: i32| (f64::from(i), f64::from(i * 2));

        let mut left = CovarianceAggregator::new();
        let mut right = CovarianceAggregator::new();
        let mut full = CovarianceAggregator::new();
        for (x, y) in (0..50).map(doubled) {
            left.add(x, y);
        }
        for (x, y) in (50..100).map(doubled) {
            right.add(x, y);
        }
        for (x, y) in (0..100).map(doubled) {
            full.add(x, y);
        }

        left.merge(&right);
        assert!((left.corr() - full.corr()).abs() < 1e-9);
        assert!((left.covar_pop() - full.covar_pop()).abs() < 1e-9);
    }

    /// Multiples of three in 0..20 are 0, 3, 6, 9, 12, 15, 18 — seven rows.
    #[test]
    fn count_if_only_counts_truthy_rows() {
        let mut c = CountIfAggregator::new();
        (0..20).map(|i| i % 3 == 0).for_each(|hit| c.add(hit));
        assert_eq!(c.finalize(), 7);
    }

    /// NaN inputs and rows with a false condition leave the state untouched.
    #[test]
    fn sum_if_skips_non_finite_values() {
        let mut s = SumAvgIfAggregator::new();
        let rows = [(1.0, true), (f64::NAN, true), (2.0, false), (3.0, true)];
        for &(value, cond) in &rows {
            s.add(value, cond);
        }
        assert_eq!(s.sum(), 4.0);
        assert_eq!(s.count(), 2);
        assert_eq!(s.avg(), 2.0);
    }

    /// `any` keeps the first value, `any_last` keeps the final one.
    #[test]
    fn any_and_any_last_track_endpoints() {
        let mut first = AnyAggregator::<i32>::new();
        let mut last = AnyLastAggregator::<i32>::new();
        (0..10).for_each(|i| {
            first.add(i);
            last.add(i);
        });
        assert_eq!(first.finalize(), Some(0));
        assert_eq!(last.finalize(), Some(9));
    }

    /// Values past the cap are dropped.
    #[test]
    fn group_array_respects_limit() {
        let mut g = GroupArrayAggregator::<i32>::new(3);
        (0..10).for_each(|i| g.add(i));
        assert_eq!(g.finalize(), vec![0, 1, 2]);
    }

    /// A limit of zero disables the cap.
    #[test]
    fn group_array_unbounded_when_limit_zero() {
        let mut g = GroupArrayAggregator::<i32>::new(0);
        (0..5).for_each(|i| g.add(i));
        assert_eq!(g.finalize(), vec![0, 1, 2, 3, 4]);
    }

    /// `array_join` yields the slice's elements unchanged and in order.
    #[test]
    fn array_join_flattens_vec() {
        let v = vec![1, 2, 3];
        let out: Vec<i32> = array_join(&v).collect();
        assert_eq!(out, v);
    }

    /// Merging stops once the receiving collector is full.
    #[test]
    fn group_array_merge_respects_limit() {
        let mut a = GroupArrayAggregator::<i32>::new(4);
        let mut b = GroupArrayAggregator::<i32>::new(4);
        [1, 2].iter().for_each(|&v| a.add(v));
        [3, 4, 5].iter().for_each(|&v| b.add(v));
        a.merge(&b);
        assert_eq!(a.finalize(), vec![1, 2, 3, 4]);
    }
}