use std::{any::Any, sync::Arc};
use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;
use crate::{
datasource::file_format::avro::AvroFormat,
datasource::file_format::csv::CsvFormat,
datasource::file_format::json::JsonFormat,
datasource::file_format::parquet::ParquetFormat,
error::{DataFusionError, Result},
logical_plan::Expr,
physical_plan::{
empty::EmptyExec,
file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
project_schema, ExecutionPlan, Statistics,
},
};
use crate::datasource::{
datasource::TableProviderFilterPushDown, file_format::FileFormat,
get_statistics_with_limit, object_store::ObjectStore, PartitionedFile, TableProvider,
};
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
/// Configuration for building a `ListingTable`: which object store the
/// files live in, the path prefix to list, and (optionally) the file
/// schema and listing options. The optional parts can be supplied with
/// the `with_*` builders or discovered with the `infer*` methods.
pub struct ListingTableConfig {
    /// Store from which the table's files are listed and read
    pub object_store: Arc<dyn ObjectStore>,
    /// Path (prefix) under which the table's files are found
    pub table_path: String,
    /// Schema of the files; `None` until set or inferred
    pub file_schema: Option<SchemaRef>,
    /// Listing options; `None` until set or inferred
    pub options: Option<ListingOptions>,
}
impl ListingTableConfig {
    /// Create a config for the files under `table_path` in `object_store`.
    /// Schema and listing options start out unset.
    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        table_path: impl Into<String>,
    ) -> Self {
        Self {
            object_store,
            table_path: table_path.into(),
            file_schema: None,
            options: None,
        }
    }

    /// Builder-style setter for the file schema.
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            file_schema: Some(schema),
            ..self
        }
    }

    /// Builder-style setter for the listing options.
    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
        Self {
            options: Some(listing_options),
            ..self
        }
    }

    /// Map a file-name suffix to the corresponding `FileFormat`
    /// implementation, or error on an unknown suffix.
    fn infer_format(suffix: &str) -> Result<Arc<dyn FileFormat>> {
        let format: Arc<dyn FileFormat> = match suffix {
            "avro" => Arc::new(AvroFormat::default()),
            "csv" => Arc::new(CsvFormat::default()),
            "json" => Arc::new(JsonFormat::default()),
            "parquet" => Arc::new(ParquetFormat::default()),
            other => {
                return Err(DataFusionError::Internal(format!(
                    "Unable to infer file type from suffix {}",
                    other
                )))
            }
        };
        Ok(format)
    }

    /// Derive `ListingOptions` from the extension of the first file
    /// listed under `table_path`. Errors if no file is found.
    pub async fn infer_options(self) -> Result<Self> {
        let mut file_stream = self.object_store.list_file(&self.table_path).await?;
        let first_file = file_stream
            .next()
            .await
            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;

        // Everything after the last '.' is treated as the file suffix.
        let suffix = first_file.path().split('.').last().ok_or_else(|| {
            DataFusionError::Internal("Unable to infer file suffix".into())
        })?;
        let format = Self::infer_format(suffix)?;

        let listing_options = ListingOptions {
            format,
            collect_stat: true,
            file_extension: suffix.to_string(),
            target_partitions: num_cpus::get(),
            table_partition_cols: vec![],
        };
        Ok(Self {
            options: Some(listing_options),
            ..self
        })
    }

    /// Infer the file schema using the already-set `ListingOptions`.
    /// Errors if no options have been configured yet.
    pub async fn infer_schema(self) -> Result<Self> {
        let options = self.options.ok_or_else(|| {
            DataFusionError::Internal(
                "No `ListingOptions` set for inferring schema".into(),
            )
        })?;
        let schema = options
            .infer_schema(self.object_store.clone(), self.table_path.as_str())
            .await?;
        Ok(Self {
            object_store: self.object_store,
            table_path: self.table_path,
            file_schema: Some(schema),
            options: Some(options),
        })
    }

    /// Convenience: infer the listing options, then the schema.
    pub async fn infer(self) -> Result<Self> {
        self.infer_options().await?.infer_schema().await
    }
}
/// Options controlling how the files under a path are listed and read
/// as a single table.
#[derive(Clone)]
pub struct ListingOptions {
    /// Only files whose names end with this suffix are part of the table
    pub file_extension: String,
    /// File format (avro/csv/json/parquet) used to read and plan the files
    pub format: Arc<dyn FileFormat>,
    /// Partition columns encoded in the file paths (e.g. `col=value`
    /// directory segments, as exercised by the tests below)
    pub table_partition_cols: Vec<String>,
    /// Whether to gather per-file statistics when planning a scan
    pub collect_stat: bool,
    /// Desired number of file groups (partitions) produced for a scan
    pub target_partitions: usize,
}
impl ListingOptions {
    /// Create options for `format` with defaults: no extension filter,
    /// no partition columns, statistics collection enabled, and a
    /// single target partition.
    pub fn new(format: Arc<dyn FileFormat>) -> Self {
        Self {
            format,
            file_extension: String::new(),
            table_partition_cols: vec![],
            collect_stat: true,
            target_partitions: 1,
        }
    }

