use super::ast::{AggregateExpr, AggregateOp};
use crate::storage::schema::Value;
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(test)]
pub(crate) static MATERIALIZED_COUNT: AtomicUsize = AtomicUsize::new(0);
#[cfg(test)]
pub(crate) fn note_materialized() {
MATERIALIZED_COUNT.fetch_add(1, Ordering::Relaxed);
}
#[cfg(not(test))]
#[inline]
pub(crate) fn note_materialized() {}
enum Slot {
CountStar { count: u64 },
CountColumn { count: u64 },
Sum { sum: f64, seen_any: bool },
Avg { sum: f64, count: u64 },
Min { current: Option<Value> },
Max { current: Option<Value> },
}
impl Slot {
fn for_op(op: AggregateOp) -> Self {
match op {
AggregateOp::CountStar => Slot::CountStar { count: 0 },
AggregateOp::CountColumn => Slot::CountColumn { count: 0 },
AggregateOp::Sum => Slot::Sum {
sum: 0.0,
seen_any: false,
},
AggregateOp::Avg => Slot::Avg { sum: 0.0, count: 0 },
AggregateOp::Min => Slot::Min { current: None },
AggregateOp::Max => Slot::Max { current: None },
}
}
}
pub(super) struct GroupAccumulator {
slots: Vec<Slot>,
}
impl GroupAccumulator {
pub(super) fn new(aggregates: &[AggregateExpr]) -> Self {
Self {
slots: aggregates.iter().map(|a| Slot::for_op(a.op)).collect(),
}
}
pub(super) fn accumulate(&mut self, aggregates: &[AggregateExpr], inputs: &[Value]) {
for (slot, expr) in self.slots.iter_mut().zip(aggregates.iter()) {
match slot {
Slot::CountStar { count } => {
*count += 1;
}
Slot::CountColumn { count } => {
if let Some(v) = inputs.get(expr.input_index) {
if !matches!(v, Value::Null) {
*count += 1;
}
}
}
Slot::Sum { sum, seen_any } => {
if let Some(v) = inputs.get(expr.input_index) {
if let Some(n) = numeric_value(v) {
*sum += n;
*seen_any = true;
}
}
}
Slot::Avg { sum, count } => {
if let Some(v) = inputs.get(expr.input_index) {
if let Some(n) = numeric_value(v) {
*sum += n;
*count += 1;
}
}
}
Slot::Min { current } => {
if let Some(v) = inputs.get(expr.input_index) {
update_extreme(current, v, std::cmp::Ordering::Less);
}
}
Slot::Max { current } => {
if let Some(v) = inputs.get(expr.input_index) {
update_extreme(current, v, std::cmp::Ordering::Greater);
}
}
}
}
}
pub(super) fn finalize(self) -> Vec<Value> {
note_materialized();
self.slots
.into_iter()
.map(|slot| match slot {
Slot::CountStar { count } | Slot::CountColumn { count } => {
Value::Integer(count as i64)
}
Slot::Sum { sum, seen_any } => {
if seen_any {
Value::Float(sum)
} else {
Value::Null
}
}
Slot::Avg { sum, count } => {
if count == 0 {
Value::Null
} else {
Value::Float(sum / count as f64)
}
}
Slot::Min { current } | Slot::Max { current } => current.unwrap_or(Value::Null),
})
.collect()
}
}
fn numeric_value(v: &Value) -> Option<f64> {
match v {
Value::Integer(i) => Some(*i as f64),
Value::UnsignedInteger(u) => Some(*u as f64),
Value::Float(f) if f.is_finite() => Some(*f),
Value::Decimal(d) => Some(*d as f64),
Value::Boolean(b) => Some(if *b { 1.0 } else { 0.0 }),
_ => None,
}
}
fn update_extreme(current: &mut Option<Value>, candidate: &Value, target: std::cmp::Ordering) {
if matches!(candidate, Value::Null) {
return;
}
let Some(cand_key) = crate::storage::schema::value_to_canonical_key(candidate) else {
return;
};
match current {
None => {
*current = Some(candidate.clone());
}
Some(cur) => {
let Some(cur_key) = crate::storage::schema::value_to_canonical_key(cur) else {
*current = Some(candidate.clone());
return;
};
if cur_key.family() != cand_key.family() {
return;
}
if cand_key.cmp(&cur_key) == target {
*current = Some(candidate.clone());
}
}
}
}