use arrow::datatypes::{DataType, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::plan_err;
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::file_format::FileFormat;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use futures::{TryStreamExt, future};
use itertools::Itertools;
use std::sync::Arc;
/// Options describing how the files behind a listing table are discovered and read.
///
/// Values are normally assembled with [`ListingOptions::new`] followed by the
/// `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct ListingOptions {
/// Extension string handed to `ListingTableUrl::list_all_files` when the table's
/// files are listed. `new` seeds it from `FileFormat::get_ext`.
pub file_extension: String,
/// File format implementation; `infer_schema` delegates schema inference to it.
pub format: Arc<dyn FileFormat>,
/// Declared partition columns as `(name, data type)` pairs. `validate_partitions`
/// compares the names, in order, with the `key=value` path segments found on disk.
pub table_partition_cols: Vec<(String, DataType)>,
/// Statistics-collection flag. `new` sets it to `false`;
/// `with_session_config_options` copies `SessionConfig::collect_statistics` into it.
pub collect_stat: bool,
/// Target partition count. `new` sets it to `1`;
/// `with_session_config_options` copies `SessionConfig::target_partitions` into it.
pub target_partitions: usize,
/// Declared sort order(s) of the files, empty by default.
/// NOTE(review): presumably each inner `Vec` is one alternative ordering; this field
/// is not consumed in this part of the file, so confirm against the reader.
pub file_sort_order: Vec<Vec<SortExpr>>,
}
impl ListingOptions {
/// Creates options for `format` with these defaults:
///
/// - `file_extension`: whatever `format.get_ext()` reports
/// - `table_partition_cols`: empty
/// - `collect_stat`: `false`
/// - `target_partitions`: `1`
/// - `file_sort_order`: empty
pub fn new(format: Arc<dyn FileFormat>) -> Self {
    // Query the extension first: `format` is moved into the struct below.
    let file_extension = format.get_ext();
    Self {
        file_extension,
        format,
        table_partition_cols: Vec::new(),
        collect_stat: false,
        target_partitions: 1,
        file_sort_order: Vec::new(),
    }
}
/// Copies the session-level settings into these options.
///
/// `target_partitions` is taken from `SessionConfig::target_partitions` and
/// `collect_stat` from `SessionConfig::collect_statistics`; all other fields
/// are left as they were.
pub fn with_session_config_options(self, config: &SessionConfig) -> Self {
    self.with_target_partitions(config.target_partitions())
        .with_collect_stat(config.collect_statistics())
}
pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
self.file_extension = file_extension.into();
self
}
pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
where
S: Into<String>,
{
if let Some(file_extension) = file_extension {
self.file_extension = file_extension.into();
}
self
}
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<(String, DataType)>,
) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
self.collect_stat = collect_stat;
self
}
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = target_partitions;
self
}
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
/// Infers the schema of the files located at `table_path`.
///
/// The files are listed through the object store that the session's runtime
/// environment resolves for `table_path`, using `self.file_extension`.
/// Zero-byte objects are removed from the listing, and the remaining ones are
/// passed to `self.format` for schema inference.
///
/// # Errors
///
/// Propagates any error raised while resolving the object store, listing the
/// files, or running the format's schema inference.
pub async fn infer_schema<'a>(
    &'a self,
    state: &dyn Session,
    table_path: &'a ListingTableUrl,
) -> datafusion_common::Result<SchemaRef> {
    let object_store = state.runtime_env().object_store(table_path)?;
    let listing = table_path
        .list_all_files(state, object_store.as_ref(), &self.file_extension)
        .await?;
    // Only objects that actually contain bytes take part in inference.
    let non_empty_files: Vec<_> = listing
        .try_filter(|meta| future::ready(meta.size > 0))
        .try_collect()
        .await?;
    self.format
        .infer_schema(state, &object_store, &non_empty_files)
        .await
}
/// Checks that the declared partition columns agree with the layout found on disk.
///
/// Validation passes trivially when no partition columns are declared, or when
/// `infer_partitions` finds no partition keys at all. Otherwise the declared
/// column names must match, in order, the leading inferred partition keys; the
/// inferred list may contain additional trailing keys.
///
/// # Errors
///
/// Returns a plan error when partition columns are declared but `table_path`
/// is not a collection, or when the declared names are not a leading prefix
/// of the inferred keys. Errors from `infer_partitions` are propagated.
pub async fn validate_partitions(
    &self,
    state: &dyn Session,
    table_path: &ListingTableUrl,
) -> datafusion_common::Result<()> {
    // An unpartitioned table has nothing to validate.
    if self.table_partition_cols.is_empty() {
        return Ok(());
    }

    if !table_path.is_collection() {
        return plan_err!(
            "Can't create a partitioned table backed by a single file, \
             perhaps the URL is missing a trailing slash?"
        );
    }

    let inferred = self.infer_partitions(state, table_path).await?;
    // No partition keys were found on disk: accept the declaration as-is.
    if inferred.is_empty() {
        return Ok(());
    }

    let declared: Vec<String> = self
        .table_partition_cols
        .iter()
        .map(|(name, _)| name.clone())
        .collect();

    // `starts_with` is false both when `declared` is longer than `inferred`
    // and when any position differs, which are the two mismatch cases.
    if inferred.starts_with(&declared) {
        Ok(())
    } else {
        plan_err!(
            "Inferred partitions to be {:?}, but got {:?}",
            inferred,
            declared
        )
    }
}
/// Infers the partition column names from the directory layout under `table_path`.
///
/// At most `MAX_SAMPLED_FILES` listed files are inspected. For each one, the
/// path relative to `table_path` is split into segments, the final segment
/// (the file name) is discarded, and the part before the first `=` of every
/// remaining `key=value` segment is taken as a partition key.
///
/// Returns the shared list of keys when every inspected file yields the same
/// list, and an empty list when no files were found.
///
/// # Errors
///
/// * Propagates errors from resolving the object store or listing the files.
/// * Returns a plan error when a listed file is not located under `table_path`.
/// * Returns a plan error when the inspected files disagree on their partition keys.
pub async fn infer_partitions(
    &self,
    state: &dyn Session,
    table_path: &ListingTableUrl,
) -> datafusion_common::Result<Vec<String>> {
    // Upper bound on how many listed files are inspected.
    const MAX_SAMPLED_FILES: usize = 10;

    let store = state.runtime_env().object_store(table_path)?;
    let files: Vec<_> = table_path
        .list_all_files(state, store.as_ref(), &self.file_extension)
        .await?
        .take(MAX_SAMPLED_FILES)
        .try_collect()
        .await?;

    let mut partition_keys: Vec<Vec<String>> = Vec::with_capacity(files.len());
    for file in &files {
        // Report an unexpected location as an error instead of panicking.
        let Some(segments) = table_path.strip_prefix(&file.location) else {
            return plan_err!(
                "Listed file '{}' is not located under table path '{}'",
                file.location,
                table_path
            );
        };
        let mut segments: Vec<&str> = segments.collect();
        // The last segment is the file name, never a partition directory.
        segments.pop();
        let keys = segments
            .into_iter()
            .filter_map(|segment| segment.split_once('=').map(|(key, _)| key.to_owned()))
            .collect();
        partition_keys.push(keys);
    }

    match partition_keys.into_iter().all_equal_value() {
        Ok(keys) => Ok(keys),
        // No files were listed, so there is nothing to infer.
        Err(None) => Ok(vec![]),
        Err(Some((first, second))) => {
            // Sort so the error message does not depend on listing order.
            let mut sorted_diff = [first, second];
            sorted_diff.sort();
            plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
        }
    }
}
}