    /// Infer the schema of the files under `path` that match the
    /// configured extension, by delegating to the file format.
    pub async fn infer_schema<'a>(
        &'a self,
        object_store: Arc<dyn ObjectStore>,
        path: &'a str,
    ) -> Result<SchemaRef> {
        // List matching files, then turn each entry into a reader the
        // format implementation can sample for schema inference.
        let files = object_store
            .list_file_with_suffix(path, &self.file_extension)
            .await?;
        let readers =
            files.map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
        self.format.infer_schema(Box::pin(readers)).await
    }
}
/// A table whose data is a collection of files under a path prefix,
/// listed (and optionally partition-pruned) at scan time.
pub struct ListingTable {
    // Store from which the files are listed and read
    object_store: Arc<dyn ObjectStore>,
    // Path prefix that identifies the table's files
    table_path: String,
    // Schema of the file contents only (no partition columns)
    file_schema: SchemaRef,
    // File schema extended with the partition columns — what callers
    // of `schema()` see
    table_schema: SchemaRef,
    options: ListingOptions,
}
impl ListingTable {
    /// Create a new `ListingTable` from a fully-specified config.
    ///
    /// Errors if `config.file_schema` or `config.options` is missing.
    /// The table schema exposed to callers is the file schema extended
    /// with one non-nullable `DEFAULT_PARTITION_COLUMN_DATATYPE` field
    /// per partition column.
    pub fn try_new(config: ListingTableConfig) -> Result<Self> {
        let file_schema = config
            .file_schema
            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
        let options = config.options.ok_or_else(|| {
            DataFusionError::Internal("No ListingOptions provided".into())
        })?;

        // Partition values live in the file paths, not in the files, so
        // the partition columns are appended to the file schema here.
        let mut table_fields = file_schema.fields().clone();
        for part in &options.table_partition_cols {
            table_fields.push(Field::new(
                part,
                DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
                false,
            ));
        }

        Ok(Self {
            // `config` is consumed, so the remaining fields are moved out
            // directly — the previous `.clone()` calls were redundant
            // (notably the String clone of `table_path`).
            object_store: config.object_store,
            table_path: config.table_path,
            file_schema,
            table_schema: Arc::new(Schema::new(table_fields)),
            options,
        })
    }

    /// Object store holding this table's files.
    pub fn object_store(&self) -> &Arc<dyn ObjectStore> {
        &self.object_store
    }

    /// Path prefix under which the table's files are listed.
    pub fn table_path(&self) -> &str {
        &self.table_path
    }

    /// Listing options used by this table.
    pub fn options(&self) -> &ListingOptions {
        &self.options
    }
}
#[async_trait]
impl TableProvider for ListingTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Full table schema: file columns followed by partition columns.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    /// Plan a scan: list (and prune) the files, then let the file
    /// format build the physical plan.
    async fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let (file_groups, statistics) =
            self.list_files_for_scan(filters, limit).await?;

        // Nothing matched: return an empty relation carrying the
        // projected schema so downstream operators still line up.
        if file_groups.is_empty() {
            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
            return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
        }

        let scan_config = FileScanConfig {
            object_store: Arc::clone(&self.object_store),
            file_schema: Arc::clone(&self.file_schema),
            file_groups,
            statistics,
            projection: projection.clone(),
            limit,
            table_partition_cols: self.options.table_partition_cols.clone(),
        };
        self.options
            .format
            .create_physical_plan(scan_config, filters)
            .await
    }

    fn supports_filter_pushdown(
        &self,
        filter: &Expr,
    ) -> Result<TableProviderFilterPushDown> {
        // Filters that reference only partition columns are fully applied
        // while pruning the file listing; anything else is best-effort.
        let pushdown =
            if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
                TableProviderFilterPushDown::Exact
            } else {
                TableProviderFilterPushDown::Inexact
            };
        Ok(pushdown)
    }
}
impl ListingTable {
    /// List the files for a scan, grouped into up to
    /// `target_partitions` file groups, together with statistics
    /// aggregated across the files (taking `limit` into account).
    ///
    /// `filters` referencing partition columns are used to prune the
    /// listing up front (see `pruned_partition_list`).
    async fn list_files_for_scan<'a>(
        &'a self,
        filters: &'a [Expr],
        limit: Option<usize>,
    ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
        // Stream of files that survive partition pruning.
        let file_list = pruned_partition_list(
            self.object_store.as_ref(),
            &self.table_path,
            filters,
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;
        // Cloned handle so the per-file closure below can own a store
        // reference independent of `self`'s borrow.
        let object_store = Arc::clone(&self.object_store);
        // Pair each file with its statistics; when `collect_stat` is
        // off, default (unknown) statistics are used instead of reading
        // each file.
        let files = file_list.then(move |part_file| {
            let object_store = object_store.clone();
            async move {
                let part_file = part_file?;
                let statistics = if self.options.collect_stat {
                    let object_reader = object_store
                        .file_reader(part_file.file_meta.sized_file.clone())?;
                    self.options.format.infer_stats(object_reader).await?
                } else {
                    Statistics::default()
                };
                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
            }
        });
        // Combine per-file statistics under the table schema / limit.
        let (files, statistics) =
            get_statistics_with_limit(files, self.schema(), limit).await?;
        Ok((
            split_files(files, self.options.target_partitions),
            statistics,
        ))
    }
}
#[cfg(test)]
mod tests {
    use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
    use crate::{
        datasource::{
            file_format::{avro::AvroFormat, parquet::ParquetFormat},
            object_store::local::LocalFileSystem,
        },
        logical_plan::{col, lit},
        test::{columns, object_store::TestObjectStore},
    };
    use arrow::datatypes::DataType;

