use std::collections::{HashMap, HashSet};
use crate::bridge::physical_plan::AggregateSpec;
use nodedb_types::Value;
/// Size cap (10,000) associated with array aggregation.
///
/// NOTE(review): presumably the maximum number of elements an array-agg
/// accumulator is allowed to collect; the enforcement site is not in this
/// section of the file — confirm against the feed/merge code.
pub(super) const ARRAY_AGG_CAP: usize = 10_000;
/// Running state for a single aggregate expression.
///
/// Each variant carries the intermediate data for one kind of aggregate.
/// The type derives serde `Serialize`/`Deserialize`, so accumulator state
/// can be serialized and restored.
///
/// NOTE(review): depending on the serde format in use, variant and field
/// names/order may be part of the encoded representation — confirm before
/// renaming or reordering anything here.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) enum AggAccum {
/// Counter state: `n` is an unsigned 64-bit count.
Count { n: u64 },
/// Sum/average state: a running `sum`, a `comp` term and a count `n`.
/// NOTE(review): `comp` is presumably a floating-point compensation term
/// (Kahan/Neumaier-style summation) — confirm against the feed logic.
SumAvg { sum: f64, comp: f64, n: u64 },
/// Distinct sum/average state: a map from byte-string keys to `f64` values.
/// NOTE(review): keys are presumably an encoded form of each distinct
/// input value — confirm against the feed logic.
SumAvgDistinct { seen: HashMap<Vec<u8>, f64> },
/// Minimum state: the best value so far, or `None`.
/// NOTE(review): `None` presumably means no value has been seen yet.
Min { best: Option<Value> },
/// Maximum state: the best value so far, or `None`.
/// NOTE(review): `None` presumably means no value has been seen yet.
Max { best: Option<Value> },
/// Distinct-count state: a set of byte-string keys.
CountDistinct { seen: HashSet<Vec<u8>> },
/// State holding a count `n`, a `mean` and an `m2` term.
/// NOTE(review): the variant name suggests Welford's online
/// variance algorithm (`m2` = sum of squared deviations) — confirm.
Welford { n: u64, mean: f64, m2: f64 },
/// State wrapping a `HyperLogLog` sketch from `nodedb_types::approx`.
Hll {
hll: nodedb_types::approx::HyperLogLog,
},
/// State wrapping a `TDigest` from `nodedb_types::approx`.
TDigest {
digest: nodedb_types::approx::TDigest,
},
/// State wrapping a `SpaceSaving` structure from `nodedb_types::approx`
/// together with a `k` parameter.
/// NOTE(review): `k` is presumably the number of top items to report.
TopK {
ss: nodedb_types::approx::SpaceSaving,
k: usize,
},
/// Array-aggregation state: the collected values.
ArrayAgg { values: Vec<Value> },
/// Distinct array-aggregation state: a set of byte-string keys plus the
/// collected values.
/// NOTE(review): `seen` is presumably used to skip values already present
/// in `values` — confirm against the feed logic.
ArrayAggDistinct {
seen: HashSet<Vec<u8>>,
values: Vec<Value>,
},
/// Continuous-percentile state: collected `f64` values and a `pct` parameter.
/// NOTE(review): the scale of `pct` (0..1 vs 0..100) is not visible here.
PercentileCont { values: Vec<f64>, pct: f64 },
/// String-aggregation state: the collected string parts.
StringAgg { parts: Vec<String> },
}
/// Accumulator state for one group: a list of [`AggAccum`] values.
///
/// The methods on this type pair `accums` positionally with a slice of
/// `AggregateSpec`, so element `i` of `accums` corresponds to aggregate `i`.
/// Derives serde `Serialize`/`Deserialize`.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct GroupState {
// One accumulator per aggregate, in the order the aggregates were given to `new`.
pub(super) accums: Vec<AggAccum>,
}
impl GroupState {
    /// Builds a fresh group state holding one accumulator per entry of
    /// `aggregates`, created via `AggAccum::new` in the same order.
    pub(crate) fn new(aggregates: &[AggregateSpec]) -> Self {
        let mut accums = Vec::with_capacity(aggregates.len());
        for spec in aggregates {
            accums.push(AggAccum::new(spec));
        }
        Self { accums }
    }

    /// Feeds one document (`doc`, raw bytes) into every accumulator.
    ///
    /// Accumulators and `aggregates` are paired by position; pairing stops
    /// at the shorter of the two sequences.
    pub(crate) fn feed(&mut self, aggregates: &[AggregateSpec], doc: &[u8]) {
        self.accums
            .iter_mut()
            .zip(aggregates)
            .for_each(|(state, spec)| {
                state.feed(spec, doc);
            });
    }

    /// Folds `other` into `self` by delegating to
    /// `super::merge::merge_group_state`. `other` is consumed.
    pub(crate) fn merge_from(&mut self, other: GroupState) {
        super::merge::merge_group_state(self, other);
    }

    /// Consumes the state and produces one `(alias, value)` pair per
    /// accumulator/aggregate pair, in positional order.
    ///
    /// The alias is cloned from the aggregate spec and the value comes from
    /// finalizing the matching accumulator. Pairing stops at the shorter of
    /// the two sequences.
    pub(crate) fn finalize(self, aggregates: &[AggregateSpec]) -> Vec<(String, Value)> {
        let pair_count = self.accums.len().min(aggregates.len());
        let mut row = Vec::with_capacity(pair_count);
        for (state, spec) in self.accums.into_iter().zip(aggregates) {
            let label = spec.alias.clone();
            let value = state.finalize(spec);
            row.push((label, value));
        }
        row
    }
}