#![allow(clippy::module_inception)]
pub mod datasource;
pub mod empty;
pub mod file_format;
pub mod listing;
pub mod memory;
pub mod object_store;
use futures::Stream;
pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
use self::object_store::{FileMeta, SizedFile};
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use crate::scalar::ScalarValue;
use futures::StreamExt;
use std::pin::Pin;
/// Collect files from `all_files` until `limit` rows are reached (or the
/// stream is exhausted when `limit` is `None`), merging each file's
/// statistics into one summary `Statistics` for the returned file set.
///
/// Row count, byte size and per-column null counts are summed; min/max
/// values are folded through `MinAccumulator`/`MaxAccumulator`. The result
/// is marked inexact (`is_exact == false`) when any file reported inexact
/// statistics or when the stream was cut short by the limit.
///
/// # Errors
/// Returns the first error produced by the input stream.
pub async fn get_statistics_with_limit(
    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
    file_schema: SchemaRef,
    limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
    let mut result_files = vec![];

    let mut total_byte_size: usize = 0;
    let mut null_counts = vec![0usize; file_schema.fields().len()];
    let mut has_statistics = false;
    let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);

    let mut num_rows: usize = 0;
    let mut is_exact = true;
    // Fuse the stream so it is safe to poll it once more after breaking out
    // of the loop early on the limit check below.
    let mut all_files = Box::pin(all_files.fuse());
    while let Some(res) = all_files.next().await {
        let (file, file_stats) = res?;
        result_files.push(file);
        is_exact &= file_stats.is_exact;
        // Missing per-file counts are treated as 0; `is_exact` already
        // reflects whether the file-level statistics were exact.
        num_rows += file_stats.num_rows.unwrap_or(0);
        total_byte_size += file_stats.total_byte_size.unwrap_or(0);
        if let Some(vec) = &file_stats.column_statistics {
            has_statistics = true;
            for (i, cs) in vec.iter().enumerate() {
                null_counts[i] += cs.null_count.unwrap_or(0);

                // If an accumulator cannot merge a file's bound (e.g. an
                // unsupported type), drop it so we report "unknown" instead
                // of a partial, misleading min/max.
                if let Some(max_value) = &mut max_values[i] {
                    if let Some(file_max) = cs.max_value.clone() {
                        if max_value.update_batch(&[file_max.to_array()]).is_err() {
                            max_values[i] = None;
                        }
                    }
                }

                if let Some(min_value) = &mut min_values[i] {
                    if let Some(file_min) = cs.min_value.clone() {
                        if min_value.update_batch(&[file_min.to_array()]).is_err() {
                            min_values[i] = None;
                        }
                    }
                }
            }
        }

        // Stop as soon as the limit is satisfied: `>=` (not `>`) avoids
        // pulling one extra, unnecessary file once the accumulated row
        // count exactly meets the limit.
        if num_rows >= limit.unwrap_or(usize::MAX) {
            break;
        }
    }
    // If files remain after an early break, the accumulated statistics do
    // not describe the full stream and therefore cannot be exact.
    if all_files.next().await.is_some() {
        is_exact = false;
    }

    let column_stats = if has_statistics {
        Some(get_col_stats(
            &*file_schema,
            null_counts,
            &mut max_values,
            &mut min_values,
        ))
    } else {
        None
    };

    let statistics = Statistics {
        num_rows: Some(num_rows),
        total_byte_size: Some(total_byte_size),
        column_statistics: column_stats,
        is_exact,
    };

    Ok((result_files, statistics))
}
/// A single file (or part of a file) that belongs to a partitioned table,
/// together with the values of the table's partition columns for this file.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Path, size and modification time of the underlying file.
    pub file_meta: FileMeta,
    /// Values of the partition columns associated with this file.
    /// NOTE(review): presumably ordered to match the table's partition
    /// column definition — confirm against the listing code.
    pub partition_values: Vec<ScalarValue>,
}
impl PartitionedFile {
pub fn new(path: String, size: u64) -> Self {
Self {
file_meta: FileMeta {
sized_file: SizedFile { path, size },
last_modified: None,
},
partition_values: vec![],
}
}
}
/// A pinned, thread-safe stream of `PartitionedFile` results, used to list
/// table files asynchronously.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
impl std::fmt::Display for PartitionedFile {
    /// Delegate display to the underlying file metadata.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.file_meta, f)
    }
}
/// Build one optional max- and one optional min-accumulator per schema field.
///
/// A slot is `None` when the field's data type is not supported by the
/// corresponding accumulator (`try_new` failed).
fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let fields = schema.fields();
    let mut maxes: Vec<Option<MaxAccumulator>> = Vec::with_capacity(fields.len());
    let mut mins: Vec<Option<MinAccumulator>> = Vec::with_capacity(fields.len());
    // Single pass over the fields fills both vectors in lock step.
    for field in fields {
        maxes.push(MaxAccumulator::try_new(field.data_type()).ok());
        mins.push(MinAccumulator::try_new(field.data_type()).ok());
    }
    (maxes, mins)
}
/// Assemble per-column statistics from the accumulated null counts and the
/// min/max accumulators, one `ColumnStatistics` per schema field.
///
/// A `None` accumulator, or one whose `evaluate` fails, yields an unknown
/// (`None`) bound for that column; distinct counts are never available here.
fn get_col_stats(
    schema: &Schema,
    null_counts: Vec<usize>,
    max_values: &mut Vec<Option<MaxAccumulator>>,
    min_values: &mut Vec<Option<MinAccumulator>>,
) -> Vec<ColumnStatistics> {
    let num_fields = schema.fields().len();
    let mut column_stats = Vec::with_capacity(num_fields);
    for i in 0..num_fields {
        let max_value = max_values[i].as_ref().and_then(|acc| acc.evaluate().ok());
        let min_value = min_values[i].as_ref().and_then(|acc| acc.evaluate().ok());
        column_stats.push(ColumnStatistics {
            null_count: Some(null_counts[i]),
            max_value,
            min_value,
            distinct_count: None,
        });
    }
    column_stats
}