    use super::*;

    /// Scanning a single parquet file yields a leaf plan with one
    /// partition and the expected row/byte statistics.
    #[tokio::test]
    async fn read_single_file() -> Result<()> {
        let table = load_table("alltypes_plain.parquet").await?;
        let projection = None;
        let exec = table
            .scan(&projection, &[], None)
            .await
            .expect("Scan table");
        assert_eq!(exec.children().len(), 0);
        assert_eq!(exec.output_partitioning().partition_count(), 1);
        // Known contents of alltypes_plain.parquet in the test data set.
        assert_eq!(exec.statistics().num_rows, Some(8));
        assert_eq!(exec.statistics().total_byte_size, Some(671));
        Ok(())
    }

    /// `ListingOptions::new` defaults to `collect_stat: true`, so a scan
    /// picks up file statistics without any extra configuration.
    #[tokio::test]
    async fn load_table_stats_by_default() -> Result<()> {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schema = opt
            .infer_schema(Arc::new(LocalFileSystem {}), &filename)
            .await?;
        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename)
            .with_listing_options(opt)
            .with_schema(schema);
        let table = ListingTable::try_new(config)?;
        let exec = table.scan(&None, &[], None).await?;
        assert_eq!(exec.statistics().num_rows, Some(8));
        assert_eq!(exec.statistics().total_byte_size, Some(671));
        Ok(())
    }

    /// A partition filter that excludes every file should produce an
    /// `EmptyExec` that still exposes the full table schema
    /// (file column `a` plus partition column `p1`).
    #[tokio::test]
    async fn read_empty_table() -> Result<()> {
        let path = String::from("table/p1=v1/file.avro");
        let store = TestObjectStore::new_arc(&[(&path, 100)]);
        let opt = ListingOptions {
            file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
            format: Arc::new(AvroFormat {}),
            table_partition_cols: vec![String::from("p1")],
            target_partitions: 4,
            collect_stat: true,
        };
        let file_schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
        let config = ListingTableConfig::new(store, "table/")
            .with_listing_options(opt)
            .with_schema(file_schema);
        let table = ListingTable::try_new(config)?;
        // Table schema = file schema + partition column.
        assert_eq!(
            columns(&table.schema()),
            vec!["a".to_owned(), "p1".to_owned()]
        );
        // Filter on the partition column that matches no file (p1 != v1).
        let filter = Expr::not_eq(col("p1"), lit("v1"));
        let scan = table
            .scan(&None, &[filter], None)
            .await
            .expect("Empty execution plan");
        assert!(scan.as_any().is::<EmptyExec>());
        assert_eq!(
            columns(&scan.schema()),
            vec!["a".to_owned(), "p1".to_owned()]
        );
        Ok(())
    }

    /// File grouping: the number of output partitions is
    /// min(target_partitions, matching file count), and files outside
    /// the table prefix are ignored.
    #[tokio::test]
    async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
        // More target partitions than files: one file per group.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "bucket/key-prefix/",
            12,
            5,
        )
        .await?;

        // As many files as target partitions.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
            ],
            "bucket/key-prefix/",
            4,
            4,
        )
        .await?;

        // More files than target partitions: files are grouped.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "bucket/key-prefix/",
            2,
            2,
        )
        .await?;

        // No files at all: no partitions.
        assert_list_files_for_scan_grouping(&[], "bucket/key-prefix/", 2, 0).await?;

        // Files under a different prefix are not part of the table.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/other-prefix/roguefile",
            ],
            "bucket/key-prefix/",
            10,
            2,
        )
        .await?;
        Ok(())
    }

    /// Build a `ListingTable` over one file from the parquet test data
    /// directory, inferring both options and schema.
    async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, name);
        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename)
            .infer()
            .await?;
        let table = ListingTable::try_new(config)?;
        Ok(Arc::new(table))
    }

    /// Assert that listing `files` (each mocked at 10 bytes) under
    /// `table_prefix` with the given `target_partitions` produces
    /// exactly `output_partitioning` file groups.
    async fn assert_list_files_for_scan_grouping(
        files: &[&str],
        table_prefix: &str,
        target_partitions: usize,
        output_partitioning: usize,
    ) -> Result<()> {
        let mock_store =
            TestObjectStore::new_arc(&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let format = AvroFormat {};

        let opt = ListingOptions {
            // Empty extension: every listed file matches.
            file_extension: "".to_owned(),
            format: Arc::new(format),
            table_partition_cols: vec![],
            target_partitions,
            collect_stat: true,
        };

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let config = ListingTableConfig::new(mock_store, table_prefix.to_owned())
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }
}