use std::str::FromStr;
use std::{any::Any, sync::Arc};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::FileTypeWriterOptions;
use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat, FileFormat,
},
get_statistics_with_limit,
listing::ListingTableUrl,
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
use datafusion_common::{FileCompressionType, FileType};
use super::PartitionedFile;
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
/// Configuration used to build a [`ListingTable`].
///
/// Both `file_schema` and `options` start out as `None`; they must be
/// populated (explicitly or through the `infer*` methods) before the config
/// is handed to `ListingTable::try_new`, which rejects a missing value.
#[derive(Debug, Clone)]
pub struct ListingTableConfig {
/// Locations that make up the table.
pub table_paths: Vec<ListingTableUrl>,
/// Schema of the files, excluding partition columns (those are appended
/// by `ListingTable::try_new`). `None` until supplied or inferred.
pub file_schema: Option<SchemaRef>,
/// Listing options (file format, extension, partitioning, ...). `None`
/// until supplied or inferred.
pub options: Option<ListingOptions>,
}
impl ListingTableConfig {
    /// Creates a config for a table backed by a single path.
    ///
    /// The schema and listing options are left unset.
    pub fn new(table_path: ListingTableUrl) -> Self {
        Self::new_with_multi_paths(vec![table_path])
    }

    /// Creates a config for a table backed by several paths.
    ///
    /// The schema and listing options are left unset.
    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
        Self {
            table_paths,
            file_schema: None,
            options: None,
        }
    }

    /// Returns the config with `file_schema` set to `schema`.
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            file_schema: Some(schema),
            ..self
        }
    }

    /// Returns the config with `options` set to `listing_options`.
    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
        Self {
            options: Some(listing_options),
            ..self
        }
    }

    /// Derives the file format and the full file extension (including any
    /// compression suffix) from the trailing dot-separated segments of `path`.
    ///
    /// # Errors
    /// Returns an internal error when the file type cannot be parsed from the
    /// path, or when the extension cannot be combined with the compression.
    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
        let err_msg = format!("Unable to infer file type from path: {path}");

        // Walk the segments right-to-left: the last one is either a
        // compression suffix or the file type itself.
        let mut segments = path.rsplit('.');
        let mut candidate = segments.next().unwrap_or("");

        let file_compression_type = FileCompressionType::from_str(candidate)
            .unwrap_or(FileCompressionType::UNCOMPRESSED);
        if file_compression_type.is_compressed() {
            // The last segment was a compression suffix, so the file type is
            // expected in the segment before it.
            candidate = segments.next().unwrap_or("");
        }

        let file_type = FileType::from_str(candidate)
            .map_err(|_| DataFusionError::Internal(err_msg.clone()))?;
        let ext = file_type
            .get_ext_with_compression(file_compression_type.to_owned())
            .map_err(|_| DataFusionError::Internal(err_msg))?;

        let file_format: Arc<dyn FileFormat> = match file_type {
            FileType::ARROW => Arc::new(ArrowFormat),
            FileType::AVRO => Arc::new(AvroFormat),
            FileType::CSV => Arc::new(
                CsvFormat::default().with_file_compression_type(file_compression_type),
            ),
            FileType::JSON => Arc::new(
                JsonFormat::default().with_file_compression_type(file_compression_type),
            ),
            FileType::PARQUET => Arc::new(ParquetFormat::default()),
        };

        Ok((file_format, ext))
    }

    /// Infers [`ListingOptions`] from the first file listed under the first
    /// table path.
    ///
    /// The config is returned unchanged when there are no table paths.
    ///
    /// # Errors
    /// Fails when the object store cannot be resolved, when the listing
    /// yields no file or an error, or when the format cannot be inferred from
    /// the file's location.
    pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
        // Bind the first path once instead of looking it up twice and
        // unwrapping the second lookup.
        let url = match self.table_paths.first() {
            Some(url) => url,
            None => return Ok(self),
        };
        let store = state.runtime_env().object_store(url)?;

        let file = url
            .list_all_files(store.as_ref(), "")
            .next()
            .await
            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;

        let (format, file_extension) =
            ListingTableConfig::infer_format(file.location.as_ref())?;

        let listing_options = ListingOptions::new(format)
            .with_file_extension(file_extension)
            .with_target_partitions(state.config().target_partitions());

        Ok(Self {
            table_paths: self.table_paths,
            file_schema: self.file_schema,
            options: Some(listing_options),
        })
    }

    /// Infers the file schema from the first table path using the configured
    /// [`ListingOptions`]. An empty schema is used when there are no paths.
    ///
    /// # Errors
    /// Returns an internal error when no `ListingOptions` are set, and
    /// propagates any schema-inference failure.
    pub async fn infer_schema(self, state: &SessionState) -> Result<Self> {
        let options = match self.options {
            Some(options) => options,
            None => {
                return internal_err!("No `ListingOptions` set for inferring schema")
            }
        };

        let schema = match self.table_paths.first() {
            Some(url) => options.infer_schema(state, url).await?,
            None => Arc::new(Schema::empty()),
        };

        Ok(Self {
            table_paths: self.table_paths,
            file_schema: Some(schema),
            options: Some(options),
        })
    }

    /// Runs [`Self::infer_options`] followed by [`Self::infer_schema`].
    pub async fn infer(self, state: &SessionState) -> Result<Self> {
        self.infer_options(state).await?.infer_schema(state).await
    }
}
/// How `ListingTable::insert_into` writes new data.
#[derive(Debug, Clone)]
pub enum ListingTableInsertMode {
/// Append to the table's existing files (`insert_into` selects the
/// `Append` writer mode and rejects inputs with more partitions than
/// existing files).
AppendToFile,
/// Write the data to new files (`insert_into` selects the `PutMultipart`
/// writer mode).
AppendNewFiles,
/// Reject any insert: `insert_into` returns a plan error.
Error,
}
impl FromStr for ListingTableInsertMode {
    type Err = DataFusionError;

    /// Parses an insert mode name, ignoring case.
    ///
    /// Accepted names are `append_to_file`, `append_new_files` and `error`;
    /// anything else produces a plan error that echoes the original input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        let mode = match normalized.as_str() {
            "append_to_file" => Self::AppendToFile,
            "append_new_files" => Self::AppendNewFiles,
            "error" => Self::Error,
            _ => {
                return Err(DataFusionError::Plan(format!(
                    "Unknown or unsupported insert mode {s}. Supported options are \
                    append_to_file, append_new_files, and error."
                )))
            }
        };
        Ok(mode)
    }
}
/// Options controlling how a [`ListingTable`] lists, reads and writes files.
#[derive(Clone, Debug)]
pub struct ListingOptions {
/// File extension passed to the file-listing helpers when enumerating
/// the table's files. Defaults to the empty string.
pub file_extension: String,
/// Format used to infer schemas/statistics and to build the scan and
/// write plans.
pub format: Arc<dyn FileFormat>,
/// Partition columns as `(name, type)` pairs; `ListingTable::try_new`
/// appends them to the file schema as non-nullable fields.
pub table_partition_cols: Vec<(String, DataType)>,
/// Whether per-file statistics are collected during a scan. Defaults to
/// `true`.
pub collect_stat: bool,
/// Partition count handed to `split_files` when grouping the listed
/// files for a scan. Defaults to `1`.
pub target_partitions: usize,
/// Declared sort orders of the files. Each entry must be an `Expr::Sort`
/// wrapping a plain column reference, otherwise building the output
/// ordering fails.
pub file_sort_order: Vec<Vec<Expr>>,
/// Forwarded to the scan configuration's `infinite_source` flag.
pub infinite_source: bool,
/// Behaviour of `insert_into`. Defaults to `AppendToFile`.
pub insert_mode: ListingTableInsertMode,
/// Forwarded to the sink configuration as `single_file_output`.
pub single_file: bool,
/// Writer options used by `insert_into`; when `None`, defaults are built
/// from the file type and the session's config options.
pub file_type_write_options: Option<FileTypeWriterOptions>,
}
impl ListingOptions {
    /// Builds options for `format` with these defaults: empty file
    /// extension, no partition columns, statistics collection enabled, one
    /// target partition, no sort order, bounded source, `AppendToFile`
    /// insert mode, multi-file output and no explicit writer options.
    pub fn new(format: Arc<dyn FileFormat>) -> Self {
        Self {
            file_extension: String::new(),
            format,
            table_partition_cols: vec![],
            collect_stat: true,
            target_partitions: 1,
            file_sort_order: vec![],
            infinite_source: false,
            insert_mode: ListingTableInsertMode::AppendToFile,
            single_file: false,
            file_type_write_options: None,
        }
    }

