Skip to main content

reddb_server/storage/query/engine/
aggregates_extra.rs

1//! ClickHouse-parity aggregate functions (Track B4 sprint).
2//!
3//! These live in a self-contained helper module so the SQL dispatcher
4//! can adopt them incrementally: each aggregate is a small struct
5//! with `add`, `merge`, and `finalize`. When the planner wires them
6//! to `AggregateFunction::*` variants, nothing inside here changes.
7//!
8//! Algorithms favoured from the ClickHouse surface:
9//!
10//! * `uniq` / `uniqHLL12` — cardinality via HyperLogLog, reusing
11//!   [`crate::storage::primitives::HyperLogLog`].
12//! * `quantileTDigest` — [`crate::storage::primitives::TDigest`]
13//!   with ClickHouse-compatible 0.99/0.95/0.5 defaults.
14//! * `corr`, `covar_pop`, `covar_samp` — Welford-style numerically
15//!   stable two-variable accumulators.
16//! * `sum_if`, `avg_if`, `count_if` — conditional aggregation over
17//!   an arbitrary predicate callable.
18//! * `any`, `anyLast` — first / last observed value.
19//! * `groupArray(n)` — first `n` values as an ordered list.
20//! * `arrayJoin` — UNNEST helper that expands a Vec input into
21//!   flattened iteration.
22
23use std::collections::VecDeque;
24
25use crate::storage::primitives::{HyperLogLog, TDigest};
26
27/// Cardinality estimator. Thin wrapper over HLL keeping the API
28/// consistent with the other aggregators in this module.
29#[derive(Debug, Clone)]
30pub struct UniqAggregator {
31    hll: HyperLogLog,
32}
33
34impl UniqAggregator {
35    /// Underlying HLL uses fixed 2¹⁴ registers (~16 KB) with ~0.81%
36    /// standard error. The `precision` parameter is accepted for
37    /// ClickHouse-surface compatibility (`uniqHLL12` etc.) but
38    /// currently informational only.
39    pub fn new(_precision: u8) -> Self {
40        Self {
41            hll: HyperLogLog::new(),
42        }
43    }
44
45    pub fn add(&mut self, bytes: &[u8]) {
46        self.hll.add(bytes);
47    }
48
49    pub fn add_str(&mut self, s: &str) {
50        self.hll.add(s.as_bytes());
51    }
52
53    pub fn add_i64(&mut self, v: i64) {
54        self.hll.add(&v.to_le_bytes());
55    }
56
57    pub fn merge(&mut self, other: &Self) {
58        self.hll.merge(&other.hll);
59    }
60
61    /// Estimate of the distinct count observed so far.
62    pub fn estimate(&self) -> u64 {
63        self.hll.count()
64    }
65}
66
67/// Streaming median / percentile over a TDigest core. Accepts f64
68/// samples; callers cast from int before adding.
69#[derive(Debug, Clone, Default)]
70pub struct QuantileTDigestAggregator {
71    digest: TDigest,
72}
73
74impl QuantileTDigestAggregator {
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    pub fn add(&mut self, value: f64) {
80        self.digest.add(value);
81    }
82
83    pub fn merge(&mut self, other: &Self) {
84        self.digest.merge(&other.digest);
85    }
86
87    /// ClickHouse-compat: `q ∈ [0.0, 1.0]`. `quantile(0.5)` = median.
88    pub fn quantile(&self, q: f64) -> f64 {
89        self.digest.quantile(q)
90    }
91}
92
93/// Numerically stable two-variable accumulator backing
94/// `corr(x, y)` / `covar_pop(x, y)` / `covar_samp(x, y)`.
95#[derive(Debug, Clone, Default)]
96pub struct CovarianceAggregator {
97    n: u64,
98    mean_x: f64,
99    mean_y: f64,
100    /// Co-moment: Σ (x - mean_x)(y - mean_y).
101    c2: f64,
102    /// Σ (x - mean_x)²
103    m2_x: f64,
104    /// Σ (y - mean_y)²
105    m2_y: f64,
106}
107
108impl CovarianceAggregator {
109    pub fn new() -> Self {
110        Self::default()
111    }
112
113    pub fn add(&mut self, x: f64, y: f64) {
114        if !x.is_finite() || !y.is_finite() {
115            return;
116        }
117        self.n += 1;
118        let dx = x - self.mean_x;
119        self.mean_x += dx / self.n as f64;
120        let dx2 = x - self.mean_x;
121        self.m2_x += dx * dx2;
122        let dy = y - self.mean_y;
123        self.mean_y += dy / self.n as f64;
124        let dy2 = y - self.mean_y;
125        self.m2_y += dy * dy2;
126        self.c2 += dx * dy2;
127    }
128
129    /// Population covariance. Returns 0 when no samples.
130    pub fn covar_pop(&self) -> f64 {
131        if self.n == 0 {
132            return 0.0;
133        }
134        self.c2 / self.n as f64
135    }
136
137    /// Sample covariance. Returns 0 with fewer than 2 samples.
138    pub fn covar_samp(&self) -> f64 {
139        if self.n < 2 {
140            return 0.0;
141        }
142        self.c2 / (self.n - 1) as f64
143    }
144
145    /// Pearson correlation coefficient. Returns NaN when either
146    /// series has zero variance.
147    pub fn corr(&self) -> f64 {
148        if self.n < 2 {
149            return f64::NAN;
150        }
151        let denom = (self.m2_x * self.m2_y).sqrt();
152        if denom <= 0.0 {
153            return f64::NAN;
154        }
155        self.c2 / denom
156    }
157
158    /// Merge another partial accumulator in. Mirrors Welford's
159    /// parallel combination rule for both moments and the co-moment.
160    pub fn merge(&mut self, other: &Self) {
161        if other.n == 0 {
162            return;
163        }
164        if self.n == 0 {
165            *self = other.clone();
166            return;
167        }
168        let n = self.n + other.n;
169        let delta_x = other.mean_x - self.mean_x;
170        let delta_y = other.mean_y - self.mean_y;
171        let new_mean_x = self.mean_x + delta_x * (other.n as f64 / n as f64);
172        let new_mean_y = self.mean_y + delta_y * (other.n as f64 / n as f64);
173        self.m2_x += other.m2_x + delta_x * delta_x * (self.n as f64 * other.n as f64 / n as f64);
174        self.m2_y += other.m2_y + delta_y * delta_y * (self.n as f64 * other.n as f64 / n as f64);
175        self.c2 += other.c2 + delta_x * delta_y * (self.n as f64 * other.n as f64 / n as f64);
176        self.mean_x = new_mean_x;
177        self.mean_y = new_mean_y;
178        self.n = n;
179    }
180}
181
182/// `count_if(cond)` — count of truthy rows.
183#[derive(Debug, Clone, Default)]
184pub struct CountIfAggregator {
185    count: u64,
186}
187
188impl CountIfAggregator {
189    pub fn new() -> Self {
190        Self::default()
191    }
192
193    pub fn add(&mut self, cond: bool) {
194        if cond {
195            self.count += 1;
196        }
197    }
198
199    pub fn merge(&mut self, other: &Self) {
200        self.count += other.count;
201    }
202
203    pub fn finalize(&self) -> u64 {
204        self.count
205    }
206}
207
208/// `sum_if(x, cond)` / `avg_if(x, cond)`.
209#[derive(Debug, Clone, Default)]
210pub struct SumAvgIfAggregator {
211    sum: f64,
212    count: u64,
213}
214
215impl SumAvgIfAggregator {
216    pub fn new() -> Self {
217        Self::default()
218    }
219
220    pub fn add(&mut self, value: f64, cond: bool) {
221        if cond && value.is_finite() {
222            self.sum += value;
223            self.count += 1;
224        }
225    }
226
227    pub fn merge(&mut self, other: &Self) {
228        self.sum += other.sum;
229        self.count += other.count;
230    }
231
232    pub fn sum(&self) -> f64 {
233        self.sum
234    }
235
236    pub fn avg(&self) -> f64 {
237        if self.count == 0 {
238            0.0
239        } else {
240            self.sum / self.count as f64
241        }
242    }
243
244    pub fn count(&self) -> u64 {
245        self.count
246    }
247}
248
249/// `any(x)` — returns the first observed value. Deterministic
250/// ordering is caller responsibility (ClickHouse doesn't guarantee
251/// which row wins either).
252#[derive(Debug, Clone, Default)]
253pub struct AnyAggregator<T: Clone> {
254    first: Option<T>,
255}
256
257impl<T: Clone> AnyAggregator<T> {
258    pub fn new() -> Self {
259        Self { first: None }
260    }
261
262    pub fn add(&mut self, value: T) {
263        if self.first.is_none() {
264            self.first = Some(value);
265        }
266    }
267
268    pub fn merge(&mut self, other: &Self) {
269        if self.first.is_none() {
270            if let Some(v) = &other.first {
271                self.first = Some(v.clone());
272            }
273        }
274    }
275
276    pub fn finalize(&self) -> Option<T> {
277        self.first.clone()
278    }
279}
280
281/// `anyLast(x)` — returns the last observed value.
282#[derive(Debug, Clone, Default)]
283pub struct AnyLastAggregator<T: Clone> {
284    last: Option<T>,
285}
286
287impl<T: Clone> AnyLastAggregator<T> {
288    pub fn new() -> Self {
289        Self { last: None }
290    }
291
292    pub fn add(&mut self, value: T) {
293        self.last = Some(value);
294    }
295
296    pub fn merge(&mut self, other: &Self) {
297        if let Some(v) = &other.last {
298            self.last = Some(v.clone());
299        }
300    }
301
302    pub fn finalize(&self) -> Option<T> {
303        self.last.clone()
304    }
305}
306
307/// `groupArray(n)` — collects up to `n` values. Preserves order.
308#[derive(Debug, Clone)]
309pub struct GroupArrayAggregator<T: Clone> {
310    limit: usize,
311    buffer: VecDeque<T>,
312}
313
314impl<T: Clone> GroupArrayAggregator<T> {
315    /// Pass `0` for unbounded (careful — high-cardinality groups can
316    /// blow memory).
317    pub fn new(limit: usize) -> Self {
318        Self {
319            limit,
320            buffer: VecDeque::new(),
321        }
322    }
323
324    pub fn add(&mut self, value: T) {
325        if self.limit != 0 && self.buffer.len() >= self.limit {
326            return;
327        }
328        self.buffer.push_back(value);
329    }
330
331    pub fn merge(&mut self, other: &Self) {
332        for v in &other.buffer {
333            if self.limit != 0 && self.buffer.len() >= self.limit {
334                break;
335            }
336            self.buffer.push_back(v.clone());
337        }
338    }
339
340    pub fn finalize(&self) -> Vec<T> {
341        self.buffer.iter().cloned().collect()
342    }
343
344    pub fn len(&self) -> usize {
345        self.buffer.len()
346    }
347
348    pub fn is_empty(&self) -> bool {
349        self.buffer.is_empty()
350    }
351}
352
353/// `arrayJoin` — flattens a `Vec<T>` into an iterator. In SQL this
354/// fans a single row into N rows; callers iterate this inside their
355/// scan loop.
356pub fn array_join<T: Clone>(arr: &[T]) -> impl Iterator<Item = T> + '_ {
357    arr.iter().cloned()
358}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363
364    #[test]
365    fn uniq_estimates_roughly_distinct_count() {
366        let mut u = UniqAggregator::new(12);
367        for i in 0..10_000 {
368            u.add_i64(i);
369        }
370        let est = u.estimate();
371        let err = ((est as f64 - 10_000.0).abs() / 10_000.0) * 100.0;
372        assert!(err < 5.0, "uniq error {err}% est={est}");
373    }
374
375    #[test]
376    fn uniq_merges_two_sets_without_double_counting_overlap() {
377        let mut a = UniqAggregator::new(12);
378        let mut b = UniqAggregator::new(12);
379        for i in 0..5000 {
380            a.add_i64(i);
381        }
382        for i in 2500..7500 {
383            b.add_i64(i);
384        }
385        a.merge(&b);
386        let est = a.estimate();
387        let err = ((est as f64 - 7500.0).abs() / 7500.0) * 100.0;
388        assert!(err < 5.0, "uniq merge error {err}% est={est}");
389    }
390
391    #[test]
392    fn quantile_tdigest_agrees_with_sorted_median_within_mvp_tolerance() {
393        // See note on `TDigest::median_of_uniform_is_near_half` — MVP
394        // tolerance is wide; precision-tuning is follow-on.
395        let mut q = QuantileTDigestAggregator::new();
396        for i in 0..10_000 {
397            q.add(i as f64);
398        }
399        let m = q.quantile(0.5);
400        assert!(m > 2000.0 && m < 8000.0, "median was {m}");
401    }
402
403    #[test]
404    fn corr_is_one_on_perfect_positive_line() {
405        let mut c = CovarianceAggregator::new();
406        for i in 0..100 {
407            c.add(i as f64, 2.0 * i as f64 + 3.0);
408        }
409        let r = c.corr();
410        assert!((r - 1.0).abs() < 1e-9, "corr = {r}");
411    }
412
413    #[test]
414    fn corr_is_negative_on_inverse_line() {
415        let mut c = CovarianceAggregator::new();
416        for i in 0..100 {
417            c.add(i as f64, -3.0 * i as f64 + 5.0);
418        }
419        let r = c.corr();
420        assert!((r + 1.0).abs() < 1e-9, "corr = {r}");
421    }
422
423    #[test]
424    fn covar_pop_vs_covar_samp_relationship() {
425        let mut c = CovarianceAggregator::new();
426        for (x, y) in [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)] {
427            c.add(x, y);
428        }
429        let pop = c.covar_pop();
430        let samp = c.covar_samp();
431        // samp = pop * n / (n-1) ⇒ samp > pop for finite n.
432        assert!(samp > pop);
433        assert!((samp - pop * 4.0 / 3.0).abs() < 1e-9);
434    }
435
436    #[test]
437    fn merge_combines_parallel_aggregators() {
438        let mut left = CovarianceAggregator::new();
439        let mut right = CovarianceAggregator::new();
440        for i in 0..50 {
441            left.add(i as f64, (i * 2) as f64);
442        }
443        for i in 50..100 {
444            right.add(i as f64, (i * 2) as f64);
445        }
446        let mut full = CovarianceAggregator::new();
447        for i in 0..100 {
448            full.add(i as f64, (i * 2) as f64);
449        }
450        left.merge(&right);
451        assert!((left.corr() - full.corr()).abs() < 1e-9);
452        assert!((left.covar_pop() - full.covar_pop()).abs() < 1e-9);
453    }
454
455    #[test]
456    fn count_if_only_counts_truthy_rows() {
457        let mut c = CountIfAggregator::new();
458        for i in 0..20 {
459            c.add(i % 3 == 0);
460        }
461        assert_eq!(c.finalize(), 7); // 0,3,6,9,12,15,18
462    }
463
464    #[test]
465    fn sum_if_skips_non_finite_values() {
466        let mut s = SumAvgIfAggregator::new();
467        s.add(1.0, true);
468        s.add(f64::NAN, true);
469        s.add(2.0, false);
470        s.add(3.0, true);
471        assert_eq!(s.sum(), 4.0);
472        assert_eq!(s.count(), 2);
473        assert_eq!(s.avg(), 2.0);
474    }
475
476    #[test]
477    fn any_and_any_last_track_endpoints() {
478        let mut first = AnyAggregator::<i32>::new();
479        let mut last = AnyLastAggregator::<i32>::new();
480        for i in 0..10 {
481            first.add(i);
482            last.add(i);
483        }
484        assert_eq!(first.finalize(), Some(0));
485        assert_eq!(last.finalize(), Some(9));
486    }
487
488    #[test]
489    fn group_array_respects_limit() {
490        let mut g = GroupArrayAggregator::<i32>::new(3);
491        for i in 0..10 {
492            g.add(i);
493        }
494        assert_eq!(g.finalize(), vec![0, 1, 2]);
495    }
496
497    #[test]
498    fn group_array_unbounded_when_limit_zero() {
499        let mut g = GroupArrayAggregator::<i32>::new(0);
500        for i in 0..5 {
501            g.add(i);
502        }
503        assert_eq!(g.finalize(), vec![0, 1, 2, 3, 4]);
504    }
505
506    #[test]
507    fn array_join_flattens_vec() {
508        let v = vec![1, 2, 3];
509        let out: Vec<i32> = array_join(&v).collect();
510        assert_eq!(out, v);
511    }
512
513    #[test]
514    fn group_array_merge_respects_limit() {
515        let mut a = GroupArrayAggregator::<i32>::new(4);
516        let mut b = GroupArrayAggregator::<i32>::new(4);
517        a.add(1);
518        a.add(2);
519        b.add(3);
520        b.add(4);
521        b.add(5);
522        a.merge(&b);
523        assert_eq!(a.finalize(), vec![1, 2, 3, 4]);
524    }
525}