use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use crate::data::executor::handlers::columnar_agg_support::{AggAccum, GroupKey};
use crate::types::{DatabaseId, TenantId};
use super::core::SpillCore;
/// Per-group aggregate state (`GroupKey` -> `Vec<AggAccum>`) that can spill partial runs.
///
/// Groups are accumulated in the in-memory map `in_mem`. The current run is flushed to `core`
/// when either of these happens:
/// - the map already holds `cap` distinct groups and a new key arrives, or
/// - a periodic memory-governor reservation fails.
///
/// `finalize` merges the spilled runs back together with whatever is still in memory.
pub(in crate::data::executor::handlers) struct ColumnarGroupBySpiller {
/// Spill backend: receives drained runs (`flush_run`) and merges them back in `finalize` (`merge`).
core: SpillCore<GroupKey, Vec<AggAccum>>,
/// Current in-memory run: for each group key, one `AggAccum` per aggregate.
in_mem: HashMap<GroupKey, Vec<AggAccum>>,
/// Maximum number of distinct groups held in `in_mem` before a spill; clamped to >= 1 in `new`.
cap: usize,
/// Optional memory governor, probed periodically from `get_or_insert_with`.
governor: Option<Arc<nodedb_mem::MemoryGovernor>>,
/// Number of `get_or_insert_with` calls so far; used to schedule governor probes.
feed_counter: u64,
/// Database identifier passed to the governor when reserving memory.
db: DatabaseId,
/// Tenant identifier passed to the governor when reserving memory.
tenant: TenantId,
}
impl ColumnarGroupBySpiller {
pub(in crate::data::executor::handlers) fn new(
spill_dir: PathBuf,
cap: usize,
governor: Option<Arc<nodedb_mem::MemoryGovernor>>,
db: DatabaseId,
tenant: TenantId,
) -> crate::Result<Self> {
Ok(Self {
core: SpillCore::new(spill_dir)?,
in_mem: HashMap::new(),
cap: cap.max(1),
governor,
feed_counter: 0,
db,
tenant,
})
}
pub(in crate::data::executor::handlers) fn get_or_insert_with(
&mut self,
key: GroupKey,
num_aggs: usize,
) -> crate::Result<&mut Vec<AggAccum>> {
self.feed_counter += 1;
if self.feed_counter.is_multiple_of(10_000) {
let estimated_growth = std::mem::size_of::<AggAccum>() * num_aggs * 10_000;
if let Some(ref gov) = self.governor
&& gov
.try_reserve(
self.db,
self.tenant,
nodedb_mem::EngineId::Query,
estimated_growth,
)
.is_err()
{
self.spill_current_run()?;
}
}
if !self.in_mem.contains_key(&key) && self.in_mem.len() >= self.cap {
self.spill_current_run()?;
}
Ok(self
.in_mem
.entry(key)
.or_insert_with(|| (0..num_aggs).map(|_| AggAccum::new()).collect()))
}
fn spill_current_run(&mut self) -> crate::Result<()> {
if self.in_mem.is_empty() {
return Ok(());
}
self.core.flush_run(self.in_mem.drain())?;
Ok(())
}
pub(in crate::data::executor::handlers) fn finalize(
mut self,
) -> crate::Result<HashMap<GroupKey, Vec<AggAccum>>> {
self.core.merge(&mut self.in_mem, self.cap, |dst, src| {
for (d, s) in dst.iter_mut().zip(src) {
d.count += s.count;
d.sum += s.sum;
if s.min < d.min {
d.min = s.min;
}
if s.max > d.max {
d.max = s.max;
}
}
})
}
}