    /// Sets the `infinite_source` flag.
    pub fn with_infinite_source(self, infinite_source: bool) -> Self {
        Self {
            infinite_source,
            ..self
        }
    }

    /// Sets the file extension used when listing files.
    pub fn with_file_extension(self, file_extension: impl Into<String>) -> Self {
        Self {
            file_extension: file_extension.into(),
            ..self
        }
    }

    /// Sets the partition columns as `(name, type)` pairs.
    pub fn with_table_partition_cols(
        self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        Self {
            table_partition_cols,
            ..self
        }
    }

    /// Enables or disables statistics collection during scans.
    pub fn with_collect_stat(self, collect_stat: bool) -> Self {
        Self {
            collect_stat,
            ..self
        }
    }

    /// Sets the number of partitions the listed files are split into.
    pub fn with_target_partitions(self, target_partitions: usize) -> Self {
        Self {
            target_partitions,
            ..self
        }
    }

    /// Sets the declared sort orders of the files.
    pub fn with_file_sort_order(self, file_sort_order: Vec<Vec<Expr>>) -> Self {
        Self {
            file_sort_order,
            ..self
        }
    }

    /// Sets the behaviour used by `insert_into`.
    pub fn with_insert_mode(self, insert_mode: ListingTableInsertMode) -> Self {
        Self {
            insert_mode,
            ..self
        }
    }

    /// Sets the `single_file` flag forwarded to the file sink.
    pub fn with_single_file(self, single_file: bool) -> Self {
        Self {
            single_file,
            ..self
        }
    }

    /// Sets explicit writer options for inserts.
    pub fn with_write_options(
        self,
        file_type_write_options: FileTypeWriterOptions,
    ) -> Self {
        Self {
            file_type_write_options: Some(file_type_write_options),
            ..self
        }
    }

    /// Infers the schema from every file under `table_path` that matches the
    /// configured file extension, delegating to the configured format.
    ///
    /// # Errors
    /// Fails when the object store cannot be resolved, when listing fails,
    /// or when the format cannot infer a schema.
    pub async fn infer_schema<'a>(
        &'a self,
        state: &SessionState,
        table_path: &'a ListingTableUrl,
    ) -> Result<SchemaRef> {
        let store = state.runtime_env().object_store(table_path)?;

        let objects: Vec<_> = table_path
            .list_all_files(store.as_ref(), &self.file_extension)
            .try_collect()
            .await?;

        self.format.infer_schema(state, &store, &objects).await
    }
}
/// Cache of per-file statistics, keyed by object location.
///
/// Each entry also stores the `ObjectMeta` it was computed for, so lookups
/// can detect a file whose size or modification time has changed.
#[derive(Default)]
struct StatisticsCache {
/// Location -> (metadata at the time of caching, statistics).
statistics: DashMap<Path, (ObjectMeta, Statistics)>,
}
impl StatisticsCache {
    /// Returns the cached statistics for `meta.location`.
    ///
    /// Yields `None` when nothing is cached for that location, or when the
    /// cached entry was recorded for a different size or last-modified
    /// timestamp (i.e. the entry is stale).
    fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
        self.statistics.get(&meta.location).and_then(|entry| {
            let (saved_meta, statistics) = entry.value();
            let is_fresh = saved_meta.size == meta.size
                && saved_meta.last_modified == meta.last_modified;
            if is_fresh {
                Some(statistics.clone())
            } else {
                None
            }
        })
    }

    /// Stores `statistics` for `meta.location`, replacing any previous entry.
    fn save(&self, meta: ObjectMeta, statistics: Statistics) {
        let location = meta.location.clone();
        self.statistics.insert(location, (meta, statistics));
    }
}
/// A table provider backed by files listed under one or more paths.
pub struct ListingTable {
/// Locations that make up the table.
table_paths: Vec<ListingTableUrl>,
/// Schema of the files themselves, without partition columns.
file_schema: SchemaRef,
/// `file_schema` followed by the partition columns; this is the schema
/// reported by `TableProvider::schema`.
table_schema: SchemaRef,
/// Listing options the table was created with.
options: ListingOptions,
/// Optional definition text, set via `with_definition` and returned by
/// `get_table_definition`.
definition: Option<String>,
/// Per-file statistics cache consulted during scans.
collected_statistics: StatisticsCache,
/// Copy of `options.infinite_source` taken at construction time.
infinite_source: bool,
}
impl ListingTable {
    /// Builds a `ListingTable` from `config`.
    ///
    /// The table schema is the file schema followed by one non-nullable
    /// field per partition column.
    ///
    /// # Errors
    /// Returns an internal error when the config has no file schema or no
    /// listing options.
    pub fn try_new(config: ListingTableConfig) -> Result<Self> {
        let file_schema = config
            .file_schema
            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;

        let options = config.options.ok_or_else(|| {
            DataFusionError::Internal("No ListingOptions provided".into())
        })?;

        // Partition columns are not stored in the files, so they are
        // appended after the file's own fields.
        let mut builder = SchemaBuilder::from(file_schema.fields());
        for (name, data_type) in &options.table_partition_cols {
            builder.push(Field::new(name, data_type.clone(), false));
        }

        let infinite_source = options.infinite_source;

        Ok(Self {
            table_paths: config.table_paths,
            file_schema,
            table_schema: Arc::new(builder.finish()),
            options,
            definition: None,
            collected_statistics: Default::default(),
            infinite_source,
        })
    }

    /// Sets the table definition text returned by `get_table_definition`.
    pub fn with_definition(mut self, definition: Option<String>) -> Self {
        self.definition = definition;
        self
    }

    /// Returns the paths backing this table.
    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
        &self.table_paths
    }

    /// Returns the listing options this table was created with.
    pub fn options(&self) -> &ListingOptions {
        &self.options
    }

    /// Converts the configured `file_sort_order` into physical orderings
    /// resolved against the table schema.
    ///
    /// # Errors
    /// Returns a plan error when an entry is not an `Expr::Sort`, or when the
    /// sorted expression is not a plain column reference. Column resolution
    /// failures are propagated.
    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
        self.options
            .file_sort_order
            .iter()
            .map(|exprs| {
                exprs
                    .iter()
                    .map(|expr| match expr {
                        Expr::Sort(Sort {
                            expr,
                            asc,
                            nulls_first,
                        }) => match expr.as_ref() {
                            Expr::Column(col) => {
                                let expr = physical_plan::expressions::col(
                                    &col.name,
                                    self.table_schema.as_ref(),
                                )?;
                                Ok(PhysicalSortExpr {
                                    expr,
                                    options: SortOptions {
                                        descending: !asc,
                                        nulls_first: *nulls_first,
                                    },
                                })
                            }
                            // Here `expr` is the expression inside the sort.
                            _ => plan_err!(
                                "Expected single column references in output_ordering, got {expr}"
                            ),
                        },
                        // Here `expr` is the whole non-sort expression.
                        _ => plan_err!(
                            "Expected Expr::Sort in output_ordering, but got {expr}"
                        ),
                    })
                    .collect::<Result<Vec<_>>>()
            })
            .collect()
    }
}
#[async_trait]
impl TableProvider for ListingTable {
    /// Returns `self` as `Any` so callers can downcast the provider.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the table schema (file schema plus partition columns).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    /// Listing tables are always reported as base tables.
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Builds the physical plan that scans the table's files.
    ///
    /// An `EmptyExec` is returned when listing yields no file groups (using
    /// the projected table schema) or when the table has no paths (using an
    /// empty schema). Otherwise the configured file format creates the plan.
    ///
    /// # Errors
    /// Propagates failures from file listing, schema projection, filter
    /// conversion, output-ordering creation and plan creation.
    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let (partitioned_file_lists, statistics) =
            self.list_files_for_scan(state, filters, limit).await?;

