use crate::{
builtins::{BuiltinSelectionPosition, BuiltinSinkAccumulator},
data::value::Val,
};
use super::{ReducerAccumulator, Sink};
pub(crate) struct SinkAccumulator<'a> {
sink: &'a Sink,
collect: Vec<Val>,
reducer: Option<ReducerAccumulator>,
first: Option<Val>,
last: Option<Val>,
nth: Option<Val>,
nth_seen: usize,
hll: [u8; HLL_M],
}
impl<'a> SinkAccumulator<'a> {
pub(crate) fn new(sink: &'a Sink) -> Self {
Self {
sink,
collect: Vec::new(),
reducer: sink.reducer_spec().map(ReducerAccumulator::new),
first: None,
last: None,
nth: None,
nth_seen: 0,
hll: [0; HLL_M],
}
}
pub(crate) fn push(&mut self, item: Val) -> bool {
if let Some(spec) = self.sink.builtin_sink_spec() {
return self.observe_builtin(spec.accumulator, item);
}
match self.sink {
Sink::Collect => {
self.observe_collect(item);
false
}
Sink::Reducer(_) => {
self.observe_reducer(&item);
false
}
Sink::ApproxCountDistinct => {
self.observe_approx_distinct(&item);
false
}
Sink::Nth(idx) => self.observe_nth(*idx, item),
Sink::Terminal(_) => false,
}
}
pub(crate) fn observe_builtin(
&mut self,
accumulator: BuiltinSinkAccumulator,
item: Val,
) -> bool {
self.observe_builtin_lazy(accumulator, || item, || None, || None)
.unwrap_or(false)
}
pub(crate) fn observe_builtin_lazy<F, N, K>(
&mut self,
accumulator: BuiltinSinkAccumulator,
materialize_item: F,
materialize_numeric: N,
hash_key: K,
) -> Option<bool>
where
F: FnOnce() -> Val,
N: FnOnce() -> Option<Val>,
K: FnOnce() -> Option<String>,
{
match accumulator {
BuiltinSinkAccumulator::Count => {
self.observe_count();
Some(false)
}
BuiltinSinkAccumulator::Numeric => {
let numeric_item = materialize_numeric().unwrap_or_else(materialize_item);
self.observe_numeric(&numeric_item);
Some(false)
}
BuiltinSinkAccumulator::ApproxDistinct => {
if let Some(key) = hash_key() {
self.observe_approx_distinct_key(&key);
} else {
self.observe_approx_distinct(&materialize_item());
}
Some(false)
}
BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::First) => {
Some(self.observe_first(materialize_item()))
}
BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::Last) => {
self.observe_last(materialize_item());
Some(false)
}
}
}
pub(crate) fn observe_collect(&mut self, item: Val) {
self.collect.push(item);
}
pub(crate) fn observe_reducer(&mut self, item: &Val) {
if let Some(reducer) = &mut self.reducer {
reducer.push(item);
}
}
pub(crate) fn observe_count(&mut self) {
self.observe_reducer(&Val::Null);
}
pub(crate) fn observe_numeric(&mut self, item: &Val) {
self.observe_reducer(item);
}
pub(crate) fn observe_first(&mut self, item: Val) -> bool {
if self.first.is_none() {
self.first = Some(item);
true
} else {
false
}
}
pub(crate) fn observe_last(&mut self, item: Val) {
self.last = Some(item);
}
pub(crate) fn observe_nth(&mut self, idx: usize, item: Val) -> bool {
self.observe_nth_lazy(idx, || item)
}
pub(crate) fn observe_nth_lazy<F>(&mut self, idx: usize, materialize_item: F) -> bool
where
F: FnOnce() -> Val,
{
if self.nth.is_some() {
return true;
}
if self.nth_seen == idx {
self.nth = Some(materialize_item());
true
} else {
self.nth_seen += 1;
false
}
}
pub(crate) fn observe_approx_distinct(&mut self, item: &Val) {
hll_observe(&mut self.hll, item);
}
pub(crate) fn observe_approx_distinct_key(&mut self, key: &str) {
hll_observe_key(&mut self.hll, key);
}
pub(crate) fn push_projected_numeric(&mut self, numeric_item: &Val) {
self.observe_reducer(numeric_item);
}
pub(crate) fn finish(self, unwrap_single_collect_obj: bool) -> Val {
if let Some(spec) = self.sink.builtin_sink_spec() {
return self.finish_builtin(spec.accumulator);
}
match self.sink {
Sink::Collect => {
if unwrap_single_collect_obj
&& self.collect.len() == 1
&& matches!(self.collect[0], Val::Obj(_))
{
self.collect.into_iter().next().unwrap()
} else {
Val::arr(self.collect)
}
}
Sink::Reducer(_) => self
.reducer
.expect("reducer sinks construct reducer")
.finish(),
Sink::ApproxCountDistinct => Val::Int(hll_estimate(&self.hll) as i64),
Sink::Nth(_) => self.nth.unwrap_or(Val::Null),
Sink::Terminal(_) => Val::Null,
}
}
fn finish_builtin(self, accumulator: BuiltinSinkAccumulator) -> Val {
match accumulator {
BuiltinSinkAccumulator::Count | BuiltinSinkAccumulator::Numeric => self
.reducer
.expect("reducer sinks construct reducer")
.finish(),
BuiltinSinkAccumulator::ApproxDistinct => Val::Int(hll_estimate(&self.hll) as i64),
BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::First) => {
self.first.unwrap_or(Val::Null)
}
BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::Last) => {
self.last.unwrap_or(Val::Null)
}
}
}
}
/// Number of hash bits used to select a HyperLogLog register.
const HLL_P: u32 = 12;
/// Number of HyperLogLog registers (`2^HLL_P`).
const HLL_M: usize = 2usize.pow(HLL_P);
/// Hashes `key` with a process-wide `RandomState`.
///
/// The state is created lazily on the first call and then reused, so a given
/// key hashes to the same value for the rest of the process. Values are not
/// stable across processes.
#[inline]
fn hll_hash_key(key: &str) -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::sync::OnceLock;

    static SHARED_STATE: OnceLock<RandomState> = OnceLock::new();
    let mut hasher = SHARED_STATE.get_or_init(RandomState::new).build_hasher();
    hasher.write(key.as_bytes());
    hasher.finish()
}
fn hll_observe(reg: &mut [u8; HLL_M], v: &Val) {
use crate::util::val_to_key;
hll_observe_key(reg, &val_to_key(v));
}
/// Adds `key` to the HyperLogLog registers.
///
/// The top `HLL_P` bits of the hash pick a register. The rank is the
/// leading-zero count of the remaining bits plus one, and the register keeps
/// the maximum rank it has seen.
fn hll_observe_key(reg: &mut [u8; HLL_M], key: &str) {
    let hash = hll_hash_key(key);
    let bucket = (hash >> (64 - HLL_P)) as usize;
    // Shift out the index bits and set a sentinel bit just below them, so the
    // remainder is never zero and the rank is bounded by 64 - HLL_P + 1.
    let remainder = (hash << HLL_P) | (1u64 << (HLL_P - 1));
    let rank = remainder.leading_zeros() as u8 + 1;
    let slot = &mut reg[bucket];
    *slot = (*slot).max(rank);
}
/// Computes the HyperLogLog cardinality estimate for a register file.
///
/// The raw estimate is `alpha_m * m^2 / sum(2^-reg[j])`. When that is at most
/// `2.5 * m` and at least one register is still zero, linear counting
/// (`m * ln(m / zero_registers)`) is returned instead. No large-range
/// correction is applied.
fn hll_estimate(reg: &[u8; HLL_M]) -> f64 {
    let (inverse_sum, empty) = reg
        .iter()
        .fold((0.0_f64, 0_usize), |(sum, empty), &rank| {
            (sum + 1.0 / (1u64 << rank) as f64, empty + usize::from(rank == 0))
        });
    let m = HLL_M as f64;
    let alpha_m = 0.7213 / (1.0 + 1.079 / m);
    let raw = alpha_m * m * m / inverse_sum;
    if raw <= 2.5 * m && empty > 0 {
        m * (m / empty as f64).ln()
    } else {
        raw
    }
}