use std::sync::Arc;
use super::{FactorizedResult, LazyFactorizedChainOperator, Operator, OperatorResult};
use crate::execution::DataChunk;
use crate::execution::factorized_chunk::FactorizedChunk;
use crate::execution::vector::ValueVector;
use grafeo_common::types::{LogicalType, Value};
/// An aggregate function evaluated directly over a [`FactorizedChunk`], without
/// first flattening it into individual rows.
///
/// Every column-based variant carries the index of the column it reads; how that
/// column is located inside the chunk is decided by the evaluation code.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum FactorizedAggregate {
    /// Number of logical rows in the chunk.
    Count,
    /// Number of non-NULL values in column `column_idx` of the deepest level,
    /// weighted by path multiplicities.
    CountColumn { column_idx: usize },
    /// Sum of column `column_idx`, produced as a `Float64`.
    Sum { column_idx: usize },
    /// Average of column `column_idx`, produced as a `Float64`.
    Avg { column_idx: usize },
    /// Smallest value of column `column_idx`.
    Min { column_idx: usize },
    /// Largest value of column `column_idx`.
    Max { column_idx: usize },
}
impl FactorizedAggregate {
    /// Creates a `Count` aggregate (number of logical rows).
    #[must_use]
    pub fn count() -> Self {
        Self::Count
    }

    /// Creates a `CountColumn` aggregate over column `column_idx`.
    #[must_use]
    pub fn count_column(column_idx: usize) -> Self {
        Self::CountColumn { column_idx }
    }

    /// Creates a `Sum` aggregate over column `column_idx`.
    #[must_use]
    pub fn sum(column_idx: usize) -> Self {
        Self::Sum { column_idx }
    }

    /// Creates an `Avg` aggregate over column `column_idx`.
    #[must_use]
    pub fn avg(column_idx: usize) -> Self {
        Self::Avg { column_idx }
    }

    /// Creates a `Min` aggregate over column `column_idx`.
    #[must_use]
    pub fn min(column_idx: usize) -> Self {
        Self::Min { column_idx }
    }

    /// Creates a `Max` aggregate over column `column_idx`.
    #[must_use]
    pub fn max(column_idx: usize) -> Self {
        Self::Max { column_idx }
    }

    /// Evaluates this aggregate over `chunk`.
    ///
    /// This is a convenience wrapper around [`Self::compute_with_multiplicities`]
    /// for callers that have no precomputed path multiplicities.
    pub fn compute(&self, chunk: &FactorizedChunk) -> Value {
        // Only `CountColumn` reads path multiplicities, and it derives them itself
        // when `None` is passed. Precomputing them here would only cost extra work
        // for aggregates (e.g. `Sum`/`Avg`) that never look at them.
        self.compute_with_multiplicities(chunk, None)
    }

    /// Evaluates this aggregate over `chunk`, reusing `multiplicities` when supplied.
    ///
    /// `multiplicities` is consulted only by `CountColumn`. When it is `None`, that
    /// variant calls `FactorizedChunk::compute_path_multiplicities` itself. Entry `i`
    /// is paired with physical position `i` of the deepest level's column.
    /// NOTE(review): presumably one entry per deepest-level value; confirm.
    ///
    /// Results:
    /// - `Count` / `CountColumn` return `Value::Int64`, saturating at `i64::MAX`.
    ///   `CountColumn` returns `0` when the chunk has no levels or the column is
    ///   missing.
    /// - `Sum` / `Avg` return `Value::Float64`, or `Value::Null` when the chunk's
    ///   `sum_deepest` / `avg_deepest` returns `None`.
    /// - `Min` / `Max` return the chunk's `min_deepest` / `max_deepest` value, or
    ///   `Value::Null` when there is none.
    pub fn compute_with_multiplicities(
        &self,
        chunk: &FactorizedChunk,
        multiplicities: Option<&[usize]>,
    ) -> Value {
        match self {
            Self::Count => Value::Int64(i64::try_from(chunk.count_rows()).unwrap_or(i64::MAX)),
            Self::CountColumn { column_idx } => {
                Self::count_non_null_deepest(chunk, *column_idx, multiplicities)
            }
            Self::Sum { column_idx } => chunk
                .sum_deepest(*column_idx)
                .map_or(Value::Null, Value::Float64),
            Self::Avg { column_idx } => chunk
                .avg_deepest(*column_idx)
                .map_or(Value::Null, Value::Float64),
            Self::Min { column_idx } => chunk.min_deepest(*column_idx).unwrap_or(Value::Null),
            Self::Max { column_idx } => chunk.max_deepest(*column_idx).unwrap_or(Value::Null),
        }
    }

