use std::cmp::Ordering;
use anyhow::{Result, bail};
use rayon::prelude::*;
use rustc_hash::FxHashMap;
use crate::sql::CompareOp;
use crate::storage::{ColumnData, ColumnStats, NullBitmap, RowGroup, Table};
use crate::types::Value;
use super::aggregate::AggState;
use super::compile::{
CompiledFilter, CompiledProjection, LocalKeyPart, RowSelection,
};
/// Minimum number of row groups a table must have before
/// `collect_row_group_aggregates` switches from a sequential iterator to
/// rayon's `par_iter` (see `should_parallelize_aggregate`).
const PARALLEL_AGGREGATE_ROW_GROUPS: usize = 4;
/// Walks every row group of `table` in order and invokes `visitor` once per
/// row that passes all `filters`.
///
/// Each stored row group is resolved through its `get` accessor with the
/// table schema and the optional `mapped` byte slice before rows are selected.
///
/// # Errors
///
/// Stops at, and returns, the first error produced either by resolving a row
/// group or by `visitor`.
pub(super) fn scan_table<F>(
    table: &Table,
    mapped: Option<&[u8]>,
    filters: &[CompiledFilter],
    mut visitor: F,
) -> Result<()>
where
    F: FnMut(&RowGroup, usize) -> Result<()>,
{
    for stored in &table.row_groups {
        let loaded = stored.get(&table.schema, mapped)?;
        // Selection is computed per row group, then each surviving row index
        // is handed to the caller together with the group it belongs to.
        select_rows(loaded, filters).try_for_each(|row| visitor(loaded, row))?;
    }
    Ok(())
}
/// Reports whether `table` has at least `PARALLEL_AGGREGATE_ROW_GROUPS` row
/// groups, which is the condition for running aggregation in parallel.
fn should_parallelize_aggregate(table: &Table) -> bool {
    let row_group_count = table.row_groups.len();
    row_group_count >= PARALLEL_AGGREGATE_ROW_GROUPS
}
/// Runs `worker` over every row group of `table` and gathers the per-group
/// results in row-group order.
///
/// When `should_parallelize_aggregate` approves, the row groups are processed
/// through rayon's `par_iter`; otherwise a plain sequential iterator is used.
/// In both modes every row group is processed before any error is surfaced.
///
/// # Errors
///
/// Returns the first error (in row-group order) produced by resolving a row
/// group or by `worker`.
pub(super) fn collect_row_group_aggregates<T, F>(
    table: &Table,
    mapped: Option<&[u8]>,
    worker: F,
) -> Result<Vec<T>>
where
    T: Send,
    F: Fn(&RowGroup) -> Result<T> + Sync + Send,
{
    let row_groups = &table.row_groups;
    let outcomes: Vec<Result<T>> = if should_parallelize_aggregate(table) {
        row_groups
            .par_iter()
            .map(|stored| stored.get(&table.schema, mapped).and_then(&worker))
            .collect()
    } else {
        row_groups
            .iter()
            .map(|stored| stored.get(&table.schema, mapped).and_then(&worker))
            .collect()
    };

    // Unwrap the per-group outcomes in order; the earliest failure wins.
    let mut partials = Vec::with_capacity(outcomes.len());
    for outcome in outcomes {
        partials.push(outcome?);
    }
    Ok(partials)
}
/// Computes ungrouped aggregates for a single row group.
///
/// One `AggState` is created per projection and updated with every row that
/// passes `filters`. The returned states are in the same order as
/// `projections`.
///
/// Returns `Ok(None)` when no row of the group is selected, so callers can
/// tell "no input" apart from freshly initialised states.
///
/// # Errors
///
/// Propagates the first error reported by `AggState::update`.
pub(super) fn aggregate_row_group_all(
    row_group: &RowGroup,
    filters: &[CompiledFilter],
    projections: &[CompiledProjection],
) -> Result<Option<Vec<AggState>>> {
    let matching_rows = select_rows(row_group, filters);
    if matching_rows.is_empty() {
        return Ok(None);
    }

    let mut accumulators: Vec<AggState> = projections.iter().map(AggState::new).collect();
    matching_rows.try_for_each(|row| {
        for (accumulator, projection) in accumulators.iter_mut().zip(projections) {
            accumulator.update(projection, row_group, row)?;
        }
        Ok(())
    })?;

    Ok(Some(accumulators))
}
/// Computes grouped aggregates for a single row group.
///
/// Rows passing `filters` are bucketed by the columns listed in
/// `group_indexes`. While scanning, buckets are keyed by `LocalKeyPart`
/// values built with `LocalKeyPart::from_column`; once the scan finishes each
/// local key is converted to a `Vec<Value>` via `materialize_group_key`.
///
/// Returns an empty map when no row of the group is selected.
///
/// NOTE(review): materialized keys are stored with `insert`, so if two local
/// keys ever materialized to the same `Vec<Value>` the later one would replace
/// the earlier one — confirm local keys are unique after materialization.
///
/// # Errors
///
/// Propagates errors from building a key part, from `AggState::update`, and
/// from `materialize_group_key`.
pub(super) fn aggregate_row_group_grouped(
    row_group: &RowGroup,
    filters: &[CompiledFilter],
    projections: &[CompiledProjection],
    group_indexes: &[usize],
) -> Result<FxHashMap<Vec<Value>, Vec<AggState>>> {
    let matching_rows = select_rows(row_group, filters);
    if matching_rows.is_empty() {
        return Ok(FxHashMap::default());
    }

    let mut buckets: FxHashMap<Vec<LocalKeyPart>, Vec<AggState>> = FxHashMap::default();
    matching_rows.try_for_each(|row| {
        // Build this row's group key one grouping column at a time.
        let mut bucket_key = Vec::with_capacity(group_indexes.len());
        for &column_index in group_indexes {
            let part = LocalKeyPart::from_column(&row_group.columns[column_index], row)?;
            bucket_key.push(part);
        }

        let accumulators = buckets
            .entry(bucket_key)
            .or_insert_with(|| projections.iter().map(AggState::new).collect());
        for (accumulator, projection) in accumulators.iter_mut().zip(projections) {
            accumulator.update(projection, row_group, row)?;
        }
        Ok(())
    })?;

    // Swap the row-group-local keys for fully materialized values.
    let mut result = FxHashMap::default();
    for (bucket_key, accumulators) in buckets {
        let group_key = materialize_group_key(row_group, group_indexes, &bucket_key)?;
        result.insert(group_key, accumulators);
    }
    Ok(result)
}
/// Uses per-column statistics to decide whether `row_group` could contain a
/// row satisfying every filter.
///
/// Returns `false` as soon as one filter is ruled out by the statistics, and
/// `true` otherwise (including when `filters` is empty). `NotEq` filters are
/// never used for pruning. `IsNull` requires a non-zero null count and
/// `IsNotNull` requires the null count to be below the group's row count.
fn row_group_matches(row_group: &RowGroup, filters: &[CompiledFilter]) -> bool {
    for filter in filters {
        // Looked up lazily so the `NotEq` arm never touches the stats table.
        let column_stats = || &row_group.stats[filter.column_index];
        let may_match = match filter.op {
            CompareOp::NotEq => true,
            CompareOp::Eq => stats_may_match_eq(column_stats(), &filter.value),
            CompareOp::Lt => stats_may_match_lt(column_stats(), &filter.value),
            CompareOp::Lte => stats_may_match_lte(column_stats(), &filter.value),
            CompareOp::Gt => stats_may_match_gt(column_stats(), &filter.value),
            CompareOp::Gte => stats_may_match_gte(column_stats(), &filter.value),
            CompareOp::IsNull => column_stats().null_count > 0,
            CompareOp::IsNotNull => column_stats().null_count < row_group.rows,
        };
        if !may_match {
            return false;
        }
    }
    true
}
/// Returns `false` only when the statistics prove no value equal to `value`
/// can be present: the minimum compares greater than `value`, or the maximum
/// compares less than it.
///
/// Missing min/max, or a comparison that yields `None`, is treated as
/// "may match".
fn stats_may_match_eq(stats: &ColumnStats, value: &Value) -> bool {
    let (Some(min), Some(max)) = (&stats.min, &stats.max) else {
        return true;
    };
    let min_not_above = !matches!(min.compare(value), Some(Ordering::Greater));
    // `max` is only compared when the lower bound did not already rule it out.
    min_not_above && !matches!(max.compare(value), Some(Ordering::Less))
}
/// Returns `true` unless the column minimum is known and compares as not less
/// than `value` (so no stored value can be `< value`).
///
/// A missing minimum, or a comparison yielding `None`, counts as "may match".
fn stats_may_match_lt(stats: &ColumnStats, value: &Value) -> bool {
    let Some(min) = &stats.min else {
        return true;
    };
    matches!(min.compare(value), None | Some(Ordering::Less))
}
/// Returns `true` unless the column minimum is known and compares greater
/// than `value` (so no stored value can be `<= value`).
///
/// A missing minimum, or a comparison yielding `None`, counts as "may match".
fn stats_may_match_lte(stats: &ColumnStats, value: &Value) -> bool {
    let Some(min) = &stats.min else {
        return true;
    };
    !matches!(min.compare(value), Some(Ordering::Greater))
}
/// Returns `true` unless the column maximum is known and compares as not
/// greater than `value` (so no stored value can be `> value`).
///
/// A missing maximum, or a comparison yielding `None`, counts as "may match".
fn stats_may_match_gt(stats: &ColumnStats, value: &Value) -> bool {
    let Some(max) = &stats.max else {
        return true;
    };
    matches!(max.compare(value), None | Some(Ordering::Greater))
}
/// Returns `true` unless the column maximum is known and compares less than
/// `value` (so no stored value can be `>= value`).
///
/// A missing maximum, or a comparison yielding `None`, counts as "may match".
fn stats_may_match_gte(stats: &ColumnStats, value: &Value) -> bool {
    let Some(max) = &stats.max else {
        return true;
    };
    !matches!(max.compare(value), Some(Ordering::Less))
}
/// Produces the set of rows in `row_group` that satisfy every filter.
///
/// * If the statistics check (`row_group_matches`) rules the group out, an
///   empty index list is returned without touching column data.
/// * With no filters, all rows are selected via `RowSelection::All`.
/// * Otherwise the first filter is evaluated against every row and each
///   remaining filter narrows the surviving indexes, stopping early once
///   nothing is left.
fn select_rows(row_group: &RowGroup, filters: &[CompiledFilter]) -> RowSelection {
    if !row_group_matches(row_group, filters) {
        return RowSelection::Indexes(Vec::new());
    }
    let Some((first_filter, remaining_filters)) = filters.split_first() else {
        return RowSelection::All(row_group.rows);
    };

    let mut survivors = apply_filter_to_all_rows(row_group, first_filter);
    for filter in remaining_filters {
        if survivors.is_empty() {
            break;
        }
        refine_selection(row_group, filter, &mut survivors);
    }
    RowSelection::Indexes(survivors)
}
/// Evaluates `filter` against every row of `row_group` and returns the
/// indexes of the matching rows in ascending order.
///
/// Row indexes are stored as `u32` (converted with an `as` cast). The result
/// vector is preallocated for the full row count.
fn apply_filter_to_all_rows(row_group: &RowGroup, filter: &CompiledFilter) -> Vec<u32> {
    let row_count = row_group.rows;
    let mut matching = Vec::with_capacity(row_count);
    let column = &row_group.columns[filter.column_index];
    let nulls = row_group.nulls[filter.column_index].as_ref();
    matching.extend(
        (0..row_count)
            .filter(|&row| column_matches_filter(column, row, filter, nulls))
            .map(|row| row as u32),
    );
    matching
}
/// Narrows `selected` in place, keeping only the row indexes that also
/// satisfy `filter`. Relative order of the surviving indexes is unchanged.
fn refine_selection(row_group: &RowGroup, filter: &CompiledFilter, selected: &mut Vec<u32>) {
    let column_index = filter.column_index;
    let column = &row_group.columns[column_index];
    let nulls = row_group.nulls[column_index].as_ref();
    selected.retain(|&row| column_matches_filter(column, row as usize, filter, nulls));
}
/// Tests a single row of `column` against `filter`.
///
/// Comparison operators use `ColumnData::compare_at` against the filter
/// value; when that returns `None`, every comparison except `NotEq` is
/// `false`, while `NotEq` is `true`. `IsNull` / `IsNotNull` consult only the
/// null bitmap; an absent bitmap means no row is null.
///
/// NOTE(review): the comparison arms never look at `nulls`. Whether
/// `compare_at` itself accounts for null rows is not visible here — confirm
/// that null rows cannot match `Eq`/`Lt`/… (or `NotEq`) unintentionally.
fn column_matches_filter(
    column: &ColumnData,
    row: usize,
    filter: &CompiledFilter,
    nulls: Option<&NullBitmap>,
) -> bool {
    // Both lookups are lazy so each operator only does the work it needs.
    let ordering = || column.compare_at(row, &filter.value);
    let is_null = || nulls.is_some_and(|bitmap| bitmap.is_null(row));

    match filter.op {
        CompareOp::Eq => matches!(ordering(), Some(Ordering::Equal)),
        CompareOp::NotEq => !matches!(ordering(), Some(Ordering::Equal)),
        CompareOp::Lt => matches!(ordering(), Some(Ordering::Less)),
        CompareOp::Lte => matches!(ordering(), Some(Ordering::Less | Ordering::Equal)),
        CompareOp::Gt => matches!(ordering(), Some(Ordering::Greater)),
        CompareOp::Gte => matches!(ordering(), Some(Ordering::Greater | Ordering::Equal)),
        CompareOp::IsNull => is_null(),
        CompareOp::IsNotNull => !is_null(),
    }
}
/// Converts a row-group-local group key into owned `Value`s.
///
/// `group_indexes` and `key` are walked in lockstep; each key part is
/// interpreted according to the physical layout of its grouping column.
/// Dictionary-encoded string parts are resolved by indexing the column's
/// dictionary with the stored code.
///
/// # Errors
///
/// Fails with "group key type mismatch" when a key part's variant does not
/// correspond to its column's variant.
fn materialize_group_key(
    row_group: &RowGroup,
    group_indexes: &[usize],
    key: &[LocalKeyPart],
) -> Result<Vec<Value>> {
    group_indexes
        .iter()
        .zip(key)
        .map(|(&column_index, part)| {
            let column = &row_group.columns[column_index];
            let value = match (column, part) {
                (ColumnData::Int64(_), LocalKeyPart::Int64(number)) => Value::Int64(*number),
                (ColumnData::Float64(_), LocalKeyPart::Float64(number)) => Value::Float64(*number),
                (ColumnData::Bool(_), LocalKeyPart::Bool(flag)) => Value::Bool(*flag),
                (ColumnData::StringPlain(_), LocalKeyPart::String(text)) => {
                    Value::String(text.clone())
                }
                (ColumnData::StringDict { dictionary, .. }, LocalKeyPart::StringCode(code)) => {
                    Value::String(dictionary[*code as usize].clone())
                }
                _ => bail!("group key type mismatch"),
            };
            Ok(value)
        })
        .collect()
}