        // Nothing to read: answer with an empty plan of the projected schema.
        if partitioned_file_lists.is_empty() {
            let projected_schema = project_schema(&self.schema(), projection)?;
            return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
        }

        // Partition column types are taken from the table schema rather than
        // from the options.
        let table_partition_cols = self
            .options
            .table_partition_cols
            .iter()
            .map(|(name, _)| {
                Ok((
                    name.to_owned(),
                    self.table_schema
                        .field_with_name(name)?
                        .data_type()
                        .clone(),
                ))
            })
            .collect::<Result<Vec<_>>>()?;

        // Combine all filters into one conjunction and lower it to a
        // physical expression against the table schema.
        let filters = match conjunction(filters.to_vec()) {
            Some(expr) => {
                let table_df_schema =
                    self.table_schema.as_ref().clone().to_dfschema()?;
                Some(create_physical_expr(
                    &expr,
                    &table_df_schema,
                    &self.table_schema,
                    state.execution_props(),
                )?)
            }
            None => None,
        };

        let object_store_url = match self.table_paths.first() {
            Some(url) => url.object_store(),
            None => {
                return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
            }
        };

        self.options
            .format
            .create_physical_plan(
                state,
                FileScanConfig {
                    object_store_url,
                    file_schema: Arc::clone(&self.file_schema),
                    file_groups: partitioned_file_lists,
                    statistics,
                    projection: projection.cloned(),
                    limit,
                    output_ordering: self.try_create_output_ordering()?,
                    table_partition_cols,
                    infinite_source: self.infinite_source,
                },
                filters.as_ref(),
            )
            .await
    }

    /// Reports `Exact` push-down for filters that `expr_applicable_for_cols`
    /// accepts for the partition columns, and `Inexact` for all others.
    fn supports_filter_pushdown(
        &self,
        filter: &Expr,
    ) -> Result<TableProviderFilterPushDown> {
        let partition_col_names: Vec<_> = self
            .options
            .table_partition_cols
            .iter()
            .map(|(name, _)| name.clone())
            .collect();

        let pushdown = if expr_applicable_for_cols(&partition_col_names, filter) {
            TableProviderFilterPushDown::Exact
        } else {
            TableProviderFilterPushDown::Inexact
        };
        Ok(pushdown)
    }

    /// Returns the definition text set via `with_definition`, if any.
    fn get_table_definition(&self) -> Option<&str> {
        self.definition.as_deref()
    }

    /// Builds the physical plan that writes `input` into this table.
    ///
    /// # Errors
    /// Returns a plan error when the input schema does not match the table
    /// schema, when the table has more than one path or no path at all, when
    /// the insert mode is `Error`, or when `AppendToFile` is used with more
    /// input partitions than existing files. Returns a not-implemented error
    /// for tables with a declared file sort order. Failures from listing the
    /// existing files and from creating the writer plan are propagated.
    async fn insert_into(
        &self,
        state: &SessionState,
        input: Arc<dyn ExecutionPlan>,
        overwrite: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        use crate::datasource::file_format::write::FileWriterMode;

        if !self.schema().equivalent_names_and_types(&input.schema()) {
            return plan_err!(
                "Inserting query must have the same schema with the table."
            );
        }

        if self.table_paths().len() > 1 {
            return plan_err!(
                "Writing to a table backed by multiple partitions is not supported yet"
            );
        }

        if !self.options.file_sort_order.is_empty() {
            return Err(DataFusionError::NotImplemented(
                "Writing to a sorted listing table via insert into is not supported yet. \
                To write to this table in the meantime, register an equivalent table with \
                file_sort_order = vec![]"
                    .into(),
            ));
        }

        // A table built with an empty path list used to panic on `[0]` here;
        // report it as a planning error instead.
        let table_path = match self.table_paths().first() {
            Some(path) => path,
            None => {
                return plan_err!(
                    "Cannot insert into a listing table that has no table paths"
                )
            }
        };

        // Enumerate the existing files (no filters) so the writer knows what
        // is already present.
        let store = state.runtime_env().object_store(table_path)?;
        let file_list_stream = pruned_partition_list(
            store.as_ref(),
            table_path,
            &[],
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;
        let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;

        let input_partitions = input.output_partitioning().partition_count();
        let writer_mode = match self.options.insert_mode {
            ListingTableInsertMode::AppendToFile => {
                // Appending needs an existing file for every input partition.
                if input_partitions > file_groups.len() {
                    return Err(DataFusionError::Plan(format!(
                        "Cannot append {input_partitions} partitions to {} files!",
                        file_groups.len()
                    )));
                }
                FileWriterMode::Append
            }
            ListingTableInsertMode::AppendNewFiles => FileWriterMode::PutMultipart,
            ListingTableInsertMode::Error => {
                return plan_err!(
                    "Invalid plan attempting write to table with TableWriteMode::Error!"
                )
            }
        };

        // Explicit writer options win; otherwise derive defaults from the
        // file type and the session configuration.
        let file_format = self.options().format.as_ref();
        let file_type_writer_options = match &self.options().file_type_write_options {
            Some(opt) => opt.clone(),
            None => FileTypeWriterOptions::build_default(
                &file_format.file_type(),
                state.config_options(),
            )?,
        };

        let config = FileSinkConfig {
            object_store_url: table_path.object_store(),
            table_paths: self.table_paths().clone(),
            file_groups,
            output_schema: self.schema(),
            table_partition_cols: self.options.table_partition_cols.clone(),
            writer_mode,
            single_file_output: self.options.single_file,
            overwrite,
            file_type_writer_options,
        };

        self.options()
            .format
            .create_writer_physical_plan(input, state, config)
            .await
    }
}
impl ListingTable {
/// Lists the files to scan, together with the statistics returned by
/// `get_statistics_with_limit`.
///
/// Files from every table path are listed through `pruned_partition_list`
/// using `filters`, given per-file statistics, and finally grouped by
/// `split_files` according to `options.target_partitions`.
///
/// Returns an empty list and default statistics when the table has no
/// paths.
async fn list_files_for_scan<'a>(
&'a self,
ctx: &'a SessionState,
filters: &'a [Expr],
limit: Option<usize>,
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
// NOTE(review): the object store is resolved from the first path only and
// then used to list every path — confirm all paths share one store.
let store = if let Some(url) = self.table_paths.get(0) {
ctx.runtime_env().object_store(url)?
} else {
return Ok((vec![], Statistics::default()));
};
// Start one pruned listing per table path and wait for all of them.
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
pruned_partition_list(
store.as_ref(),
table_path,
filters,
&self.options.file_extension,
&self.options.table_partition_cols,
)
}))
.await?;
// Merge the per-path streams into a single stream of files.
let file_list = stream::iter(file_list).flatten();
// Attach statistics to every file.
let files = file_list.then(|part_file| async {
let part_file = part_file?;
let statistics = if self.options.collect_stat {
// Prefer the cache; on a miss, infer the statistics through the file
// format and remember them for later scans.
match self.collected_statistics.get(&part_file.object_meta) {
Some(statistics) => statistics,
None => {
let statistics = self
.options
.format
.infer_stats(
ctx,
&store,
self.file_schema.clone(),
&part_file.object_meta,
)
.await?;
self.collected_statistics
.save(part_file.object_meta.clone(), statistics.clone());
statistics
}
}
} else {
// Statistics collection is disabled: use the default statistics.
Statistics::default()
};
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
});
// presumably stops consuming files once `limit` rows are covered and
// merges the per-file statistics — verify against `get_statistics_with_limit`
let (files, statistics) =
get_statistics_with_limit(files, self.schema(), limit).await?;
Ok((
split_files(files, self.options.target_partitions),
statistics,
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
use crate::physical_plan::collect;
use crate::prelude::*;
use crate::{
assert_batches_eq,
datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
execution::options::ReadOptions,
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use chrono::DateTime;
use datafusion_common::assert_contains;
use datafusion_common::GetExt;
use datafusion_expr::LogicalPlanBuilder;
use rstest::*;
use std::collections::HashMap;
use std::fs::File;
use tempfile::TempDir;
/// Registers a single test file of the given type, scans it through a
/// `ListingTable` built from `listing_option`, and checks that the plan's
/// `unbounded_output` equals `infinite_data`.
async fn unbounded_table_helper(
    file_type: FileType,
    listing_option: ListingOptions,
    infinite_data: bool,
) -> Result<()> {
    let ctx = SessionContext::new();
    let file_name = format!("table/file{}", file_type.get_ext());
    register_test_store(&ctx, &[(&file_name, 100)]);

    let file_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
    let config = ListingTableConfig::new(ListingTableUrl::parse("test:///table/").unwrap())
        .with_listing_options(listing_option)
        .with_schema(file_schema);
    let table = ListingTable::try_new(config)?;

    let state = ctx.state();
    let plan = table.scan(&state, None, &[], None).await?;
    assert_eq!(plan.unbounded_output(&[])?, infinite_data);
    Ok(())
}
/// Scans a single parquet file with inferred options and schema, and checks
/// the shape and statistics of the resulting plan.
#[tokio::test]
async fn read_single_file() -> Result<()> {
    let ctx = SessionContext::new();
    let table = load_table(&ctx, "alltypes_plain.parquet").await?;

    let state = ctx.state();
    let exec = table
        .scan(&state, None, &[], None)
        .await
        .expect("Scan table");

    assert_eq!(exec.children().len(), 0);
    assert_eq!(exec.output_partitioning().partition_count(), 1);

    // Statistics are collected by default.
    assert_eq!(exec.statistics().num_rows, Some(8));
    assert_eq!(exec.statistics().total_byte_size, Some(671));
    Ok(())
}
/// With default `ListingOptions`, a scan reports row count and byte size.
#[tokio::test]
async fn load_table_stats_by_default() -> Result<()> {
    let testdata = crate::test_util::parquet_test_data();
    let table_path =
        ListingTableUrl::parse(format!("{}/{}", testdata, "alltypes_plain.parquet"))
            .unwrap();

    let ctx = SessionContext::new();
    let state = ctx.state();

    let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    let file_schema = listing_options.infer_schema(&state, &table_path).await?;
    let table = ListingTable::try_new(
        ListingTableConfig::new(table_path)
            .with_listing_options(listing_options)
            .with_schema(file_schema),
    )?;

    let exec = table.scan(&state, None, &[], None).await?;
    assert_eq!(exec.statistics().num_rows, Some(8));
    assert_eq!(exec.statistics().total_byte_size, Some(671));
    Ok(())
}
/// With statistics collection disabled, a scan reports no row count and no
/// byte size.
#[tokio::test]
async fn load_table_stats_when_no_stats() -> Result<()> {
    let testdata = crate::test_util::parquet_test_data();
    let table_path =
        ListingTableUrl::parse(format!("{}/{}", testdata, "alltypes_plain.parquet"))
            .unwrap();

    let ctx = SessionContext::new();
    let state = ctx.state();

    let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_collect_stat(false);
    let file_schema = listing_options.infer_schema(&state, &table_path).await?;
    let table = ListingTable::try_new(
        ListingTableConfig::new(table_path)
            .with_listing_options(listing_options)
            .with_schema(file_schema),
    )?;

    let exec = table.scan(&state, None, &[], None).await?;
    assert_eq!(exec.statistics().num_rows, None);
    assert_eq!(exec.statistics().total_byte_size, None);
    Ok(())
}
/// Table-driven check of `try_create_output_ordering`: each case pairs a
/// `file_sort_order` with either the expected physical ordering or a
/// fragment of the expected error message.
#[tokio::test]
async fn test_try_create_output_ordering() {
    let testdata = crate::test_util::parquet_test_data();
    let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
    let table_path = ListingTableUrl::parse(filename).unwrap();

    let ctx = SessionContext::new();
    let state = ctx.state();
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    let schema = options.infer_schema(&state, &table_path).await.unwrap();

    use crate::physical_plan::expressions::col as physical_col;
    use std::ops::Add;

    // (file_sort_order, expected_result)
    let cases = vec![
        // No sort order declared.
        (vec![], Ok(vec![])),
        // Not a sort expression.
        (
            vec![vec![col("string_col")]],
            Err("Expected Expr::Sort in output_ordering, but got string_col"),
        ),
        // A sort over something other than a plain column.
        (
            vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
            Err("Expected single column references in output_ordering, got int_col + Int32(1)"),
        ),
        // A single sorted column.
        (
            vec![vec![col("string_col").sort(true, false)]],
            Ok(vec![vec![PhysicalSortExpr {
                expr: physical_col("string_col", &schema).unwrap(),
                options: SortOptions {
                    descending: false,
                    nulls_first: false,
                },
            }]]),
        ),
        // Two sorted columns in one ordering.
        (
            vec![vec![
                col("string_col").sort(true, false),
                col("int_col").sort(false, true),
            ]],
            Ok(vec![vec![
                PhysicalSortExpr {
                    expr: physical_col("string_col", &schema).unwrap(),
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
                PhysicalSortExpr {
                    expr: physical_col("int_col", &schema).unwrap(),
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
            ]]),
        ),
    ];

    for (file_sort_order, expected_result) in cases {
        let table = ListingTable::try_new(
            ListingTableConfig::new(table_path.clone())
                .with_listing_options(
                    options.clone().with_file_sort_order(file_sort_order),
                )
                .with_schema(schema.clone()),
        )
        .expect("Creating the table");

        let ordering_result = table.try_create_output_ordering();

        match (expected_result, ordering_result) {
            (Ok(expected), Ok(actual)) => {
                assert_eq!(expected, actual);
            }
            (Err(expected), Err(actual)) => {
                assert_contains!(actual.to_string(), expected);
            }
            (expected_result, ordering_result) => {
                panic!(
                    "expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
                );
            }
        }
    }
}
/// A partition filter that excludes every file yields an `EmptyExec` that
/// still carries the full table schema (file and partition columns).
#[tokio::test]
async fn read_empty_table() -> Result<()> {
    let ctx = SessionContext::new();
    let path = String::from("table/p1=v1/file.avro");
    register_test_store(&ctx, &[(&path, 100)]);

    let listing_options = ListingOptions::new(Arc::new(AvroFormat {}))
        .with_file_extension(FileType::AVRO.get_ext())
        .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
        .with_target_partitions(4);

    let file_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
    let table = ListingTable::try_new(
        ListingTableConfig::new(ListingTableUrl::parse("test:///table/").unwrap())
            .with_listing_options(listing_options)
            .with_schema(file_schema),
    )?;

    let expected_columns = vec!["a".to_owned(), "p1".to_owned()];
    assert_eq!(columns(&table.schema()), expected_columns);

    // The only file lives under p1=v1, so this filter excludes it.
    let filter = Expr::not_eq(col("p1"), lit("v1"));
    let state = ctx.state();
    let scan = table
        .scan(&state, None, &[filter], None)
        .await
        .expect("Empty execution plan");

    assert!(scan.as_any().is::<EmptyExec>());
    assert_eq!(columns(&scan.schema()), expected_columns);
    Ok(())
}
#[tokio::test]
async fn unbounded_csv_table_without_schema() -> Result<()> {
let tmp_dir = TempDir::new()?;
let file_path = tmp_dir.path().join("dummy.csv");
File::create(file_path)?;
let ctx = SessionContext::new();
let error = ctx
.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().mark_infinite(true),
)
.await
.unwrap_err();
match error {
DataFusionError::Plan(_) => Ok(()),
val => Err(val),
}
}
#[tokio::test]
async fn unbounded_json_table_without_schema() -> Result<()> {
let tmp_dir = TempDir::new()?;
let file_path = tmp_dir.path().join("dummy.json");
File::create(file_path)?;
let ctx = SessionContext::new();
let error = ctx
.register_json(
"test",
tmp_dir.path().to_str().unwrap(),
NdJsonReadOptions::default().mark_infinite(true),
)
.await
.unwrap_err();
match error {
DataFusionError::Plan(_) => Ok(()),
val => Err(val),
}
}
#[tokio::test]
async fn unbounded_avro_table_without_schema() -> Result<()> {
let tmp_dir = TempDir::new()?;
let file_path = tmp_dir.path().join("dummy.avro");
File::create(file_path)?;
let ctx = SessionContext::new();
let error = ctx
.register_avro(
"test",
tmp_dir.path().to_str().unwrap(),
AvroReadOptions::default().mark_infinite(true),
)
.await
.unwrap_err();
match error {
DataFusionError::Plan(_) => Ok(()),
val => Err(val),
}
}
/// CSV scans report `unbounded_output` matching the `mark_infinite` flag.
#[rstest]
#[tokio::test]
async fn unbounded_csv_table(
    #[values(true, false)] infinite_data: bool,
) -> Result<()> {
    let session_config = SessionConfig::new().with_target_partitions(1);
    let listing_options = CsvReadOptions::new()
        .mark_infinite(infinite_data)
        .to_listing_options(&session_config);
    unbounded_table_helper(FileType::CSV, listing_options, infinite_data).await
}
/// JSON scans report `unbounded_output` matching the `mark_infinite` flag.
#[rstest]
#[tokio::test]
async fn unbounded_json_table(
    #[values(true, false)] infinite_data: bool,
) -> Result<()> {
    let session_config = SessionConfig::new().with_target_partitions(1);
    let listing_options = NdJsonReadOptions::default()
        .mark_infinite(infinite_data)
        .to_listing_options(&session_config);
    unbounded_table_helper(FileType::JSON, listing_options, infinite_data).await
}
/// Avro scans report `unbounded_output` matching the `mark_infinite` flag.
#[rstest]
#[tokio::test]
async fn unbounded_avro_table(
    #[values(true, false)] infinite_data: bool,
) -> Result<()> {
    let session_config = SessionConfig::new().with_target_partitions(1);
    let listing_options = AvroReadOptions::default()
        .mark_infinite(infinite_data)
        .to_listing_options(&session_config);
    unbounded_table_helper(FileType::AVRO, listing_options, infinite_data).await
}
/// Checks how many file groups `list_files_for_scan` produces for a single
/// table prefix under various file counts and target partition counts.
#[tokio::test]
async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
    // (files, table_prefix, target_partitions, expected number of groups)
    let cases: Vec<(Vec<&str>, &str, usize, usize)> = vec![
        // More target partitions than files.
        (
            vec![
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "test:///bucket/key-prefix/",
            12,
            5,
        ),
        // As many target partitions as files.
        (
            vec![
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
            ],
            "test:///bucket/key-prefix/",
            4,
            4,
        ),
        // Fewer target partitions than files.
        (
            vec![
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "test:///bucket/key-prefix/",
            2,
            2,
        ),
        // No files at all.
        (vec![], "test:///bucket/key-prefix/", 2, 0),
        // A file outside the table prefix is not counted.
        (
            vec![
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/other-prefix/roguefile",
            ],
            "test:///bucket/key-prefix/",
            10,
            2,
        ),
    ];

    for (files, table_prefix, target_partitions, output_partitioning) in cases {
        assert_list_files_for_scan_grouping(
            &files,
            table_prefix,
            target_partitions,
            output_partitioning,
        )
        .await?;
    }
    Ok(())
}
/// Checks how many file groups `list_files_for_scan` produces when the table
/// is backed by one or more prefixes.
#[tokio::test]
async fn test_assert_list_files_for_multi_path() -> Result<()> {
    let all_files: &[&str] = &[
        "bucket/key1/file0",
        "bucket/key1/file1",
        "bucket/key1/file2",
        "bucket/key2/file3",
        "bucket/key2/file4",
        "bucket/key3/file5",
    ];
    let no_files: &[&str] = &[];

    // (files, table prefixes, target_partitions, expected number of groups)
    let cases: Vec<(&[&str], Vec<&str>, usize, usize)> = vec![
        // More target partitions than matching files.
        (
            all_files,
            vec!["test:///bucket/key1/", "test:///bucket/key2/"],
            12,
            5,
        ),
        // As many target partitions as matching files.
        (
            all_files,
            vec!["test:///bucket/key1/", "test:///bucket/key2/"],
            5,
            5,
        ),
        // Fewer target partitions than matching files.
        (all_files, vec!["test:///bucket/key1/"], 2, 2),
        // No files at all.
        (no_files, vec!["test:///bucket/key1/"], 2, 0),
        // A prefix matching a single file.
        (all_files, vec!["test:///bucket/key3/"], 2, 1),
    ];

    for (files, table_prefixes, target_partitions, output_partitioning) in cases {
        assert_list_files_for_multi_paths(
            files,
            &table_prefixes,
            target_partitions,
            output_partitioning,
        )
        .await?;
    }
    Ok(())
}
/// Builds a `ListingTable` for the parquet test file `name`, inferring both
/// the listing options and the schema.
async fn load_table(
    ctx: &SessionContext,
    name: &str,
) -> Result<Arc<dyn TableProvider>> {
    let testdata = crate::test_util::parquet_test_data();
    let table_path = ListingTableUrl::parse(format!("{testdata}/{name}")).unwrap();

    let state = ctx.state();
    let config = ListingTableConfig::new(table_path).infer(&state).await?;

    Ok(Arc::new(ListingTable::try_new(config)?))
}
/// Registers `files` in a test store, lists them through a table rooted at
/// `table_prefix`, and asserts that the number of file groups equals
/// `output_partitioning`.
async fn assert_list_files_for_scan_grouping(
    files: &[&str],
    table_prefix: &str,
    target_partitions: usize,
    output_partitioning: usize,
) -> Result<()> {
    let ctx = SessionContext::new();
    let sized_files: Vec<_> = files.iter().map(|f| (*f, 10)).collect();
    register_test_store(&ctx, &sized_files);

    let listing_options = ListingOptions::new(Arc::new(AvroFormat {}))
        .with_file_extension("")
        .with_target_partitions(target_partitions);

    let file_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
    let table = ListingTable::try_new(
        ListingTableConfig::new(ListingTableUrl::parse(table_prefix).unwrap())
            .with_listing_options(listing_options)
            .with_schema(file_schema),
    )?;

    let state = ctx.state();
    let (file_groups, _) = table.list_files_for_scan(&state, &[], None).await?;
    assert_eq!(file_groups.len(), output_partitioning);
    Ok(())
}
/// Registers `files` in a test store, lists them through a table backed by
/// every prefix in `table_prefix`, and asserts that the number of file
/// groups equals `output_partitioning`.
async fn assert_list_files_for_multi_paths(
    files: &[&str],
    table_prefix: &[&str],
    target_partitions: usize,
    output_partitioning: usize,
) -> Result<()> {
    let ctx = SessionContext::new();
    let sized_files: Vec<_> = files.iter().map(|f| (*f, 10)).collect();
    register_test_store(&ctx, &sized_files);

    let listing_options = ListingOptions::new(Arc::new(AvroFormat {}))
        .with_file_extension("")
        .with_target_partitions(target_partitions);

    let file_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
    let table_paths = table_prefix
        .iter()
        .map(|prefix| ListingTableUrl::parse(prefix).unwrap())
        .collect();
    let table = ListingTable::try_new(
        ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(listing_options)
            .with_schema(file_schema),
    )?;

    let state = ctx.state();
    let (file_groups, _) = table.list_files_for_scan(&state, &[], None).await?;
    assert_eq!(file_groups.len(), output_partitioning);
    Ok(())
}
/// The statistics cache hits only when location, size and last-modified
/// time all match the saved entry.
#[test]
fn test_statistics_cache() {
    let meta = ObjectMeta {
        location: Path::from("test"),
        last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
            .unwrap()
            .into(),
        size: 1024,
        e_tag: None,
    };

    let cache = StatisticsCache::default();

    // Miss before anything is saved, hit afterwards.
    assert!(cache.get(&meta).is_none());
    cache.save(meta.clone(), Statistics::default());
    assert!(cache.get(&meta).is_some());

    // Same location, different size: stale.
    let resized = ObjectMeta {
        size: 2048,
        ..meta.clone()
    };
    assert!(cache.get(&resized).is_none());

    // Same location, different modification time: stale.
    let touched = ObjectMeta {
        last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
            .unwrap()
            .into(),
        ..meta.clone()
    };
    assert!(cache.get(&touched).is_none());

    // Different location: never cached.
    let relocated = ObjectMeta {
        location: Path::from("test2"),
        ..meta
    };
    assert!(cache.get(&relocated).is_none());
}
/// Runs the append-to-existing-files helper for uncompressed JSON with no
/// session config overrides.
#[tokio::test]
async fn test_insert_into_append_to_json_file() -> Result<()> {
    let file_type = FileType::JSON;
    let compression = FileCompressionType::UNCOMPRESSED;
    helper_test_insert_into_append_to_existing_files(file_type, compression, None)
        .await?;
    Ok(())
}
/// Runs the append-new-files helper for uncompressed JSON with no session
/// config overrides.
#[tokio::test]
async fn test_insert_into_append_new_json_files() -> Result<()> {
    let file_type = FileType::JSON;
    let compression = FileCompressionType::UNCOMPRESSED;
    helper_test_append_new_files_to_table(file_type, compression, None).await?;
    Ok(())
}
/// Runs the append-to-existing-files helper for uncompressed CSV with no
/// session config overrides.
#[tokio::test]
async fn test_insert_into_append_to_csv_file() -> Result<()> {
    let file_type = FileType::CSV;
    let compression = FileCompressionType::UNCOMPRESSED;
    helper_test_insert_into_append_to_existing_files(file_type, compression, None)
        .await?;
    Ok(())
}
/// Runs the append-new-files helper for uncompressed CSV with no session
/// config overrides.
#[tokio::test]
async fn test_insert_into_append_new_csv_files() -> Result<()> {
    let file_type = FileType::CSV;
    let compression = FileCompressionType::UNCOMPRESSED;
    helper_test_append_new_files_to_table(file_type, compression, None).await?;
    Ok(())
}
/// Runs the append-new-files helper for Parquet with no session config
/// overrides.
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
    let file_type = FileType::PARQUET;
    let compression = FileCompressionType::UNCOMPRESSED;
    helper_test_append_new_files_to_table(file_type, compression, None).await?;
    Ok(())
}
/// Runs the SQL insert helper for CSV in `append_new_files` mode with no
/// session config overrides.
#[tokio::test]
async fn test_insert_into_sql_csv_defaults() -> Result<()> {
    let compression = FileCompressionType::UNCOMPRESSED;
    let table_options = "OPTIONS (insert_mode 'append_new_files')";
    helper_test_insert_into_sql("csv", compression, table_options, None).await?;
    Ok(())
}
/// Runs the SQL insert helper for CSV with a header row, in
/// `append_new_files` mode, with no session config overrides.
#[tokio::test]
async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
    let compression = FileCompressionType::UNCOMPRESSED;
    let table_options = "WITH HEADER ROW \
        OPTIONS (insert_mode 'append_new_files')";
    helper_test_insert_into_sql("csv", compression, table_options, None).await?;
    Ok(())
}
/// Runs the SQL insert helper for JSON in `append_new_files` mode with no
/// session config overrides.
#[tokio::test]
async fn test_insert_into_sql_json_defaults() -> Result<()> {
    let compression = FileCompressionType::UNCOMPRESSED;
    let table_options = "OPTIONS (insert_mode 'append_new_files')";
    helper_test_insert_into_sql("json", compression, table_options, None).await?;
    Ok(())
}
/// Runs the SQL insert helper for Parquet with no extra table options and no
/// session config overrides.
#[tokio::test]
async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
    let compression = FileCompressionType::UNCOMPRESSED;
    let table_options = "";
    helper_test_insert_into_sql("parquet", compression, table_options, None).await?;
    Ok(())
}
/// Runs the SQL insert helper for Parquet with a set of session-level
/// Parquet writer settings passed as a string map.
#[tokio::test]
async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
    // NOTE(review): the key `staistics_enabled` below looks misspelled
    // (`statistics_enabled`?). It is kept as-is; confirm how the helper
    // treats unknown keys before changing it.
    let overrides = [
        ("datafusion.execution.parquet.compression", "zstd(5)"),
        ("datafusion.execution.parquet.dictionary_enabled", "false"),
        (
            "datafusion.execution.parquet.dictionary_page_size_limit",
            "100",
        ),
        ("datafusion.execution.parquet.staistics_enabled", "none"),
        ("datafusion.execution.parquet.max_statistics_size", "10"),
        ("datafusion.execution.parquet.max_row_group_size", "5"),
        ("datafusion.execution.parquet.created_by", "datafusion test"),
        (
            "datafusion.execution.parquet.column_index_truncate_length",
            "50",
        ),
        (
            "datafusion.execution.parquet.data_page_row_count_limit",
            "50",
        ),
        ("datafusion.execution.parquet.bloom_filter_enabled", "true"),
        ("datafusion.execution.parquet.bloom_filter_fpp", "0.01"),
        ("datafusion.execution.parquet.bloom_filter_ndv", "1000"),
        ("datafusion.execution.parquet.writer_version", "2.0"),
        ("datafusion.execution.parquet.write_batch_size", "5"),
    ];

    let config_map: HashMap<String, String> = overrides
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();

    let compression = FileCompressionType::UNCOMPRESSED;
    helper_test_insert_into_sql("parquet", compression, "", Some(config_map)).await?;
    Ok(())
}
/// Inserting into a parquet table in "append new files" mode while the
/// parquet writer settings listed below are overridden through the session
/// configuration.
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
    // Session-level parquet overrides as (config key, value) pairs.
    // Every key must be spelled exactly like the corresponding parquet
    // option (e.g. `statistics_enabled`); a misspelled key does not name any
    // option and therefore would not exercise the intended override.
    let overrides = [
        ("datafusion.execution.parquet.compression", "zstd(5)"),
        ("datafusion.execution.parquet.dictionary_enabled", "false"),
        (
            "datafusion.execution.parquet.dictionary_page_size_limit",
            "100",
        ),
        ("datafusion.execution.parquet.statistics_enabled", "none"),
        ("datafusion.execution.parquet.max_statistics_size", "10"),
        ("datafusion.execution.parquet.max_row_group_size", "5"),
        ("datafusion.execution.parquet.created_by", "datafusion test"),
        (
            "datafusion.execution.parquet.column_index_truncate_length",
            "50",
        ),
        (
            "datafusion.execution.parquet.data_page_row_count_limit",
            "50",
        ),
        ("datafusion.execution.parquet.encoding", "delta_binary_packed"),
        ("datafusion.execution.parquet.bloom_filter_enabled", "true"),
        ("datafusion.execution.parquet.bloom_filter_fpp", "0.01"),
        ("datafusion.execution.parquet.bloom_filter_ndv", "1000"),
        ("datafusion.execution.parquet.writer_version", "2.0"),
        ("datafusion.execution.parquet.write_batch_size", "5"),
    ];
    let config_map: HashMap<String, String> = overrides
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();
    helper_test_append_new_files_to_table(
        FileType::PARQUET,
        FileCompressionType::UNCOMPRESSED,
        Some(config_map),
    )
    .await?;
    Ok(())
}
/// A session that sets parquet compression to a bare `zstd` (no level) must
/// make the "append new files" insert fail with the exact configuration
/// error asserted below.
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
) -> Result<()> {
    let config_map: HashMap<String, String> = HashMap::from([(
        "datafusion.execution.parquet.compression".to_string(),
        "zstd".to_string(),
    )]);
    let outcome = helper_test_append_new_files_to_table(
        FileType::PARQUET,
        FileCompressionType::UNCOMPRESSED,
        Some(config_map),
    )
    .await;
    // The helper is expected to return an error; success is a test failure.
    let err = outcome.expect_err("Example should fail!");
    assert_eq!(err.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");
    Ok(())
}
/// Appending to an existing parquet file is expected to be rejected: the
/// shared append-to-file helper must return an error for `FileType::PARQUET`.
#[tokio::test]
async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> {
    // Only the fact that an error occurred is checked, not its contents.
    let _err = helper_test_insert_into_append_to_existing_files(
        FileType::PARQUET,
        FileCompressionType::UNCOMPRESSED,
        None,
    )
    .await
    .expect_err("Appending to existing parquet file did not fail!");
    Ok(())
}
/// Creates an empty file at `temp_path` and wraps it in a [`ListingTable`]
/// with the given `schema`, `insert_mode` and `file_format`.
///
/// Returns the table as a `TableProvider`, or an error if the file cannot be
/// created or the table cannot be constructed. Panics if `temp_path` cannot
/// be parsed as a `ListingTableUrl`.
fn load_empty_schema_table(
    schema: SchemaRef,
    temp_path: &str,
    insert_mode: ListingTableInsertMode,
    file_format: Arc<dyn FileFormat>,
) -> Result<Arc<dyn TableProvider>> {
    // The backing file must exist before the table is built on top of it.
    File::create(temp_path)?;

    let options =
        ListingOptions::new(Arc::clone(&file_format)).with_insert_mode(insert_mode);
    let url = ListingTableUrl::parse(temp_path).unwrap();
    let table_config = ListingTableConfig::new(url)
        .with_listing_options(options)
        .with_schema(schema);

    let provider: Arc<dyn TableProvider> =
        Arc::new(ListingTable::try_new(table_config)?);
    Ok(provider)
}
/// Shared driver for the "append to an existing file" insert tests.
///
/// Builds an empty single-file listing table of the requested `file_type`
/// inside a fresh temporary directory (insert mode `AppendToFile`), then
/// runs the same six-row insert plan twice. After each run it checks the
/// reported insert count, the full table contents, and that the directory
/// still contains exactly one file.
///
/// * `file_type` - format of the target table.
/// * `file_compression_type` - used to derive the file extension and, for
///   CSV and JSON, passed to the file format.
/// * `session_config_map` - optional session configuration; when `None` a
///   default `SessionContext` is used.
///
/// Returns an error if session setup, planning or execution fails; assertion
/// mismatches panic.
async fn helper_test_insert_into_append_to_existing_files(
    file_type: FileType,
    file_compression_type: FileCompressionType,
    session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
    // Create the session, applying configuration overrides when supplied.
    let session_ctx = match session_config_map {
        Some(cfg) => {
            let config = SessionConfig::from_string_hash_map(cfg)?;
            SessionContext::with_config(config)
        }
        None => SessionContext::new(),
    };

    // Single non-nullable Int32 column shared by source and target tables.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "column1",
        DataType::Int32,
        false,
    )]));

    // One batch holding the values 1, 2, 3.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
    )?;

    // File name is "path" followed by the extension for this type/compression.
    let filename = format!(
        "path{}",
        file_type
            .to_owned()
            .get_ext_with_compression(file_compression_type)
            .unwrap()
    );

    // The target file lives in a temporary directory that is removed when
    // `tmp_dir` is dropped at the end of this function.
    let tmp_dir = TempDir::new()?;
    let path = tmp_dir.path().join(filename);

    let file_format: Arc<dyn FileFormat> = match file_type {
        FileType::CSV => Arc::new(
            CsvFormat::default().with_file_compression_type(file_compression_type),
        ),
        FileType::JSON => Arc::new(
            JsonFormat::default().with_file_compression_type(file_compression_type),
        ),
        FileType::PARQUET => Arc::new(ParquetFormat::default()),
        FileType::AVRO => Arc::new(AvroFormat {}),
        FileType::ARROW => Arc::new(ArrowFormat {}),
    };

    // Register the (initially empty) target table as "t".
    let initial_table = load_empty_schema_table(
        schema.clone(),
        path.to_str().unwrap(),
        ListingTableInsertMode::AppendToFile,
        file_format,
    )?;
    session_ctx.register_table("t", initial_table)?;

    // Register an in-memory source with two copies of the batch (six rows).
    let source_table = Arc::new(MemTable::try_new(
        schema.clone(),
        vec![vec![batch.clone(), batch.clone()]],
    )?);
    session_ctx.register_table("source", source_table.clone())?;

    // Logical plan: scan "source" and insert its rows into "t".
    let source = provider_as_source(source_table);
    let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
    let insert_into_table =
        LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;

    // First insert: the plan reports six inserted rows.
    let plan = session_ctx
        .state()
        .create_physical_plan(&insert_into_table)
        .await?;
    let res = collect(plan, session_ctx.task_ctx()).await?;
    // Data cells are padded to the column width, matching the pretty printer.
    let expected = [
        "+-------+",
        "| count |",
        "+-------+",
        "| 6     |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &res);

    // The table now contains the six inserted rows.
    let batches = session_ctx.sql("select * from t").await?.collect().await?;
    let expected = [
        "+---------+",
        "| column1 |",
        "+---------+",
        "| 1       |",
        "| 2       |",
        "| 3       |",
        "| 1       |",
        "| 2       |",
        "| 3       |",
        "+---------+",
    ];
    assert_batches_eq!(expected, &batches);

    // Appending must not create additional files.
    let num_files = tmp_dir.path().read_dir()?.count();
    assert_eq!(num_files, 1);

    // Second insert of the same six rows.
    let plan = session_ctx
        .state()
        .create_physical_plan(&insert_into_table)
        .await?;
    let res = collect(plan, session_ctx.task_ctx()).await?;
    let expected = [
        "+-------+",
        "| count |",
        "+-------+",
        "| 6     |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &res);

    // The table now contains twelve rows: both inserts, in order.
    let batches = session_ctx.sql("select * from t").await?.collect().await?;
    let expected = [
        "+---------+",
        "| column1 |",
        "+---------+",
        "| 1       |",
        "| 2       |",
        "| 3       |",
        "| 1       |",
        "| 2       |",
        "| 3       |",
        "| 1       |",
        "| 2       |",
        "| 3       |",
        "| 1       |",
        "| 2       |",
        "| 3       |",
        "+---------+",
    ];
    assert_batches_eq!(expected, &batches);

    // Still a single file after the second append.
    let num_files = tmp_dir.path().read_dir()?.count();
    assert_eq!(num_files, 1);

    Ok(())
}
/// Shared driver for the "append new files" insert tests.
///
/// Registers a table named "t" of the requested `file_type` over an empty
/// temporary directory, then runs the same six-row insert plan twice. The
/// source is hash-repartitioned into 6 partitions; after each run the helper
/// checks the reported insert count, the total row count of "t", and the
/// number of files in the directory (6 after the first run, 12 after the
/// second).
///
/// * `file_type` - format of the target table.
/// * `file_compression_type` - passed to the CSV and JSON read options; not
///   used for the other formats.
/// * `session_config_map` - optional session configuration; when `None` a
///   default `SessionContext` is used.
///
/// Returns an error if session setup, registration, planning or execution
/// fails; assertion mismatches panic.
async fn helper_test_append_new_files_to_table(
    file_type: FileType,
    file_compression_type: FileCompressionType,
    session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
    // Create the session, applying configuration overrides when supplied.
    let session_ctx = match session_config_map {
        Some(cfg) => {
            let config = SessionConfig::from_string_hash_map(cfg)?;
            SessionContext::with_config(config)
        }
        None => SessionContext::new(),
    };

    // Single non-nullable Int32 column shared by source and target tables.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "column1",
        DataType::Int32,
        false,
    )]));

    // One batch holding the values 1, 2, 3.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Register the target table "t" over a fresh, empty temporary directory.
    let tmp_dir = TempDir::new()?;
    match file_type {
        FileType::CSV => {
            session_ctx
                .register_csv(
                    "t",
                    tmp_dir.path().to_str().unwrap(),
                    CsvReadOptions::new()
                        .insert_mode(ListingTableInsertMode::AppendNewFiles)
                        .schema(schema.as_ref())
                        .file_compression_type(file_compression_type),
                )
                .await?;
        }
        FileType::JSON => {
            session_ctx
                .register_json(
                    "t",
                    tmp_dir.path().to_str().unwrap(),
                    NdJsonReadOptions::default()
                        .insert_mode(ListingTableInsertMode::AppendNewFiles)
                        .schema(schema.as_ref())
                        .file_compression_type(file_compression_type),
                )
                .await?;
        }
        FileType::PARQUET => {
            session_ctx
                .register_parquet(
                    "t",
                    tmp_dir.path().to_str().unwrap(),
                    ParquetReadOptions::default()
                        .insert_mode(ListingTableInsertMode::AppendNewFiles)
                        .schema(schema.as_ref()),
                )
                .await?;
        }
        FileType::AVRO => {
            // NOTE: no insert mode is set on the Avro read options here.
            session_ctx
                .register_avro(
                    "t",
                    tmp_dir.path().to_str().unwrap(),
                    AvroReadOptions::default()
                        .schema(schema.as_ref()),
                )
                .await?;
        }
        FileType::ARROW => {
            // NOTE: no insert mode is set on the Arrow read options here.
            session_ctx
                .register_arrow(
                    "t",
                    tmp_dir.path().to_str().unwrap(),
                    ArrowReadOptions::default()
                        .schema(schema.as_ref()),
                )
                .await?;
        }
    }

    // Register an in-memory source with two copies of the batch (six rows).
    let source_table = Arc::new(MemTable::try_new(
        schema.clone(),
        vec![vec![batch.clone(), batch.clone()]],
    )?);
    session_ctx.register_table("source", source_table.clone())?;

    // Logical plan: scan "source", hash-repartition into 6 partitions, and
    // insert the rows into "t".
    let source = provider_as_source(source_table);
    let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
        .repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))?
        .build()?;
    let insert_into_table =
        LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;

    // First insert: the plan reports six inserted rows.
    let plan = session_ctx
        .state()
        .create_physical_plan(&insert_into_table)
        .await?;
    let res = collect(plan, session_ctx.task_ctx()).await?;
    // Data cells are padded to the column width, matching the pretty printer.
    let expected = [
        "+-------+",
        "| count |",
        "+-------+",
        "| 6     |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &res);

    // The table now holds six rows.
    let batches = session_ctx
        .sql("select count(*) as count from t")
        .await?
        .collect()
        .await?;
    let expected = [
        "+-------+",
        "| count |",
        "+-------+",
        "| 6     |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &batches);

    // Six files are expected after the first insert.
    let num_files = tmp_dir.path().read_dir()?.count();
    assert_eq!(num_files, 6);

    // Second insert of the same six rows.
    let plan = session_ctx
        .state()
        .create_physical_plan(&insert_into_table)
        .await?;
    let res = collect(plan, session_ctx.task_ctx()).await?;
    let expected = [
        "+-------+",
        "| count |",
        "+-------+",
        "| 6     |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &res);

    // The table now holds twelve rows.
    let batches = session_ctx
        .sql("select count(*) AS count from t")
        .await?
        .collect()
        .await?;
    let expected = [
        "+-------+",
        "| count |",
        "+-------+",
        "| 12    |",
        "+-------+",
    ];
    assert_batches_eq!(expected, &batches);

    // Another six files are expected after the second insert.
    let num_files = tmp_dir.path().read_dir()?.count();
    assert_eq!(num_files, 12);

    Ok(())
}
/// Shared driver for the SQL `INSERT INTO` tests.
///
/// Creates an external table `foo(a varchar, b varchar, c int)` stored as
/// `file_type` in a fresh temporary directory, inserts three rows through
/// SQL, and checks that `select * from foo` returns exactly those rows.
///
/// * `file_type` - format name spliced into the `stored as` clause.
/// * `_file_compression_type` - currently unused.
/// * `external_table_options` - extra clauses appended verbatim to the
///   `create external table` statement.
/// * `session_config_map` - optional session configuration; when `None` a
///   default `SessionContext` is used.
///
/// Returns an error if session setup or any SQL statement fails; assertion
/// mismatches panic.
async fn helper_test_insert_into_sql(
    file_type: &str,
    _file_compression_type: FileCompressionType,
    external_table_options: &str,
    session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
    // Create the session, applying configuration overrides when supplied.
    let session_ctx = match session_config_map {
        Some(cfg) => {
            let config = SessionConfig::from_string_hash_map(cfg)?;
            SessionContext::with_config(config)
        }
        None => SessionContext::new(),
    };

    // Keep the `TempDir` guard alive for the whole test so the directory is
    // removed when the function returns; converting it into a plain path
    // would disable that cleanup and leave the directory behind.
    let tmp_dir = TempDir::new()?;
    let str_path = tmp_dir
        .path()
        .to_str()
        .expect("Temp path should convert to &str");

    // Create the external table over the temporary directory.
    session_ctx
        .sql(&format!(
            "create external table foo(a varchar, b varchar, c int) \
             stored as {file_type} \
             location '{str_path}' \
             {external_table_options}"
        ))
        .await?
        .collect()
        .await?;

    // Insert three rows through SQL.
    session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
        .await?
        .collect()
        .await?;

    // Read the table back and compare with the inserted rows.
    let batches = session_ctx
        .sql("select * from foo")
        .await?
        .collect()
        .await?;
    // Header cells are padded to the column width, matching the pretty printer.
    let expected = [
        "+-----+-----+---+",
        "| a   | b   | c |",
        "+-----+-----+---+",
        "| foo | bar | 1 |",
        "| foo | bar | 2 |",
        "| foo | bar | 3 |",
        "+-----+-----+---+",
    ];
    assert_batches_eq!(expected, &batches);

    Ok(())
}
}