    /// Counts the non-NULL values of column `column_idx` in the deepest level of
    /// `chunk`. Each physical value is weighted by its path multiplicity.
    ///
    /// Returns `Value::Int64(0)` when the chunk has no levels, the deepest level is
    /// unavailable, or the column does not exist. The count saturates at `i64::MAX`.
    fn count_non_null_deepest(
        chunk: &FactorizedChunk,
        column_idx: usize,
        multiplicities: Option<&[usize]>,
    ) -> Value {
        let Some(deepest_idx) = chunk.level_count().checked_sub(1) else {
            return Value::Int64(0);
        };
        let Some(deepest) = chunk.level(deepest_idx) else {
            return Value::Int64(0);
        };
        let Some(col) = deepest.column(column_idx) else {
            return Value::Int64(0);
        };

        // Borrow the caller's multiplicities when available. Otherwise compute them
        // locally; `computed` keeps that owned value alive for the fold below.
        let computed;
        let mults: &[usize] = match multiplicities {
            Some(m) => m,
            None => {
                computed = chunk.compute_path_multiplicities();
                &computed
            }
        };

        let count = mults
            .iter()
            .enumerate()
            .filter(|&(phys_idx, _)| {
                // Positions missing from the column are skipped, like NULLs.
                col.get_physical(phys_idx)
                    .is_some_and(|value| !matches!(value, Value::Null))
            })
            .fold(0_i64, |acc, (_, &mult)| {
                acc.saturating_add(i64::try_from(mult).unwrap_or(i64::MAX))
            });
        Value::Int64(count)
    }

    /// Returns the logical type of the value produced by this aggregate.
    ///
    /// Counts are `Int64` and sums/averages are `Float64`. `Min`/`Max` keep the
    /// input column's type, which is not known here, so they report `Any`.
    #[must_use]
    pub fn output_type(&self) -> LogicalType {
        match self {
            Self::Count | Self::CountColumn { .. } => LogicalType::Int64,
            Self::Sum { .. } | Self::Avg { .. } => LogicalType::Float64,
            Self::Min { .. } | Self::Max { .. } => LogicalType::Any,
        }
    }
}
/// Operator that evaluates a list of [`FactorizedAggregate`]s over the factorized
/// output of a [`LazyFactorizedChainOperator`]. It produces a single row with one
/// column per aggregate.
pub struct FactorizedAggregateOperator {
    /// Upstream operator that supplies the factorized chunk to aggregate.
    input: LazyFactorizedChainOperator,
    /// Aggregates to evaluate, in output-column order.
    aggregates: Vec<FactorizedAggregate>,
    /// Set when `next` has run the aggregation; cleared again by `reset`.
    executed: bool,
}
impl FactorizedAggregateOperator {
pub fn new(input: LazyFactorizedChainOperator, aggregates: Vec<FactorizedAggregate>) -> Self {
Self {
input,
aggregates,
executed: false,
}
}
fn execute(&mut self) -> OperatorResult {
let mut factorized = match self.input.next_factorized() {
Ok(Some(chunk)) => chunk,
Ok(None) => {
return Ok(Some(self.create_empty_result()));
}
Err(e) => return Err(e),
};
let multiplicities: Arc<[usize]> = factorized.path_multiplicities_cached();
let output_cols: Vec<ValueVector> = self
.aggregates
.iter()
.map(|agg| {
let mut col = ValueVector::with_type(agg.output_type());
col.push_value(agg.compute_with_multiplicities(&factorized, Some(&multiplicities)));
col
})
.collect();
let mut chunk = DataChunk::new(output_cols);
chunk.set_count(1);
Ok(Some(chunk))
}
fn create_empty_result(&self) -> DataChunk {
let cols: Vec<ValueVector> = self
.aggregates
.iter()
.map(|agg| {
let mut col = ValueVector::with_type(agg.output_type());
match agg {
FactorizedAggregate::Count | FactorizedAggregate::CountColumn { .. } => {
col.push_value(Value::Int64(0));
}
_ => {
col.push_value(Value::Null);
}
}
col
})
.collect();
let mut chunk = DataChunk::new(cols);
chunk.set_count(1);
chunk
}
}
impl Operator for FactorizedAggregateOperator {
    /// Produces the aggregate row on the first call and `Ok(None)` on every later
    /// call until `reset`.
    ///
    /// The executed flag is set before the aggregation runs, so a failed run is
    /// not retried.
    fn next(&mut self) -> OperatorResult {
        let already_ran = std::mem::replace(&mut self.executed, true);
        if already_ran {
            Ok(None)
        } else {
            self.execute()
        }
    }

