use crate::parser::{AggregationOperator, DataType};
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use super::common::{
ThresholdCmp, agg_semiring_new, aggregation_optimize_pipeline, aggregation_pre_leave_pipeline,
key_pattern, row_pattern, tuple,
};
fn avg_result_from_key(arity: usize, agg_pos: usize) -> TokenStream {
let key_arity = arity - 1;
let mut parts = Vec::with_capacity(arity);
let mut ki = 0usize;
for i in 0..arity {
if i == agg_pos {
parts.push(quote! { agg_val.avg() });
} else {
let kf = format_ident!("k{}", ki);
parts.push(quote! { #kf });
ki += 1;
}
}
debug_assert_eq!(ki, key_arity);
tuple(&parts)
}
pub(crate) fn aggregation_avg_optimize(
arity: usize,
agg_pos: usize,
agg_type: DataType,
) -> TokenStream {
aggregation_optimize_pipeline(
arity,
agg_pos,
row_pattern(arity),
agg_semiring_new(AggregationOperator::Avg, agg_pos, agg_type),
ThresholdCmp::Ne,
avg_result_from_key(arity, agg_pos),
)
}
pub(crate) fn aggregation_avg_pre_leave(
arity: usize,
agg_pos: usize,
agg_type: DataType,
) -> TokenStream {
aggregation_pre_leave_pipeline(
arity,
agg_pos,
row_pattern(arity),
agg_semiring_new(AggregationOperator::Avg, agg_pos, agg_type),
)
}
pub(crate) fn aggregation_avg_post_leave(arity: usize, agg_pos: usize) -> TokenStream {
let key_pat = key_pattern(arity);
let result = avg_result_from_key(arity, agg_pos);
quote! {
.consolidate()
.inner
.map(move |(#key_pat, t, agg_val)| {
let row = #result;
(row, t, SEMIRING_ONE)
})
.as_collection()
}
}