use super::listing::PartitionedFile;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
use futures::{Stream, StreamExt};
use itertools::izip;
use itertools::multiunzip;
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
file_schema: SchemaRef,
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
let size = file_schema.fields().len();
let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;
let mut all_files = Box::pin(all_files.fuse());
if let Some(first_file) = all_files.next().await {
let (file, file_stats) = first_file?;
result_files.push(file);
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
null_counts[index] = file_column.null_count;
max_values[index] = file_column.max_value;
min_values[index] = file_column.min_value;
}
let conservative_num_rows = match num_rows {
Precision::Exact(nr) => nr,
_ => usize::MIN,
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let (file, file_stats) = current?;
result_files.push(file);
num_rows = add_row_stats(file_stats.num_rows, num_rows);
total_byte_size =
add_row_stats(file_stats.total_byte_size, total_byte_size);
(null_counts, max_values, min_values) = multiunzip(
izip!(
file_stats.column_statistics.into_iter(),
null_counts.into_iter(),
max_values.into_iter(),
min_values.into_iter()
)
.map(
|(
ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
},
null_count,
max_value,
min_value,
)| {
(
add_row_stats(file_nc, null_count),
set_max_if_greater(file_max, max_value),
set_min_if_lesser(file_min, min_value),
)
},
),
);
if num_rows.get_value().unwrap_or(&usize::MIN)
> &limit.unwrap_or(usize::MAX)
{
break;
}
}
}
};
let mut statistics = Statistics {
num_rows,
total_byte_size,
column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
};
if all_files.next().await.is_some() {
statistics = statistics.into_inexact()
}
Ok((result_files, statistics))
}
/// Builds a max accumulator and a min accumulator for every field of
/// `schema`, in field order.
///
/// A slot is `None` when the accumulator cannot be created for that field's
/// data type; the construction error is discarded.
pub(crate) fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    schema
        .fields()
        .iter()
        .map(|field| {
            let data_type = field.data_type();
            (
                MaxAccumulator::try_new(data_type).ok(),
                MinAccumulator::try_new(data_type).ok(),
            )
        })
        .unzip()
}
/// Adds one file's count (`file_num_rows`) to a running total (`num_rows`).
///
/// Despite the name, this is also used for byte sizes and null counts. When
/// either side is `Precision::Absent`, the other side is returned converted
/// with `to_inexact`. Otherwise the two are combined with `Precision::add`.
fn add_row_stats(
    file_num_rows: Precision<usize>,
    num_rows: Precision<usize>,
) -> Precision<usize> {
    match (file_num_rows, num_rows) {
        (Precision::Absent, running_total) => running_total.to_inexact(),
        (file_value, Precision::Absent) => file_value.to_inexact(),
        (file_value, running_total) => file_value.add(&running_total),
    }
}
/// Combines per-column null counts, max values and min values into one
/// `ColumnStatistics` per position.
///
/// `distinct_count` is always `Precision::Absent`. The result is as long as
/// the shortest of the three inputs.
pub(crate) fn get_col_stats_vec(
    null_counts: Vec<Precision<usize>>,
    max_values: Vec<Precision<ScalarValue>>,
    min_values: Vec<Precision<ScalarValue>>,
) -> Vec<ColumnStatistics> {
    let column_count = null_counts
        .len()
        .min(max_values.len())
        .min(min_values.len());
    let mut column_statistics = Vec::with_capacity(column_count);
    let columns = null_counts.into_iter().zip(max_values).zip(min_values);
    for ((null_count, max_value), min_value) in columns {
        column_statistics.push(ColumnStatistics {
            null_count,
            max_value,
            min_value,
            distinct_count: Precision::Absent,
        });
    }
    column_statistics
}
/// Builds one `ColumnStatistics` per field of `schema`, using the null count
/// and the max/min accumulators at the same index.
///
/// Max/min values are `Precision::Exact` when the accumulator exists and
/// evaluates successfully. They are `Precision::Absent` when the slot is
/// `None` or evaluation returns an error. `distinct_count` is always
/// `Precision::Absent`.
///
/// # Panics
/// Panics if `null_counts`, `max_values` or `min_values` has fewer entries
/// than `schema` has fields.
pub(crate) fn get_col_stats(
    schema: &Schema,
    null_counts: Vec<Precision<usize>>,
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
) -> Vec<ColumnStatistics> {
    let column_count = schema.fields().len();
    (0..column_count)
        .map(|column| {
            // Evaluation errors are deliberately swallowed: the statistic is
            // simply reported as unknown.
            let evaluated_max = max_values[column]
                .as_ref()
                .and_then(|accumulator| accumulator.evaluate().ok());
            let evaluated_min = min_values[column]
                .as_ref()
                .and_then(|accumulator| accumulator.evaluate().ok());
            ColumnStatistics {
                null_count: null_counts[column].clone(),
                max_value: evaluated_max.map_or(Precision::Absent, Precision::Exact),
                min_value: evaluated_min.map_or(Precision::Absent, Precision::Exact),
                distinct_count: Precision::Absent,
            }
        })
        .collect()
}
/// Merges a file's max value (`max_nominee`) into the running max
/// (`max_values`) and returns the larger of the two.
///
/// Exactness rules:
/// - both sides `Exact` → `Exact` of the larger value;
/// - both sides known but at least one `Inexact` → `Inexact` of the larger
///   value. The merged maximum depends on an estimate even when the exact
///   side wins the comparison.
/// - one side `Absent` → the other side's value as `Inexact`;
/// - both sides `Absent` → `Absent`.
///
/// If the two values do not compare as less-than (including incomparable
/// values), the running value is kept.
fn set_max_if_greater(
    max_nominee: Precision<ScalarValue>,
    max_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
    match (max_values, max_nominee) {
        (Precision::Exact(current), Precision::Exact(nominee)) => {
            Precision::Exact(if current < nominee { nominee } else { current })
        }
        (
            Precision::Exact(current) | Precision::Inexact(current),
            Precision::Exact(nominee) | Precision::Inexact(nominee),
        ) => Precision::Inexact(if current < nominee { nominee } else { current }),
        (Precision::Exact(value) | Precision::Inexact(value), Precision::Absent)
        | (Precision::Absent, Precision::Exact(value) | Precision::Inexact(value)) => {
            Precision::Inexact(value)
        }
        (Precision::Absent, Precision::Absent) => Precision::Absent,
    }
}
/// Merges a file's min value (`min_nominee`) into the running min
/// (`min_values`) and returns the smaller of the two.
///
/// Exactness rules:
/// - both sides `Exact` → `Exact` of the smaller value;
/// - both sides known but at least one `Inexact` → `Inexact` of the smaller
///   value. The merged minimum depends on an estimate even when the exact
///   side wins the comparison.
/// - one side `Absent` → the other side's value as `Inexact`;
/// - both sides `Absent` → `Absent`.
///
/// If the two values do not compare as greater-than (including incomparable
/// values), the running value is kept.
fn set_min_if_lesser(
    min_nominee: Precision<ScalarValue>,
    min_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
    match (min_values, min_nominee) {
        (Precision::Exact(current), Precision::Exact(nominee)) => {
            Precision::Exact(if current > nominee { nominee } else { current })
        }
        (
            Precision::Exact(current) | Precision::Inexact(current),
            Precision::Exact(nominee) | Precision::Inexact(nominee),
        ) => Precision::Inexact(if current > nominee { nominee } else { current }),
        (Precision::Exact(value) | Precision::Inexact(value), Precision::Absent)
        | (Precision::Absent, Precision::Exact(value) | Precision::Inexact(value)) => {
            Precision::Inexact(value)
        }
        (Precision::Absent, Precision::Absent) => Precision::Absent,
    }
}