use super::{
helpers::{expr_applicable_for_cols, pruned_partition_list},
ListingTableUrl, PartitionedFile,
};
use crate::{
datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
datasource::{create_ordering, physical_plan::FileSinkConfig},
execution::context::SessionState,
};
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
use datafusion_common::{
config_datafusion_err, config_err, internal_err, plan_err, project_schema,
stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
};
use datafusion_datasource::{
compute_all_files_statistics,
file::FileSource,
file_groups::FileGroup,
file_scan_config::{FileScanConfig, FileScanConfigBuilder},
schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
};
use datafusion_execution::{
cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
config::SessionConfig,
};
use datafusion_expr::{
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
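/// Indicates how the schema of a [`ListingTableConfig`] was determined:
/// not yet set, inferred from the files, or explicitly specified by the user.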
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum SchemaSource {
#[default]
Unset,
Inferred,
Specified,
}
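/// Builder-style configuration for a [`ListingTable`].
///
/// The table paths, [`ListingOptions`], and schema may be set explicitly or
/// inferred from the files themselves (see [`Self::infer`]).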
#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
pub table_paths: Vec<ListingTableUrl>,
pub file_schema: Option<SchemaRef>,
pub options: Option<ListingOptions>,
schema_source: SchemaSource,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
impl ListingTableConfig {
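    /// Creates a new configuration for a table at the given path. The
    /// options and schema must still be set or inferred before use.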
pub fn new(table_path: ListingTableUrl) -> Self {
Self {
table_paths: vec![table_path],
..Default::default()
}
}
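    /// Creates a new configuration reading from multiple paths; the files
    /// found under all paths are treated as a single table.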
pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
Self {
table_paths,
..Default::default()
}
}
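    /// Returns how (or whether) the schema has been determined so far.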
pub fn schema_source(&self) -> SchemaSource {
self.schema_source
}
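    /// Sets the schema of the files (excluding partition columns) and marks
    /// the schema source as [`SchemaSource::Specified`]. The schema is not
    /// validated against the actual files.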
pub fn with_schema(self, schema: SchemaRef) -> Self {
debug_assert!(
self.options.is_some() || cfg!(test),
"ListingTableConfig::with_schema called without options set. \
Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
);
Self {
file_schema: Some(schema),
schema_source: SchemaSource::Specified,
..self
}
}
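    /// Sets the [`ListingOptions`] controlling file discovery and format.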
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
debug_assert!(
!self.table_paths.is_empty() || cfg!(test),
"ListingTableConfig::with_listing_options called without table_paths set. \
Consider calling new() or new_with_multi_paths() first to establish table paths."
);
Self {
options: Some(listing_options),
..self
}
}
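    /// Splits a path such as `data.csv.gz` into its file extension (`csv`)
    /// and, if compressed, the compression extension (`gz`).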
fn infer_file_extension_and_compression_type(
path: &str,
) -> Result<(String, Option<String>)> {
let mut exts = path.rsplit('.');
let split = exts.next().unwrap_or("");
let file_compression_type = FileCompressionType::from_str(split)
.unwrap_or(FileCompressionType::UNCOMPRESSED);
if file_compression_type.is_compressed() {
let split2 = exts.next().unwrap_or("");
Ok((split2.to_string(), Some(split.to_string())))
} else {
Ok((split.to_string(), None))
}
}
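    /// Infers the [`ListingOptions`] from the first file found under the
    /// first table path: the [`FileFormat`] is chosen by file extension and
    /// the compression type is detected. Returns `self` unchanged when no
    /// table paths are set, and errors when no files are found.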
pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
let store = if let Some(url) = self.table_paths.first() {
state.runtime_env().object_store(url)?
} else {
return Ok(self);
};
let file = self
.table_paths
.first()
.unwrap()
.list_all_files(state, store.as_ref(), "")
.await?
.next()
.await
.ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
let (file_extension, maybe_compression_type) =
ListingTableConfig::infer_file_extension_and_compression_type(
file.location.as_ref(),
)?;
let mut format_options = HashMap::new();
if let Some(ref compression_type) = maybe_compression_type {
format_options
.insert("format.compression".to_string(), compression_type.clone());
}
        // Format-factory lookup currently requires the concrete `SessionState`
        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
let file_format = state
.get_file_format_factory(&file_extension)
.ok_or(config_datafusion_err!(
"No file_format found with extension {file_extension}"
))?
.create(state, &format_options)?;
let listing_file_extension =
if let Some(compression_type) = maybe_compression_type {
format!("{}.{}", &file_extension, &compression_type)
} else {
file_extension
};
let listing_options = ListingOptions::new(file_format)
.with_file_extension(listing_file_extension)
.with_target_partitions(state.config().target_partitions())
.with_collect_stat(state.config().collect_statistics());
Ok(Self {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(listing_options),
schema_source: self.schema_source,
schema_adapter_factory: self.schema_adapter_factory,
expr_adapter_factory: self.expr_adapter_factory,
})
}
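    /// Infers the file schema from the files at the first table path, unless
    /// a schema was already provided, in which case it is kept as-is.
    /// Requires [`ListingOptions`] to be set.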
pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
match self.options {
Some(options) => {
let ListingTableConfig {
table_paths,
file_schema,
options: _,
schema_source,
schema_adapter_factory,
expr_adapter_factory: physical_expr_adapter_factory,
} = self;
let (schema, new_schema_source) = match file_schema {
                    Some(schema) => (schema, schema_source),
                    None => {
if let Some(url) = table_paths.first() {
(
options.infer_schema(state, url).await?,
SchemaSource::Inferred,
)
} else {
(Arc::new(Schema::empty()), SchemaSource::Inferred)
}
}
};
Ok(Self {
table_paths,
file_schema: Some(schema),
options: Some(options),
schema_source: new_schema_source,
schema_adapter_factory,
expr_adapter_factory: physical_expr_adapter_factory,
})
}
None => internal_err!("No `ListingOptions` set for inferring schema"),
}
}
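    /// Convenience method: [`Self::infer_options`] followed by
    /// [`Self::infer_schema`].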
pub async fn infer(self, state: &dyn Session) -> Result<Self> {
self.infer_options(state).await?.infer_schema(state).await
}
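    /// Infers the partition columns from the directory structure of the
    /// first table path and registers them on the options as
    /// `Dictionary(UInt16, Utf8)` columns.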
pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result<Self> {
match self.options {
Some(options) => {
let Some(url) = self.table_paths.first() else {
return config_err!("No table path found");
};
let partitions = options
.infer_partitions(state, url)
.await?
.into_iter()
.map(|col_name| {
(
col_name,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
)
})
.collect::<Vec<_>>();
let options = options.with_table_partition_cols(partitions);
Ok(Self {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(options),
schema_source: self.schema_source,
schema_adapter_factory: self.schema_adapter_factory,
expr_adapter_factory: self.expr_adapter_factory,
})
}
            None => config_err!("No `ListingOptions` set for inferring partitions"),
}
}
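    /// Sets a custom [`SchemaAdapterFactory`] used to map file schemas and
    /// statistics to the table schema.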
pub fn with_schema_adapter_factory(
self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self
}
}
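    /// Returns the custom schema adapter factory, if one was set.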
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.as_ref()
}
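    /// Sets a [`PhysicalExprAdapterFactory`] used to adapt physical
    /// expressions (e.g. pushed-down predicates) to individual file schemas.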
pub fn with_expr_adapter_factory(
self,
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Self {
Self {
expr_adapter_factory: Some(expr_adapter_factory),
..self
}
}
}
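/// Options controlling how a [`ListingTable`] discovers and reads its files:
/// the file extension to match, the [`FileFormat`], Hive-style partition
/// columns, statistics collection, target partition count, and any known
/// file sort order.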
#[derive(Clone, Debug)]
pub struct ListingOptions {
pub file_extension: String,
pub format: Arc<dyn FileFormat>,
pub table_partition_cols: Vec<(String, DataType)>,
pub collect_stat: bool,
pub target_partitions: usize,
pub file_sort_order: Vec<Vec<SortExpr>>,
}
impl ListingOptions {
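    /// Creates options for the given format with defaults: the format's
    /// native extension, no partition columns, statistics collection
    /// disabled, and a single target partition.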
pub fn new(format: Arc<dyn FileFormat>) -> Self {
Self {
file_extension: format.get_ext(),
format,
table_partition_cols: vec![],
collect_stat: false,
target_partitions: 1,
file_sort_order: vec![],
}
}
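    /// Applies `target_partitions` and `collect_statistics` from the given
    /// [`SessionConfig`].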
pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
self = self.with_target_partitions(config.target_partitions());
self = self.with_collect_stat(config.collect_statistics());
self
}
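    /// Sets the file extension that listed files must match.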
pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
self.file_extension = file_extension.into();
self
}
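    /// Sets the file extension only if `file_extension` is `Some`.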
pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
where
S: Into<String>,
{
if let Some(file_extension) = file_extension {
self.file_extension = file_extension.into();
}
self
}
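    /// Sets the partition columns as `(name, type)` pairs, in the same order
    /// as the `col=value` directories appear under the table path.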
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<(String, DataType)>,
) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
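    /// Enables or disables collecting per-file statistics during listing.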
pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
self.collect_stat = collect_stat;
self
}
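    /// Sets the number of file groups (partitions) to produce for scans.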
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = target_partitions;
self
}
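    /// Declares sort order(s) that the data in each file is known to
    /// satisfy, enabling sort-preserving scans.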
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
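    /// Infers the schema of the files located at `table_path` using this
    /// option's [`FileFormat`]. Empty files are skipped, and partition
    /// columns are not included in the returned schema.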
pub async fn infer_schema<'a>(
&'a self,
state: &dyn Session,
table_path: &'a ListingTableUrl,
) -> Result<SchemaRef> {
let store = state.runtime_env().object_store(table_path)?;
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
.try_filter(|object_meta| future::ready(object_meta.size > 0))
.try_collect()
.await?;
let schema = self.format.infer_schema(state, &store, &files).await?;
Ok(schema)
}
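    /// Verifies that the declared partition columns match the partitions
    /// inferred from the directory layout: the declared columns must be a
    /// leading prefix of the inferred ones, in order.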
pub async fn validate_partitions(
&self,
state: &dyn Session,
table_path: &ListingTableUrl,
) -> Result<()> {
if self.table_partition_cols.is_empty() {
return Ok(());
}
if !table_path.is_collection() {
return plan_err!(
"Can't create a partitioned table backed by a single file, \
perhaps the URL is missing a trailing slash?"
);
}
let inferred = self.infer_partitions(state, table_path).await?;
if inferred.is_empty() {
return Ok(());
}
let table_partition_names = self
.table_partition_cols
.iter()
.map(|(col_name, _)| col_name.clone())
.collect_vec();
if inferred.len() < table_partition_names.len() {
return plan_err!(
"Inferred partitions to be {:?}, but got {:?}",
inferred,
table_partition_names
);
}
for (idx, col) in table_partition_names.iter().enumerate() {
if &inferred[idx] != col {
return plan_err!(
"Inferred partitions to be {:?}, but got {:?}",
inferred,
table_partition_names
);
}
}
Ok(())
}
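    /// Infers partition column names by examining the directory structure of
    /// up to 10 listed files; errors if the files disagree on the layout.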
pub(crate) async fn infer_partitions(
&self,
state: &dyn Session,
table_path: &ListingTableUrl,
) -> Result<Vec<String>> {
let store = state.runtime_env().object_store(table_path)?;
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
.take(10)
.try_collect()
.await?;
let stripped_path_parts = files.iter().map(|file| {
table_path
.strip_prefix(&file.location)
                .unwrap() // infallible: every file was listed under `table_path`
.collect_vec()
});
let partition_keys = stripped_path_parts
.map(|path_parts| {
path_parts
.into_iter()
.rev()
                    .skip(1) // drop the final path segment (the file name)
                    .rev()
.map(|s| s.split('=').take(1).collect())
.collect_vec()
})
.collect_vec();
match partition_keys.into_iter().all_equal_value() {
Ok(v) => Ok(v),
Err(None) => Ok(vec![]),
Err(Some(diff)) => {
let mut sorted_diff = [diff.0, diff.1];
sorted_diff.sort();
plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
}
}
}
}
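/// Reads data from one or more files at a [`ListingTableUrl`], treating a
/// directory of files (optionally Hive-partitioned) as a single table.
///
/// # Example
///
/// A minimal sketch of wiring up a `ListingTable` over a directory of
/// Parquet files (the path is a placeholder; assumes the `parquet` feature
/// is enabled):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use datafusion::error::Result;
/// # use datafusion::prelude::SessionContext;
/// # use datafusion::datasource::file_format::parquet::ParquetFormat;
/// # use datafusion::datasource::listing::{
/// #     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
/// # };
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let state = ctx.state();
///
/// // A trailing slash marks the URL as a directory ("collection") of files
/// let table_path = ListingTableUrl::parse("file:///path/to/parquet/")?;
///
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
///     .with_file_extension(".parquet");
///
/// // Resolve the schema by inspecting the files themselves
/// let schema = listing_options.infer_schema(&state, &table_path).await?;
///
/// let config = ListingTableConfig::new(table_path)
///     .with_listing_options(listing_options)
///     .with_schema(schema);
///
/// ctx.register_table("my_table", Arc::new(ListingTable::try_new(config)?))?;
/// # Ok(())
/// # }
/// ```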
#[derive(Debug, Clone)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
file_schema: SchemaRef,
table_schema: SchemaRef,
schema_source: SchemaSource,
options: ListingOptions,
definition: Option<String>,
collected_statistics: FileStatisticsCache,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
impl ListingTable {
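    /// Creates a [`ListingTable`] from the given config, which must have both
    /// a schema and [`ListingOptions`] set. The table schema is the file
    /// schema with the partition columns appended.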
pub fn try_new(config: ListingTableConfig) -> Result<Self> {
let schema_source = config.schema_source();
let file_schema = config
.file_schema
.ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
let options = config.options.ok_or_else(|| {
DataFusionError::Internal("No ListingOptions provided".into())
})?;
let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
for (part_col_name, part_col_type) in &options.table_partition_cols {
builder.push(Field::new(part_col_name, part_col_type.clone(), false));
}
let table_schema = Arc::new(
builder
.finish()
.with_metadata(file_schema.metadata().clone()),
);
let table = Self {
table_paths: config.table_paths,
file_schema,
table_schema,
schema_source,
options,
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
schema_adapter_factory: config.schema_adapter_factory,
expr_adapter_factory: config.expr_adapter_factory,
};
Ok(table)
}
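    /// Attaches the given constraints to the table.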
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}
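    /// Sets default expressions used for columns omitted from `INSERT`s.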
pub fn with_column_defaults(
mut self,
column_defaults: HashMap<String, Expr>,
) -> Self {
self.column_defaults = column_defaults;
self
}
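    /// Sets the file statistics cache, falling back to a fresh
    /// [`DefaultFileStatisticsCache`] when `None`.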
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self
}
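    /// Stores the SQL text used to define this table, if any.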
pub fn with_definition(mut self, definition: Option<String>) -> Self {
self.definition = definition;
self
}
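    /// Returns the paths this table reads from.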
pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
&self.table_paths
}
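    /// Returns the [`ListingOptions`] for this table.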
pub fn options(&self) -> &ListingOptions {
&self.options
}
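    /// Returns how this table's schema was determined.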
pub fn schema_source(&self) -> SchemaSource {
self.schema_source
}
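    /// Sets a custom [`SchemaAdapterFactory`] used when scanning files whose
    /// schemas differ from the table schema.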
pub fn with_schema_adapter_factory(
self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self
}
}
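    /// Returns the custom schema adapter factory, if one was set.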
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.as_ref()
}
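    /// Builds a [`SchemaAdapter`] for the table schema, preferring the custom
    /// factory when one is configured.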
fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
let table_schema = self.schema();
match &self.schema_adapter_factory {
Some(factory) => {
factory.create_with_projected_schema(Arc::clone(&table_schema))
}
None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
}
}
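    /// Creates the format's [`FileSource`], wiring in the custom schema
    /// adapter factory when present.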
fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn FileSource>> {
let mut source = self.options.format.file_source();
if let Some(factory) = &self.schema_adapter_factory {
source = source.with_schema_adapter_factory(Arc::clone(factory))?;
}
Ok(source)
}
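    /// Converts the configured `file_sort_order` into physical sort orderings
    /// resolved against the table schema.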
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
create_ordering(&self.table_schema, &self.options.file_sort_order)
}
}
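/// Returns `true` if the filter references only partition columns and can
/// therefore be evaluated exactly from partition values to prune files.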
fn can_be_evaluated_for_partition_pruning(
partition_column_names: &[&str],
expr: &Expr,
) -> bool {
!partition_column_names.is_empty()
&& expr_applicable_for_cols(partition_column_names, expr)
}
#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
fn constraints(&self) -> Option<&Constraints> {
Some(&self.constraints)
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let table_partition_cols = self
.options
.table_partition_cols
.iter()
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;
let table_partition_col_names = table_partition_cols
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
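        // Split filters into those that reference only partition columns
        // (used to prune files during listing) and the remaining filters.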
let (partition_filters, filters): (Vec<_>, Vec<_>) =
filters.iter().cloned().partition(|filter| {
can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
});
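        // A limit can only be pushed into file listing when no row-level
        // filters remain; otherwise fewer rows than the limit may survive.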
let statistic_file_limit = if filters.is_empty() { limit } else { None };
let (mut partitioned_file_lists, statistics) = self
.list_files_for_scan(state, &partition_filters, statistic_file_limit)
.await?;
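        // If no files need to be read, return an `EmptyExec`.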
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}
let output_ordering = self.try_create_output_ordering()?;
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
}
}
            None => {} // statistics-based splitting disabled or no output ordering
        };
let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
let file_source = self.create_file_source_with_schema_adapter()?;
self.options
.format
.create_physical_plan(
state,
FileScanConfigBuilder::new(
object_store_url,
Arc::clone(&self.file_schema),
file_source,
)
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.build(),
)
.await
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let partition_column_names = self
.options
.table_partition_cols
.iter()
.map(|col| col.0.as_str())
.collect::<Vec<_>>();
filters
.iter()
.map(|filter| {
if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
{
return Ok(TableProviderFilterPushDown::Exact);
}
Ok(TableProviderFilterPushDown::Inexact)
})
.collect()
}
fn get_table_definition(&self) -> Option<&str> {
self.definition.as_deref()
}
async fn insert_into(
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
self.schema()
.logically_equivalent_names_and_types(&input.schema())?;
let table_path = &self.table_paths()[0];
if !table_path.is_collection() {
return plan_err!(
"Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
);
}
let store = state.runtime_env().object_store(table_path)?;
let file_list_stream = pruned_partition_list(
state,
store.as_ref(),
table_path,
&[],
&self.options.file_extension,
&self.options.table_partition_cols,
)
.await?;
let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
let keep_partition_by_columns =
state.config_options().execution.keep_partition_by_columns;
let config = FileSinkConfig {
original_url: String::default(),
object_store_url: self.table_paths()[0].object_store(),
table_paths: self.table_paths().clone(),
file_group,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
insert_op,
keep_partition_by_columns,
file_extension: self.options().format.get_ext(),
};
let orderings = self.try_create_output_ordering()?;
let order_requirements = orderings.into_iter().next().map(Into::into);
self.options()
.format
.create_writer_physical_plan(input, state, config, order_requirements)
.await
}
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
}
impl ListingTable {
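    /// Lists the files for a scan across all table paths, pruning by the
    /// given partition filters, collecting per-file statistics concurrently
    /// when enabled, and splitting the result into at most
    /// `target_partitions` file groups with adapted statistics.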
async fn list_files_for_scan<'a>(
&'a self,
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
) -> Result<(Vec<FileGroup>, Statistics)> {
let store = if let Some(url) = self.table_paths.first() {
ctx.runtime_env().object_store(url)?
} else {
return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
};
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
pruned_partition_list(
ctx,
store.as_ref(),
table_path,
filters,
&self.options.file_extension,
&self.options.table_partition_cols,
)
}))
.await?;
let meta_fetch_concurrency =
ctx.config_options().execution.meta_fetch_concurrency;
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let statistics = if self.options.collect_stat {
self.do_collect_statistics(ctx, &store, &part_file).await?
} else {
Arc::new(Statistics::new_unknown(&self.file_schema))
};
Ok(part_file.with_statistics(statistics))
})
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;
let file_groups = file_group.split_files(self.options.target_partitions);
let (mut file_groups, mut stats) = compute_all_files_statistics(
file_groups,
self.schema(),
self.options.collect_stat,
inexact_stats,
)?;
let schema_adapter = self.create_schema_adapter();
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
stats.column_statistics =
schema_mapper.map_column_statistics(&stats.column_statistics)?;
file_groups.iter_mut().try_for_each(|file_group| {
if let Some(stat) = file_group.statistics_mut() {
stat.column_statistics =
schema_mapper.map_column_statistics(&stat.column_statistics)?;
}
Ok::<_, DataFusionError>(())
})?;
Ok((file_groups, stats))
}
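    /// Returns statistics for `part_file`, serving them from the statistics
    /// cache when the entry is still valid for the file's `ObjectMeta`, and
    /// otherwise inferring them via the file format and caching the result.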
async fn do_collect_statistics(
&self,
ctx: &dyn Session,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> Result<Arc<Statistics>> {
match self
.collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics),
None => {
let statistics = self
.options
.format
.infer_stats(
ctx,
store,
Arc::clone(&self.file_schema),
&part_file.object_meta,
)
.await?;
let statistics = Arc::new(statistics);
self.collected_statistics.put_with_extra(
&part_file.object_meta.location,
Arc::clone(&statistics),
&part_file.object_meta,
);
Ok(statistics)
}
}
}
}
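/// Collects files from `files` into a single [`FileGroup`], stopping early
/// once exact row counts show the `limit` is exceeded. Also returns whether
/// the resulting statistics are inexact (i.e. listing stopped early).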
async fn get_files_with_limit(
files: impl Stream<Item = Result<PartitionedFile>>,
limit: Option<usize>,
collect_stats: bool,
) -> Result<(FileGroup, bool)> {
let mut file_group = FileGroup::default();
let mut all_files = Box::pin(files.fuse());
enum ProcessingState {
ReadingFiles,
ReachedLimit,
}
let mut state = ProcessingState::ReadingFiles;
let mut num_rows = Precision::Absent;
while let Some(file_result) = all_files.next().await {
if matches!(state, ProcessingState::ReachedLimit) {
break;
}
let file = file_result?;
if collect_stats {
if let Some(file_stats) = &file.statistics {
num_rows = if file_group.is_empty() {
file_stats.num_rows
} else {
num_rows.add(&file_stats.num_rows)
};
}
}
file_group.push(file);
if let Some(limit) = limit {
if let Precision::Exact(row_count) = num_rows {
if row_count > limit {
state = ProcessingState::ReachedLimit;
}
}
}
}
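    // If the stream was not exhausted, listing stopped early because of the
    // limit, so any accumulated statistics are inexact.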
let inexact_stats = all_files.next().await.is_some();
Ok((file_group, inexact_stats))
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::prelude::*;
use crate::{
datasource::{
file_format::csv::CsvFormat, file_format::json::JsonFormat,
provider_as_source, DefaultTableSource, MemTable,
},
execution::options::ArrowReadOptions,
test::{
columns, object_store::ensure_head_concurrency,
object_store::make_test_store_and_state, object_store::register_test_store,
},
};
use arrow::{compute::SortOptions, record_batch::RecordBatch};
use datafusion_common::{
assert_contains,
stats::Precision,
test_util::{batches_to_string, datafusion_test_data},
ColumnStatistics, ScalarValue,
};
use datafusion_datasource::schema_adapter::{
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
use rstest::rstest;
use std::io::Write;
use tempfile::TempDir;
use url::Url;
const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
fn create_test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("c1", DataType::Float32, true),
Field::new("c2", DataType::Float64, true),
Field::new("c3", DataType::Boolean, true),
Field::new("c4", DataType::Utf8, true),
]))
}
fn generate_test_files(prefix: &str, count: usize) -> Vec<String> {
generate_test_files_with_start(prefix, count, 0)
}
fn generate_test_files_with_start(
prefix: &str,
count: usize,
start_index: usize,
) -> Vec<String> {
(start_index..start_index + count)
.map(|i| format!("{prefix}/file{i}"))
.collect()
}
#[tokio::test]
async fn test_schema_source_tracking_comprehensive() -> Result<()> {
let ctx = SessionContext::new();
let testdata = datafusion_test_data();
let filename = format!("{testdata}/aggregate_simple.csv");
let table_path = ListingTableUrl::parse(filename).unwrap();
let config = ListingTableConfig::new(table_path.clone());
assert_eq!(config.schema_source(), SchemaSource::Unset);
let provided_schema = create_test_schema();
let config_with_schema = config.clone().with_schema(provided_schema.clone());
assert_eq!(config_with_schema.schema_source(), SchemaSource::Specified);
let format = CsvFormat::default();
let options = ListingOptions::new(Arc::new(format));
let config_with_options = config.with_listing_options(options.clone());
assert_eq!(config_with_options.schema_source(), SchemaSource::Unset);
let config_with_inferred = config_with_options.infer_schema(&ctx.state()).await?;
assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred);
let config_with_schema_and_options = config_with_schema
.clone()
.with_listing_options(options.clone());
assert_eq!(
config_with_schema_and_options.schema_source(),
SchemaSource::Specified
);
let config_with_schema_and_infer = config_with_schema_and_options
.clone()
.infer(&ctx.state())
.await?;
assert_eq!(
config_with_schema_and_infer.schema_source(),
SchemaSource::Specified
);
let table_specified = ListingTable::try_new(config_with_schema_and_options)?;
assert_eq!(table_specified.schema_source(), SchemaSource::Specified);
let table_inferred = ListingTable::try_new(config_with_inferred)?;
assert_eq!(table_inferred.schema_source(), SchemaSource::Inferred);
Ok(())
}
#[tokio::test]
async fn read_single_file() -> Result<()> {
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_collect_statistics(true),
);
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
let projection = None;
let exec = table
.scan(&ctx.state(), projection, &[], None)
.await
.expect("Scan table");
assert_eq!(exec.children().len(), 0);
assert_eq!(exec.output_partitioning().partition_count(), 1);
assert_eq!(
exec.partition_statistics(None)?.num_rows,
Precision::Exact(8)
);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Exact(671)
);
Ok(())
}
#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_try_create_output_ordering() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
let ctx = SessionContext::new();
let state = ctx.state();
let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = options.infer_schema(&state, &table_path).await.unwrap();
        use datafusion_physical_plan::expressions::col as physical_col;
        use std::ops::Add;
let cases = vec![
(vec![], Ok(Vec::<LexOrdering>::new())),
(
vec![vec![
col("int_col").add(lit(1)).sort(true, true),
]],
Err("Expected single column reference in sort_order[0][0], got int_col + Int32(1)"),
),
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![[PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}].into(),
])
),
(
vec![vec![
col("string_col").sort(true, false),
col("int_col").sort(false, true),
]],
Ok(vec![[
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
].into(),
])
),
];
for (file_sort_order, expected_result) in cases {
let options = options.clone().with_file_sort_order(file_sort_order);
let config = ListingTableConfig::new(table_path.clone())
.with_listing_options(options)
.with_schema(schema.clone());
let table =
ListingTable::try_new(config.clone()).expect("Creating the table");
let ordering_result = table.try_create_output_ordering();
match (expected_result, ordering_result) {
(Ok(expected), Ok(result)) => {
assert_eq!(expected, result);
}
(Err(expected), Err(result)) => {
let result = result.to_string();
let expected = expected.to_string();
                    assert_contains!(result, expected);
}
(expected_result, ordering_result) => {
panic!(
"expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
);
}
}
}
}
#[tokio::test]
async fn read_empty_table() -> Result<()> {
let ctx = SessionContext::new();
let path = String::from("table/p1=v1/file.json");
register_test_store(&ctx, &[(&path, 100)]);
let format = JsonFormat::default();
let ext = format.get_ext();
let opt = ListingOptions::new(Arc::new(format))
.with_file_extension(ext)
.with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
.with_target_partitions(4);
let table_path = ListingTableUrl::parse("test:///table/").unwrap();
let file_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(file_schema);
let table = ListingTable::try_new(config)?;
assert_eq!(
columns(&table.schema()),
vec!["a".to_owned(), "p1".to_owned()]
);
        let filter = col("p1").not_eq(lit("v1"));
let scan = table
.scan(&ctx.state(), None, &[filter], None)
.await
.expect("Empty execution plan");
assert!(scan.as_any().is::<EmptyExec>());
assert_eq!(
columns(&scan.schema()),
vec!["a".to_owned(), "p1".to_owned()]
);
Ok(())
}
async fn load_table(
ctx: &SessionContext,
name: &str,
) -> Result<Arc<dyn TableProvider>> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{testdata}/{name}");
let table_path = ListingTableUrl::parse(filename).unwrap();
let config = ListingTableConfig::new(table_path)
.infer(&ctx.state())
.await?;
let table = ListingTable::try_new(config)?;
Ok(Arc::new(table))
}
async fn assert_list_files_for_scan_grouping(
files: &[&str],
table_prefix: &str,
target_partitions: usize,
output_partitioning: usize,
file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
.with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let table_path = ListingTableUrl::parse(table_prefix).unwrap();
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
let table = ListingTable::try_new(config)?;
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert_eq!(file_list.len(), output_partitioning);
Ok(())
}
async fn assert_list_files_for_multi_paths(
files: &[&str],
table_prefix: &[&str],
target_partitions: usize,
output_partitioning: usize,
file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
.with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let table_paths = table_prefix
.iter()
.map(|t| ListingTableUrl::parse(t).unwrap())
.collect();
let config = ListingTableConfig::new_with_multi_paths(table_paths)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
let table = ListingTable::try_new(config)?;
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert_eq!(file_list.len(), output_partitioning);
Ok(())
}
async fn assert_list_files_for_exact_paths(
files: &[&str],
target_partitions: usize,
output_partitioning: usize,
file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
let (store, _) = make_test_store_and_state(
&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
);
let meta_fetch_concurrency = ctx
.state()
.config_options()
.execution
.meta_fetch_concurrency;
let expected_concurrency = files.len().min(meta_fetch_concurrency);
let head_concurrency_store = ensure_head_concurrency(store, expected_concurrency);
let url = Url::parse("test://").unwrap();
ctx.register_object_store(&url, head_concurrency_store.clone());
let format = JsonFormat::default();
let opt = ListingOptions::new(Arc::new(format))
.with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let table_paths = files
.iter()
.map(|t| ListingTableUrl::parse(format!("test:///{t}")).unwrap())
.collect();
let config = ListingTableConfig::new_with_multi_paths(table_paths)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
let table = ListingTable::try_new(config)?;
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert_eq!(file_list.len(), output_partitioning);
Ok(())
}
#[tokio::test]
async fn test_insert_into_sql_csv_defaults() -> Result<()> {
helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
.await?;
Ok(())
}
#[tokio::test]
async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
helper_test_insert_into_sql(
"csv",
FileCompressionType::UNCOMPRESSED,
"",
Some(HashMap::from([("has_header".into(), "true".into())])),
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_insert_into_sql_json_defaults() -> Result<()> {
helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
.await?;
Ok(())
}
#[tokio::test]
async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
helper_test_insert_into_sql(
"parquet",
FileCompressionType::UNCOMPRESSED,
"",
None,
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert(
"datafusion.execution.parquet.compression".into(),
"zstd(5)".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_enabled".into(),
"false".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_page_size_limit".into(),
"100".into(),
);
config_map.insert(
"datafusion.execution.parquet.statistics_enabled".into(),
"none".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_statistics_size".into(),
"10".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_row_group_size".into(),
"5".into(),
);
config_map.insert(
"datafusion.execution.parquet.created_by".into(),
"datafusion test".into(),
);
config_map.insert(
"datafusion.execution.parquet.column_index_truncate_length".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.data_page_row_count_limit".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_on_write".into(),
"true".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_fpp".into(),
"0.01".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_ndv".into(),
"1000".into(),
);
config_map.insert(
"datafusion.execution.parquet.writer_version".into(),
"2.0".into(),
);
config_map.insert(
"datafusion.execution.parquet.write_batch_size".into(),
"5".into(),
);
helper_test_insert_into_sql(
"parquet",
FileCompressionType::UNCOMPRESSED,
"",
Some(config_map),
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert(
"datafusion.execution.soft_max_rows_per_output_file".into(),
"10".into(),
);
config_map.insert(
"datafusion.execution.parquet.compression".into(),
"zstd(5)".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_enabled".into(),
"false".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_page_size_limit".into(),
"100".into(),
);
config_map.insert(
"datafusion.execution.parquet.statistics_enabled".into(),
"none".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_statistics_size".into(),
"10".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_row_group_size".into(),
"5".into(),
);
config_map.insert(
"datafusion.execution.parquet.created_by".into(),
"datafusion test".into(),
);
config_map.insert(
"datafusion.execution.parquet.column_index_truncate_length".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.data_page_row_count_limit".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.encoding".into(),
"delta_binary_packed".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_on_write".into(),
"true".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_fpp".into(),
"0.01".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_ndv".into(),
"1000".into(),
);
config_map.insert(
"datafusion.execution.parquet.writer_version".into(),
"2.0".into(),
);
config_map.insert(
"datafusion.execution.parquet.write_batch_size".into(),
"5".into(),
);
config_map.insert("datafusion.execution.batch_size".into(), "10".into());
helper_test_append_new_files_to_table(
ParquetFormat::default().get_ext(),
FileCompressionType::UNCOMPRESSED,
Some(config_map),
2,
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
) -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert(
"datafusion.execution.parquet.compression".into(),
"zstd".into(),
);
let e = helper_test_append_new_files_to_table(
ParquetFormat::default().get_ext(),
FileCompressionType::UNCOMPRESSED,
Some(config_map),
2,
)
.await
.expect_err("Example should fail!");
assert_eq!(e.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");
Ok(())
}
async fn helper_test_append_new_files_to_table(
file_type_ext: String,
file_compression_type: FileCompressionType,
session_config_map: Option<HashMap<String, String>>,
expected_n_files_per_insert: usize,
) -> Result<()> {
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(&cfg)?;
SessionContext::new_with_config(config)
}
None => SessionContext::new(),
};
let schema = Arc::new(Schema::new(vec![Field::new(
"column1",
DataType::Int32,
false,
)]));
let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
Box::new(Expr::Column("column1".into())),
Operator::GtEq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(0)), None)),
));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(arrow::array::Int32Array::from(vec![
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
]))],
)?;
let tmp_dir = TempDir::new()?;
match file_type_ext.as_str() {
"csv" => {
session_ctx
.register_csv(
"t",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new()
.schema(schema.as_ref())
.file_compression_type(file_compression_type),
)
.await?;
}
"json" => {
session_ctx
.register_json(
"t",
tmp_dir.path().to_str().unwrap(),
NdJsonReadOptions::default()
.schema(schema.as_ref())
.file_compression_type(file_compression_type),
)
.await?;
}
#[cfg(feature = "parquet")]
"parquet" => {
session_ctx
.register_parquet(
"t",
tmp_dir.path().to_str().unwrap(),
ParquetReadOptions::default().schema(schema.as_ref()),
)
.await?;
}
#[cfg(feature = "avro")]
"avro" => {
session_ctx
.register_avro(
"t",
tmp_dir.path().to_str().unwrap(),
AvroReadOptions::default().schema(schema.as_ref()),
)
.await?;
}
"arrow" => {
session_ctx
.register_arrow(
"t",
tmp_dir.path().to_str().unwrap(),
ArrowReadOptions::default().schema(schema.as_ref()),
)
.await?;
}
_ => panic!("Unrecognized file extension {file_type_ext}"),
}
let source_table = Arc::new(MemTable::try_new(
schema.clone(),
vec![vec![batch.clone(), batch.clone()]],
)?);
session_ctx.register_table("source", source_table.clone())?;
let source = provider_as_source(source_table);
let target = session_ctx.table_provider("t").await?;
let target = Arc::new(DefaultTableSource::new(target));
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
.filter(filter_predicate)?
.build()?;
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
.build()?;
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.await?;
let res = collect(plan, session_ctx.task_ctx()).await?;
insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
+-------+
| count |
+-------+
| 20 |
+-------+
"###);}
let batches = session_ctx
.sql("select count(*) as count from t")
.await?
.collect()
.await?;
insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
+-------+
| count |
+-------+
| 20 |
+-------+
"###);}
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, expected_n_files_per_insert);
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.await?;
let res = collect(plan, session_ctx.task_ctx()).await?;
insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
+-------+
| count |
+-------+
| 20 |
+-------+
"###);}
let batches = session_ctx
.sql("select count(*) AS count from t")
.await?
.collect()
.await?;
insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
+-------+
| count |
+-------+
| 40 |
+-------+
"###);}
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, expected_n_files_per_insert * 2);
Ok(())
}
async fn helper_test_insert_into_sql(
file_type: &str,
_file_compression_type: FileCompressionType,
external_table_options: &str,
session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(&cfg)?;
SessionContext::new_with_config(config)
}
None => SessionContext::new(),
};
let tmp_dir = TempDir::new()?;
let str_path = tmp_dir
.path()
.to_str()
.expect("Temp path should convert to &str");
session_ctx
.sql(&format!(
"create external table foo(a varchar, b varchar, c int) \
stored as {file_type} \
location '{str_path}' \
{external_table_options}"
))
.await?
.collect()
.await?;
session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
.await?
.collect()
.await?;
let batches = session_ctx
.sql("select * from foo")
.await?
.collect()
.await?;
insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
+-----+-----+---+
| a | b | c |
+-----+-----+---+
| foo | bar | 1 |
| foo | bar | 2 |
| foo | bar | 3 |
+-----+-----+---+
"###);}
Ok(())
}
#[tokio::test]
async fn test_infer_options_compressed_csv() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{testdata}/csv/aggregate_test_100.csv.gz");
let table_path = ListingTableUrl::parse(filename).unwrap();
let ctx = SessionContext::new();
let config = ListingTableConfig::new(table_path);
let config_with_opts = config.infer_options(&ctx.state()).await?;
let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;
let schema = config_with_schema.file_schema.unwrap();
assert_eq!(schema.fields.len(), 13);
Ok(())
}
#[tokio::test]
async fn infer_preserves_provided_schema() -> Result<()> {
let ctx = SessionContext::new();
let testdata = datafusion_test_data();
let filename = format!("{testdata}/aggregate_simple.csv");
let table_path = ListingTableUrl::parse(filename).unwrap();
let provided_schema = create_test_schema();
let config =
ListingTableConfig::new(table_path).with_schema(Arc::clone(&provided_schema));
let config = config.infer(&ctx.state()).await?;
assert_eq!(*config.file_schema.unwrap(), *provided_schema);
Ok(())
}
#[tokio::test]
async fn test_listing_table_config_with_multiple_files_comprehensive() -> Result<()> {
let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;
let file_path1 = tmp_dir.path().join("file1.csv");
let file_path2 = tmp_dir.path().join("file2.csv");
let mut file1 = std::fs::File::create(&file_path1)?;
writeln!(file1, "c1,c2,c3")?;
writeln!(file1, "1,2,3")?;
writeln!(file1, "4,5,6")?;
let mut file2 = std::fs::File::create(&file_path2)?;
writeln!(file2, "c1,c2,c3,c4")?;
writeln!(file2, "7,8,9,10")?;
writeln!(file2, "11,12,13,14")?;
let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;
let format = CsvFormat::default().with_has_header(true);
let options = ListingOptions::new(Arc::new(format));
let config1 = ListingTableConfig::new_with_multi_paths(vec![
table_path1.clone(),
table_path2.clone(),
])
.with_listing_options(options.clone());
let config1 = config1.infer_schema(&ctx.state()).await?;
assert_eq!(config1.schema_source(), SchemaSource::Inferred);
let schema1 = config1.file_schema.as_ref().unwrap().clone();
assert_eq!(schema1.fields().len(), 3);
assert_eq!(schema1.field(0).name(), "c1");
assert_eq!(schema1.field(1).name(), "c2");
assert_eq!(schema1.field(2).name(), "c3");
let schema_3cols = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Utf8, true),
]));
let config2 = ListingTableConfig::new_with_multi_paths(vec![
table_path1.clone(),
table_path2.clone(),
])
.with_schema(schema_3cols)
.with_listing_options(options.clone());
let config2 = config2.infer_schema(&ctx.state()).await?;
assert_eq!(config2.schema_source(), SchemaSource::Specified);
let schema2 = config2.file_schema.as_ref().unwrap().clone();
assert_eq!(schema2.fields().len(), 3);
assert_eq!(schema2.field(0).name(), "c1");
assert_eq!(schema2.field(1).name(), "c2");
assert_eq!(schema2.field(2).name(), "c3");
let schema_4cols = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Utf8, true),
Field::new("c4", DataType::Utf8, true),
]));
let config3 = ListingTableConfig::new_with_multi_paths(vec![
table_path1.clone(),
table_path2.clone(),
])
.with_schema(schema_4cols)
.with_listing_options(options.clone());
let config3 = config3.infer_schema(&ctx.state()).await?;
assert_eq!(config3.schema_source(), SchemaSource::Specified);
let schema3 = config3.file_schema.as_ref().unwrap().clone();
assert_eq!(schema3.fields().len(), 4);
assert_eq!(schema3.field(0).name(), "c1");
assert_eq!(schema3.field(1).name(), "c2");
assert_eq!(schema3.field(2).name(), "c3");
assert_eq!(schema3.field(3).name(), "c4");
let config4 = ListingTableConfig::new_with_multi_paths(vec![
table_path2.clone(),
table_path1.clone(),
])
.with_listing_options(options);
let config4 = config4.infer_schema(&ctx.state()).await?;
let schema4 = config4.file_schema.as_ref().unwrap().clone();
assert_eq!(schema4.fields().len(), 4);
assert_eq!(schema4.field(0).name(), "c1");
assert_eq!(schema4.field(1).name(), "c2");
assert_eq!(schema4.field(2).name(), "c3");
assert_eq!(schema4.field(3).name(), "c4");
Ok(())
}
#[tokio::test]
async fn test_list_files_configurations() -> Result<()> {
let test_cases = vec![
(
"Single path, more partitions than files",
generate_test_files("bucket/key-prefix", 5),
vec!["test:///bucket/key-prefix/"],
12,
5,
Some(""),
),
(
"Single path, equal partitions and files",
generate_test_files("bucket/key-prefix", 4),
vec!["test:///bucket/key-prefix/"],
4,
4,
Some(""),
),
(
"Single path, more files than partitions",
generate_test_files("bucket/key-prefix", 5),
vec!["test:///bucket/key-prefix/"],
2,
2,
Some(""),
),
(
"Multi path, more partitions than files",
{
let mut files = generate_test_files("bucket/key1", 3);
files.extend(generate_test_files_with_start("bucket/key2", 2, 3));
files.extend(generate_test_files_with_start("bucket/key3", 1, 5));
files
},
vec!["test:///bucket/key1/", "test:///bucket/key2/"],
12,
5,
Some(""),
),
(
"No files",
vec![],
vec!["test:///bucket/key-prefix/"],
2,
0,
Some(""),
),
(
"Exact paths test",
{
let mut files = generate_test_files("bucket/key1", 3);
files.extend(generate_test_files_with_start("bucket/key2", 2, 3));
files
},
vec![
"test:///bucket/key1/file0",
"test:///bucket/key1/file1",
"test:///bucket/key1/file2",
"test:///bucket/key2/file3",
"test:///bucket/key2/file4",
],
12,
5,
Some(""),
),
];
for (test_name, files, paths, target_partitions, expected_partitions, file_ext) in
test_cases
{
println!("Running test: {test_name}");
if files.is_empty() {
assert_list_files_for_multi_paths(
&[],
&paths,
target_partitions,
expected_partitions,
file_ext,
)
.await?;
} else if paths.len() == 1 {
let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
assert_list_files_for_scan_grouping(
&file_refs,
paths[0],
target_partitions,
expected_partitions,
file_ext,
)
.await?;
} else if paths[0].contains("test:///bucket/key") {
let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
assert_list_files_for_multi_paths(
&file_refs,
&paths,
target_partitions,
expected_partitions,
file_ext,
)
.await?;
} else {
let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
assert_list_files_for_exact_paths(
&file_refs,
target_partitions,
expected_partitions,
file_ext,
)
.await?;
}
}
Ok(())
}
#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_table_stats_behaviors() -> Result<()> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
let ctx = SessionContext::new();
let state = ctx.state();
let opt_default = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema_default = opt_default.infer_schema(&state, &table_path).await?;
let config_default = ListingTableConfig::new(table_path.clone())
.with_listing_options(opt_default)
.with_schema(schema_default);
let table_default = ListingTable::try_new(config_default)?;
let exec_default = table_default.scan(&state, None, &[], None).await?;
assert_eq!(
exec_default.partition_statistics(None)?.num_rows,
Precision::Absent
);
assert_eq!(
exec_default.partition_statistics(None)?.total_byte_size,
Precision::Absent
);
let opt_disabled = ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_collect_stat(false);
let schema_disabled = opt_disabled.infer_schema(&state, &table_path).await?;
let config_disabled = ListingTableConfig::new(table_path.clone())
.with_listing_options(opt_disabled)
.with_schema(schema_disabled);
let table_disabled = ListingTable::try_new(config_disabled)?;
let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
assert_eq!(
exec_disabled.partition_statistics(None)?.num_rows,
Precision::Absent
);
assert_eq!(
exec_disabled.partition_statistics(None)?.total_byte_size,
Precision::Absent
);
let opt_enabled = ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_collect_stat(true);
let schema_enabled = opt_enabled.infer_schema(&state, &table_path).await?;
let config_enabled = ListingTableConfig::new(table_path)
.with_listing_options(opt_enabled)
.with_schema(schema_enabled);
let table_enabled = ListingTable::try_new(config_enabled)?;
let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
assert_eq!(
exec_enabled.partition_statistics(None)?.num_rows,
Precision::Exact(8)
);
assert_eq!(
exec_enabled.partition_statistics(None)?.total_byte_size,
Precision::Exact(671)
);
Ok(())
}
#[tokio::test]
async fn test_insert_into_parameterized() -> Result<()> {
let test_cases = vec![
("json", 10, 10, 2),
("csv", 10, 10, 2),
#[cfg(feature = "parquet")]
("parquet", 10, 10, 2),
#[cfg(feature = "parquet")]
("parquet", 20, 20, 1),
];
for (format, batch_size, soft_max_rows, expected_files) in test_cases {
println!("Testing insert with format: {format}, batch_size: {batch_size}, expected files: {expected_files}");
let mut config_map = HashMap::new();
config_map.insert(
"datafusion.execution.batch_size".into(),
batch_size.to_string(),
);
config_map.insert(
"datafusion.execution.soft_max_rows_per_output_file".into(),
soft_max_rows.to_string(),
);
let file_extension = match format {
"json" => JsonFormat::default().get_ext(),
"csv" => CsvFormat::default().get_ext(),
#[cfg(feature = "parquet")]
"parquet" => ParquetFormat::default().get_ext(),
_ => unreachable!("Unsupported format"),
};
helper_test_append_new_files_to_table(
file_extension,
FileCompressionType::UNCOMPRESSED,
Some(config_map),
expected_files,
)
.await?;
}
Ok(())
}
#[tokio::test]
async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
let ctx = SessionContext::new();
let table = create_test_listing_table_with_json_and_adapter(
&ctx,
false,
Arc::new(NullStatsAdapterFactory {}),
)?;
let (groups, stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert_eq!(stats.column_statistics[0].null_count, DUMMY_NULL_COUNT);
for g in groups {
if let Some(s) = g.file_statistics(None) {
assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
}
}
Ok(())
}
#[tokio::test]
async fn test_statistics_mapping_with_default_factory() -> Result<()> {
let ctx = SessionContext::new();
let path = "table/file.json";
register_test_store(&ctx, &[(path, 10)]);
let format = JsonFormat::default();
let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(false);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let table_path = ListingTableUrl::parse("test:///table/").unwrap();
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
let table = ListingTable::try_new(config)?;
assert!(table.schema_adapter_factory().is_none());
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
assert!(
scan_result.is_ok(),
"Scan should succeed with default schema adapter"
);
let (groups, _stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
assert!(
!groups.is_empty(),
"Should list files successfully with default adapter"
);
Ok(())
}
#[rstest]
#[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
#[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
#[case(
MapSchemaError::InvalidProjection,
"Invalid projection in schema mapping"
)]
#[tokio::test]
async fn test_schema_adapter_map_schema_errors(
#[case] error_type: MapSchemaError,
#[case] expected_error_msg: &str,
) -> Result<()> {
let ctx = SessionContext::new();
let table = create_test_listing_table_with_json_and_adapter(
&ctx,
false,
Arc::new(FailingMapSchemaAdapterFactory { error_type }),
)?;
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
assert!(scan_result.is_err());
let error_msg = scan_result.unwrap_err().to_string();
assert!(
error_msg.contains(expected_error_msg),
"Expected error containing '{expected_error_msg}', got: {error_msg}"
);
Ok(())
}
#[tokio::test]
async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
let ctx = SessionContext::new();
let table = create_test_listing_table_with_json_and_adapter(
&ctx,
true,
Arc::new(FailingMapSchemaAdapterFactory {
error_type: MapSchemaError::TypeIncompatible,
}),
)?;
let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
assert!(list_result.is_err());
let error_msg = list_result.unwrap_err().to_string();
assert!(
error_msg.contains("Cannot map incompatible types"),
"Expected type incompatibility error during file listing, got: {error_msg}"
);
Ok(())
}
#[derive(Debug, Copy, Clone)]
enum MapSchemaError {
TypeIncompatible,
GeneralFailure,
InvalidProjection,
}
#[derive(Debug)]
struct FailingMapSchemaAdapterFactory {
error_type: MapSchemaError,
}
impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
fn create(
&self,
projected_table_schema: SchemaRef,
_table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(FailingMapSchemaAdapter {
schema: projected_table_schema,
error_type: self.error_type,
})
}
}
#[derive(Debug)]
struct FailingMapSchemaAdapter {
schema: SchemaRef,
error_type: MapSchemaError,
}
impl SchemaAdapter for FailingMapSchemaAdapter {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.schema.field(index);
file_schema.fields.find(field.name()).map(|(i, _)| i)
}
fn map_schema(
&self,
_file_schema: &Schema,
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
match self.error_type {
MapSchemaError::TypeIncompatible => {
plan_err!(
"Cannot map incompatible types: Boolean cannot be cast to Utf8"
)
}
MapSchemaError::GeneralFailure => {
plan_err!("Schema adapter mapping failed due to internal error")
}
MapSchemaError::InvalidProjection => {
plan_err!("Invalid projection in schema mapping: column index out of bounds")
}
}
}
}
#[derive(Debug)]
struct NullStatsAdapterFactory;
impl SchemaAdapterFactory for NullStatsAdapterFactory {
fn create(
&self,
projected_table_schema: SchemaRef,
_table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(NullStatsAdapter {
schema: projected_table_schema,
})
}
}
#[derive(Debug)]
struct NullStatsAdapter {
schema: SchemaRef,
}
impl SchemaAdapter for NullStatsAdapter {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.schema.field(index);
file_schema.fields.find(field.name()).map(|(i, _)| i)
}
fn map_schema(
&self,
file_schema: &Schema,
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let projection = (0..file_schema.fields().len()).collect();
Ok((Arc::new(NullStatsMapper {}), projection))
}
}
#[derive(Debug)]
struct NullStatsMapper;
impl SchemaMapper for NullStatsMapper {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
Ok(batch)
}
fn map_column_statistics(
&self,
stats: &[ColumnStatistics],
) -> Result<Vec<ColumnStatistics>> {
Ok(stats
.iter()
.map(|s| {
let mut s = s.clone();
s.null_count = DUMMY_NULL_COUNT;
s
})
.collect())
}
}
fn create_test_listing_table_with_json_and_adapter(
ctx: &SessionContext,
collect_stat: bool,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Result<ListingTable> {
let path = "table/file.json";
register_test_store(ctx, &[(path, 10)]);
let format = JsonFormat::default();
let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
let table_path = ListingTableUrl::parse("test:///table/").unwrap();
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema))
.with_schema_adapter_factory(schema_adapter_factory);
ListingTable::try_new(config)
}
}