    /// Clears the executed flag and resets the input, so the next call to `next`
    /// aggregates again.
    fn reset(&mut self) {
        self.executed = false;
        self.input.reset();
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "FactorizedAggregate"
    }

    /// Converts the boxed operator into a type-erased `Any` for downcasting.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
/// An operator that can emit its output as [`FactorizedChunk`]s (wrapped in a
/// `FactorizedResult`) rather than flat chunks.
pub trait FactorizedOperator {
    /// Returns the next factorized chunk, presumably `Ok(None)` when exhausted.
    /// NOTE(review): confirm the end-of-stream convention against implementors.
    fn next_factorized(&mut self) -> FactorizedResult;
}

impl FactorizedOperator for LazyFactorizedChainOperator {
    fn next_factorized(&mut self) -> FactorizedResult {
        // Path-qualified call so that it resolves to the inherent method of the same
        // name. Do not rewrite this in a form that could resolve to this trait
        // method, because that would recurse forever.
        LazyFactorizedChainOperator::next_factorized(self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::factorized_chunk::FactorizationLevel;
    use crate::execution::factorized_vector::FactorizedVector;

    /// Builds a two-level chunk:
    /// - level 0 holds source values `10` and `20`;
    /// - level 1 holds child values `1..=5`, split by offsets `[0, 3, 5]`
    ///   (presumably `10 -> 1, 2, 3` and `20 -> 4, 5`).
    fn create_test_factorized_chunk() -> FactorizedChunk {
        let mut source_data = ValueVector::with_type(LogicalType::Int64);
        source_data.push_int64(10);
        source_data.push_int64(20);
        let level0 = FactorizationLevel::flat(
            vec![FactorizedVector::flat(source_data)],
            vec!["source".to_string()],
        );
        let mut child_data = ValueVector::with_type(LogicalType::Int64);
        child_data.push_int64(1);
        child_data.push_int64(2);
        child_data.push_int64(3);
        child_data.push_int64(4);
        child_data.push_int64(5);
        let offsets = vec![0u32, 3, 5];
        let child_vec = FactorizedVector::unflat(child_data, offsets, 2);
        let level1 =
            FactorizationLevel::unflat(vec![child_vec], vec!["child".to_string()], vec![3, 2]);
        let mut chunk = FactorizedChunk::empty();
        chunk.add_factorized_level(level0);
        chunk.add_factorized_level(level1);
        chunk
    }

    /// One instance of every aggregate over column 0.
    fn all_aggregates_on_column_zero() -> Vec<FactorizedAggregate> {
        vec![
            FactorizedAggregate::count(),
            FactorizedAggregate::count_column(0),
            FactorizedAggregate::sum(0),
            FactorizedAggregate::avg(0),
            FactorizedAggregate::min(0),
            FactorizedAggregate::max(0),
        ]
    }

    #[test]
    fn test_count_aggregate() {
        let chunk = create_test_factorized_chunk();
        let agg = FactorizedAggregate::count();
        let result = agg.compute(&chunk);
        assert_eq!(result, Value::Int64(5));
    }

    #[test]
    fn test_sum_aggregate() {
        let chunk = create_test_factorized_chunk();
        let agg = FactorizedAggregate::sum(0);
        let result = agg.compute(&chunk);
        assert_eq!(result, Value::Float64(15.0));
    }

    #[test]
    fn test_avg_aggregate() {
        let chunk = create_test_factorized_chunk();
        let agg = FactorizedAggregate::avg(0);
        let result = agg.compute(&chunk);
        assert_eq!(result, Value::Float64(3.0));
    }

    #[test]
    fn test_min_aggregate() {
        let chunk = create_test_factorized_chunk();
        let agg = FactorizedAggregate::min(0);
        let result = agg.compute(&chunk);
        assert_eq!(result, Value::Int64(1));
    }

    #[test]
    fn test_max_aggregate() {
        let chunk = create_test_factorized_chunk();
        let agg = FactorizedAggregate::max(0);
        let result = agg.compute(&chunk);
        assert_eq!(result, Value::Int64(5));
    }

    #[test]
    fn test_multiplicity_weighted_sum() {
        let mut source = ValueVector::with_type(LogicalType::Int64);
        source.push_int64(10);
        let level0 =
            FactorizationLevel::flat(vec![FactorizedVector::flat(source)], vec!["a".to_string()]);
        let mut children = ValueVector::with_type(LogicalType::Int64);
        children.push_int64(100);
        children.push_int64(200);
        let child_vec = FactorizedVector::unflat(children, vec![0, 2], 1);
        let level1 = FactorizationLevel::unflat(vec![child_vec], vec!["b".to_string()], vec![2]);
        let mut grandchildren = ValueVector::with_type(LogicalType::Int64);
        grandchildren.push_int64(1);
        grandchildren.push_int64(2);
        grandchildren.push_int64(3);
        grandchildren.push_int64(4);
        grandchildren.push_int64(5);
        let gc_vec = FactorizedVector::unflat(grandchildren, vec![0, 3, 5], 2);
        let level2 = FactorizationLevel::unflat(vec![gc_vec], vec!["c".to_string()], vec![3, 2]);
        let mut chunk = FactorizedChunk::empty();
        chunk.add_factorized_level(level0);
        chunk.add_factorized_level(level1);
        chunk.add_factorized_level(level2);
        assert_eq!(chunk.logical_row_count(), 5);
        let sum_agg = FactorizedAggregate::sum(0);
        let sum_result = sum_agg.compute(&chunk);
        assert_eq!(sum_result, Value::Float64(15.0));
        let count_agg = FactorizedAggregate::count();
        let count_result = count_agg.compute(&chunk);
        assert_eq!(count_result, Value::Int64(5));
    }

    #[test]
    fn test_empty_chunk_aggregates() {
        let chunk = FactorizedChunk::empty();
        assert_eq!(FactorizedAggregate::count().compute(&chunk), Value::Int64(0));
        assert_eq!(FactorizedAggregate::count_column(0).compute(&chunk), Value::Int64(0));
        assert_eq!(FactorizedAggregate::sum(0).compute(&chunk), Value::Null);
        assert_eq!(FactorizedAggregate::min(0).compute(&chunk), Value::Null);
    }

    #[test]
    fn test_count_column_missing_column_is_zero() {
        let chunk = create_test_factorized_chunk();
        assert_eq!(FactorizedAggregate::count_column(7).compute(&chunk), Value::Int64(0));
    }

    #[test]
    fn test_compute_matches_compute_with_multiplicities() {
        let chunk = create_test_factorized_chunk();
        let precomputed = Some(chunk.compute_path_multiplicities());
        for agg in &all_aggregates_on_column_zero() {
            let expected = agg.compute(&chunk);
            // `compute` must agree with an explicit `None`...
            assert_eq!(
                expected,
                agg.compute_with_multiplicities(&chunk, None),
                "{agg:?} with None"
            );
            // ...and with caller-supplied multiplicities.
            assert_eq!(
                expected,
                agg.compute_with_multiplicities(&chunk, precomputed.as_deref()),
                "{agg:?} with precomputed multiplicities"
            );
        }
    }
}