datafusion/datasource/listing/table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The table implementation.
19
20use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
21use super::{ListingTableUrl, PartitionedFile};
22use std::collections::HashMap;
23use std::{any::Any, str::FromStr, sync::Arc};
24
25use crate::datasource::{
26    create_ordering,
27    file_format::{
28        file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
29    },
30    physical_plan::FileSinkConfig,
31};
32use crate::execution::context::SessionState;
33use datafusion_catalog::TableProvider;
34use datafusion_common::{config_err, DataFusionError, Result};
35use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
36use datafusion_expr::dml::InsertOp;
37use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
38use datafusion_expr::{SortExpr, TableType};
39use datafusion_physical_plan::empty::EmptyExec;
40use datafusion_physical_plan::{ExecutionPlan, Statistics};
41
42use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
43use datafusion_common::{
44    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
45    SchemaExt, ToDFSchema,
46};
47use datafusion_execution::cache::{
48    cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
49};
50use datafusion_physical_expr::{
51    create_physical_expr, LexOrdering, PhysicalSortRequirement,
52};
53
54use async_trait::async_trait;
55use datafusion_catalog::Session;
56use datafusion_common::stats::Precision;
57use datafusion_datasource::compute_all_files_statistics;
58use datafusion_datasource::file_groups::FileGroup;
59use datafusion_physical_expr_common::sort_expr::LexRequirement;
60use futures::{future, stream, Stream, StreamExt, TryStreamExt};
61use itertools::Itertools;
62use object_store::ObjectStore;
63
64/// Configuration for creating a [`ListingTable`]
65#[derive(Debug, Clone)]
66pub struct ListingTableConfig {
67    /// Paths on the `ObjectStore` for creating `ListingTable`.
68    /// They should share the same schema and object store.
69    pub table_paths: Vec<ListingTableUrl>,
70    /// Optional `SchemaRef` for the to be created `ListingTable`.
71    pub file_schema: Option<SchemaRef>,
72    /// Optional `ListingOptions` for the to be created `ListingTable`.
73    pub options: Option<ListingOptions>,
74}
75
76impl ListingTableConfig {
77    /// Creates new [`ListingTableConfig`].
78    ///
79    /// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
80    /// the suffix of the first element of the provided `table_paths`.
81    pub fn new(table_path: ListingTableUrl) -> Self {
82        let table_paths = vec![table_path];
83        Self {
84            table_paths,
85            file_schema: None,
86            options: None,
87        }
88    }
89
90    /// Creates new [`ListingTableConfig`] with multiple table paths.
91    ///
92    /// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
93    /// the suffix of the first element of the provided `table_paths`.
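    ///
    /// A minimal sketch (the paths are illustrative):
    ///
    /// ```no_run
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingTableUrl};
    /// # fn main() -> Result<()> {
    /// let paths = vec![
    ///     ListingTableUrl::parse("/data/2023/")?,
    ///     ListingTableUrl::parse("/data/2024/")?,
    /// ];
    /// // All prefixes must share the same schema and object store
    /// let _config = ListingTableConfig::new_with_multi_paths(paths);
    /// # Ok(())
    /// # }
    /// ```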
94    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
95        Self {
96            table_paths,
97            file_schema: None,
98            options: None,
99        }
100    }
101    /// Add `schema` to [`ListingTableConfig`]
102    pub fn with_schema(self, schema: SchemaRef) -> Self {
103        Self {
104            table_paths: self.table_paths,
105            file_schema: Some(schema),
106            options: self.options,
107        }
108    }
109
110    /// Add `listing_options` to [`ListingTableConfig`]
111    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
112        Self {
113            table_paths: self.table_paths,
114            file_schema: self.file_schema,
115            options: Some(listing_options),
116        }
117    }
118
119    /// Returns a tuple of (file_extension, optional compression_extension)
120    ///
121    /// For example, a path ending with `blah.test.csv.gz` returns `("csv", Some("gz"))`,
122    /// while a path ending with `blah.test.csv` returns `("csv", None)`.
123    fn infer_file_extension_and_compression_type(
124        path: &str,
125    ) -> Result<(String, Option<String>)> {
126        let mut exts = path.rsplit('.');
127
128        let splitted = exts.next().unwrap_or("");
129
130        let file_compression_type = FileCompressionType::from_str(splitted)
131            .unwrap_or(FileCompressionType::UNCOMPRESSED);
132
133        if file_compression_type.is_compressed() {
134            let splitted2 = exts.next().unwrap_or("");
135            Ok((splitted2.to_string(), Some(splitted.to_string())))
136        } else {
137            Ok((splitted.to_string(), None))
138        }
139    }
140
141    /// Infer `ListingOptions` based on `table_path` suffix.
142    pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
143        let store = if let Some(url) = self.table_paths.first() {
144            state.runtime_env().object_store(url)?
145        } else {
146            return Ok(self);
147        };
148
149        let file = self
150            .table_paths
151            .first()
152            .unwrap()
153            .list_all_files(state, store.as_ref(), "")
154            .await?
155            .next()
156            .await
157            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
158
159        let (file_extension, maybe_compression_type) =
160            ListingTableConfig::infer_file_extension_and_compression_type(
161                file.location.as_ref(),
162            )?;
163
164        let mut format_options = HashMap::new();
165        if let Some(ref compression_type) = maybe_compression_type {
166            format_options
167                .insert("format.compression".to_string(), compression_type.clone());
168        }
169        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
170        let file_format = state
171            .get_file_format_factory(&file_extension)
172            .ok_or(config_datafusion_err!(
173                "No file_format found with extension {file_extension}"
174            ))?
175            .create(state, &format_options)?;
176
177        let listing_file_extension =
178            if let Some(compression_type) = maybe_compression_type {
179                format!("{}.{}", &file_extension, &compression_type)
180            } else {
181                file_extension
182            };
183
184        let listing_options = ListingOptions::new(file_format)
185            .with_file_extension(listing_file_extension)
186            .with_target_partitions(state.config().target_partitions());
187
188        Ok(Self {
189            table_paths: self.table_paths,
190            file_schema: self.file_schema,
191            options: Some(listing_options),
192        })
193    }
194
195    /// Infer the [`SchemaRef`] based on `table_path` suffix.  Requires `self.options` to be set prior to using.
196    pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
197        match self.options {
198            Some(options) => {
199                let schema = if let Some(url) = self.table_paths.first() {
200                    options.infer_schema(state, url).await?
201                } else {
202                    Arc::new(Schema::empty())
203                };
204
205                Ok(Self {
206                    table_paths: self.table_paths,
207                    file_schema: Some(schema),
208                    options: Some(options),
209                })
210            }
211            None => internal_err!("No `ListingOptions` set for inferring schema"),
212        }
213    }
214
215    /// Convenience wrapper for calling `infer_options` and `infer_schema`
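    ///
    /// A minimal sketch of end-to-end inference (the Parquet directory path is illustrative):
    ///
    /// ```no_run
    /// # use datafusion::prelude::SessionContext;
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let state = ctx.state();
    /// let table_path = ListingTableUrl::parse("/path/to/parquet")?;
    /// // The file format, extension, and schema are all inferred from the files found at the path
    /// let config = ListingTableConfig::new(table_path).infer(&state).await?;
    /// let _provider = ListingTable::try_new(config)?;
    /// # Ok(())
    /// # }
    /// ```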
216    pub async fn infer(self, state: &dyn Session) -> Result<Self> {
217        self.infer_options(state).await?.infer_schema(state).await
218    }
219
220    /// Infer the partition columns from the path. Requires `self.options` to be set prior to using.
221    pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result<Self> {
222        match self.options {
223            Some(options) => {
224                let Some(url) = self.table_paths.first() else {
225                    return config_err!("No table path found");
226                };
227                let partitions = options
228                    .infer_partitions(state, url)
229                    .await?
230                    .into_iter()
231                    .map(|col_name| {
232                        (
233                            col_name,
234                            DataType::Dictionary(
235                                Box::new(DataType::UInt16),
236                                Box::new(DataType::Utf8),
237                            ),
238                        )
239                    })
240                    .collect::<Vec<_>>();
241                let options = options.with_table_partition_cols(partitions);
242                Ok(Self {
243                    table_paths: self.table_paths,
244                    file_schema: self.file_schema,
245                    options: Some(options),
246                })
247            }
248            None => config_err!("No `ListingOptions` set for inferring partitions"),
249        }
250    }
251}
252
253/// Options for creating a [`ListingTable`]
254#[derive(Clone, Debug)]
255pub struct ListingOptions {
256    /// A suffix on which files should be filtered (leave empty to
257    /// keep all files on the path)
258    pub file_extension: String,
259    /// The file format
260    pub format: Arc<dyn FileFormat>,
261    /// The expected partition column names in the folder structure.
262    /// See [Self::with_table_partition_cols] for details
263    pub table_partition_cols: Vec<(String, DataType)>,
264    /// Set true to try to guess statistics from the files.
265    /// This can add a lot of overhead as it will usually require files
266    /// to be opened and at least partially parsed.
267    pub collect_stat: bool,
268    /// Group files so that the number of partitions does not exceed
269    /// this limit
270    pub target_partitions: usize,
271    /// Optional pre-known sort order(s). Must be `SortExpr`s.
272    ///
273    /// DataFusion may take advantage of this ordering to omit sorts
274    /// or use more efficient algorithms. Currently sortedness must be
275    /// provided if it is known by some external mechanism, but may in
276    /// the future be automatically determined, for example using
277    /// parquet metadata.
278    ///
279    /// See <https://github.com/apache/datafusion/issues/4177>
280    /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
281    ///       where each ordering consists of an individual lexicographic
282    ///       ordering (encapsulated by a `Vec<Expr>`). If there aren't
283    ///       multiple equivalent orderings, the outer `Vec` will have a
284    ///       single element.
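    ///
    /// For example, `vec![vec![col("ts").sort(true, false)]]` (for a hypothetical `ts`
    /// column) declares a single known ordering: `ts ASC NULLS LAST`.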
285    pub file_sort_order: Vec<Vec<SortExpr>>,
286}
287
288impl ListingOptions {
289    /// Creates an options instance with the given format
290    /// Default values:
291    /// - use the format's default file extension as the filter
292    /// - no table partition columns to discover
293    /// - one target partition
294    /// - statistics collection enabled
295    pub fn new(format: Arc<dyn FileFormat>) -> Self {
296        Self {
297            file_extension: format.get_ext(),
298            format,
299            table_partition_cols: vec![],
300            collect_stat: true,
301            target_partitions: 1,
302            file_sort_order: vec![],
303        }
304    }
305
306    /// Set file extension on [`ListingOptions`] and returns self.
307    ///
308    /// # Example
309    /// ```
310    /// # use std::sync::Arc;
311    /// # use datafusion::prelude::SessionContext;
312    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
313    ///
314    /// let listing_options = ListingOptions::new(Arc::new(
315    ///     ParquetFormat::default()
316    ///   ))
317    ///   .with_file_extension(".parquet");
318    ///
319    /// assert_eq!(listing_options.file_extension, ".parquet");
320    /// ```
321    pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
322        self.file_extension = file_extension.into();
323        self
324    }
325
326    /// Optionally set file extension on [`ListingOptions`] and returns self.
327    ///
328    /// If `file_extension` is `None`, the file extension will not be changed
329    ///
330    /// # Example
331    /// ```
332    /// # use std::sync::Arc;
333    /// # use datafusion::prelude::SessionContext;
334    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
335    /// let extension = Some(".parquet");
336    /// let listing_options = ListingOptions::new(Arc::new(
337    ///     ParquetFormat::default()
338    ///   ))
339    ///   .with_file_extension_opt(extension);
340    ///
341    /// assert_eq!(listing_options.file_extension, ".parquet");
342    /// ```
343    pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
344    where
345        S: Into<String>,
346    {
347        if let Some(file_extension) = file_extension {
348            self.file_extension = file_extension.into();
349        }
350        self
351    }
352
353    /// Set `table partition columns` on [`ListingOptions`] and returns self.
354    ///
355    /// "partition columns," used to support [Hive Partitioning], are
356    /// columns added to the data that is read, based on the folder
357    /// structure where the data resides.
358    ///
359    /// For example, given the following files in your filesystem:
360    ///
361    /// ```text
362    /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet
363    /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet
364    /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet
365    /// ```
366    ///
367    /// A [`ListingTable`] created at `/mnt/nyctaxi/` with partition
368    /// columns "year" and "month" will include new `year` and `month`
369    /// columns while reading the files. The `year` column would have
370    /// value `2022` and the `month` column would have value `01` for
371    /// the rows read from
372    /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet`
373    ///
374    /// # Notes
375    ///
376    /// - If only one level (e.g. `year` in the example above) is
377    ///   specified, the other levels are ignored but the files are
378    ///   still read.
379    ///
380    /// - Files that don't follow this partitioning scheme will be
381    ///   ignored.
382    ///
383    /// - Since the columns have the same value for all rows read from
384    ///   each individual file (such as dates), they are typically
385    ///   dictionary encoded for efficiency. You may use
386    ///   [`wrap_partition_type_in_dict`] to request a
387    ///   dictionary-encoded type.
388    ///
389    /// - The partition columns are extracted solely from the file path. In particular, they are NOT part of the Parquet files themselves.
390    ///
391    /// # Example
392    ///
393    /// ```
394    /// # use std::sync::Arc;
395    /// # use arrow::datatypes::DataType;
396    /// # use datafusion::prelude::col;
397    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
398    ///
399    /// // listing options for files with paths such as  `/mnt/data/col_a=x/col_b=y/data.parquet`
400    /// // `col_a` and `col_b` will be included in the data read from those files
401    /// let listing_options = ListingOptions::new(Arc::new(
402    ///     ParquetFormat::default()
403    ///   ))
404    ///   .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
405    ///       ("col_b".to_string(), DataType::Utf8)]);
406    ///
407    /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
408    ///     ("col_b".to_string(), DataType::Utf8)]);
409    /// ```
410    ///
411    /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
412    /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
413    pub fn with_table_partition_cols(
414        mut self,
415        table_partition_cols: Vec<(String, DataType)>,
416    ) -> Self {
417        self.table_partition_cols = table_partition_cols;
418        self
419    }
420
421    /// Set stat collection on [`ListingOptions`] and returns self.
422    ///
423    /// ```
424    /// # use std::sync::Arc;
425    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
426    ///
427    /// let listing_options = ListingOptions::new(Arc::new(
428    ///     ParquetFormat::default()
429    ///   ))
430    ///   .with_collect_stat(true);
431    ///
432    /// assert_eq!(listing_options.collect_stat, true);
433    /// ```
434    pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
435        self.collect_stat = collect_stat;
436        self
437    }
438
439    /// Set number of target partitions on [`ListingOptions`] and returns self.
440    ///
441    /// ```
442    /// # use std::sync::Arc;
443    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
444    ///
445    /// let listing_options = ListingOptions::new(Arc::new(
446    ///     ParquetFormat::default()
447    ///   ))
448    ///   .with_target_partitions(8);
449    ///
450    /// assert_eq!(listing_options.target_partitions, 8);
451    /// ```
452    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
453        self.target_partitions = target_partitions;
454        self
455    }
456
457    /// Set file sort order on [`ListingOptions`] and returns self.
458    ///
459    /// ```
460    /// # use std::sync::Arc;
461    /// # use datafusion::prelude::col;
462    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
463    ///
464    ///  // Tell datafusion that the files are sorted by column "a"
465    ///  let file_sort_order = vec![vec![
466    ///    col("a").sort(true, true)
467    ///  ]];
468    ///
469    /// let listing_options = ListingOptions::new(Arc::new(
470    ///     ParquetFormat::default()
471    ///   ))
472    ///   .with_file_sort_order(file_sort_order.clone());
473    ///
474    /// assert_eq!(listing_options.file_sort_order, file_sort_order);
475    /// ```
476    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
477        self.file_sort_order = file_sort_order;
478        self
479    }
480
481    /// Infer the schema of the files at the given path on the provided object store.
482    /// The inferred schema does not include the partitioning columns.
483    ///
484    /// This method is not called by the table itself but before creating it.
485    /// This way, when creating the logical plan, we can decide to resolve the schema
486    /// locally or to ask a remote service to do it (e.g. a scheduler).
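    ///
    /// A minimal sketch (the Parquet directory path is illustrative):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use datafusion::prelude::SessionContext;
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let state = ctx.state();
    /// let table_path = ListingTableUrl::parse("/path/to/parquet")?;
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    /// // The inferred schema contains only the file columns, not the partition columns
    /// let _schema = listing_options.infer_schema(&state, &table_path).await?;
    /// # Ok(())
    /// # }
    /// ```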
487    pub async fn infer_schema<'a>(
488        &'a self,
489        state: &dyn Session,
490        table_path: &'a ListingTableUrl,
491    ) -> Result<SchemaRef> {
492        let store = state.runtime_env().object_store(table_path)?;
493
494        let files: Vec<_> = table_path
495            .list_all_files(state, store.as_ref(), &self.file_extension)
496            .await?
497            // Empty files cannot affect the schema but may cause errors when we try to read them
498            .try_filter(|object_meta| future::ready(object_meta.size > 0))
499            .try_collect()
500            .await?;
501
502        let schema = self.format.infer_schema(state, &store, &files).await?;
503
504        Ok(schema)
505    }
506
507    /// Infers the partition columns stored in `LOCATION` and compares
508    /// them with the columns provided in `PARTITIONED BY` to help prevent
509    /// accidental corruption of partitioned tables.
510    ///
511    /// Allows specifying partial partitions.
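    ///
    /// For example, a table whose `LOCATION` contains `year=.../month=...` directories
    /// may be declared with `PARTITIONED BY (year)` (a prefix of the inferred partition
    /// columns), but declaring it with `PARTITIONED BY (month)` alone is rejected.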
512    pub async fn validate_partitions(
513        &self,
514        state: &dyn Session,
515        table_path: &ListingTableUrl,
516    ) -> Result<()> {
517        if self.table_partition_cols.is_empty() {
518            return Ok(());
519        }
520
521        if !table_path.is_collection() {
522            return plan_err!(
523                "Can't create a partitioned table backed by a single file, \
524                perhaps the URL is missing a trailing slash?"
525            );
526        }
527
528        let inferred = self.infer_partitions(state, table_path).await?;
529
530        // no partitioned files found on disk
531        if inferred.is_empty() {
532            return Ok(());
533        }
534
535        let table_partition_names = self
536            .table_partition_cols
537            .iter()
538            .map(|(col_name, _)| col_name.clone())
539            .collect_vec();
540
541        if inferred.len() < table_partition_names.len() {
542            return plan_err!(
543                "Inferred partitions to be {:?}, but got {:?}",
544                inferred,
545                table_partition_names
546            );
547        }
548
549        // match prefix to allow creating tables with partial partitions
550        for (idx, col) in table_partition_names.iter().enumerate() {
551            if &inferred[idx] != col {
552                return plan_err!(
553                    "Inferred partitions to be {:?}, but got {:?}",
554                    inferred,
555                    table_partition_names
556                );
557            }
558        }
559
560        Ok(())
561    }
562
563    /// Infer the partitioning at the given path on the provided object store.
564    /// For performance reasons, it doesn't read all the files on disk
565    /// and therefore may fail to detect invalid partitioning.
566    pub(crate) async fn infer_partitions(
567        &self,
568        state: &dyn Session,
569        table_path: &ListingTableUrl,
570    ) -> Result<Vec<String>> {
571        let store = state.runtime_env().object_store(table_path)?;
572
573        // only use 10 files for inference
574        // This can fail to detect inconsistent partition keys
575        // A DFS traversal approach of the store can help here
576        let files: Vec<_> = table_path
577            .list_all_files(state, store.as_ref(), &self.file_extension)
578            .await?
579            .take(10)
580            .try_collect()
581            .await?;
582
583        let stripped_path_parts = files.iter().map(|file| {
584            table_path
585                .strip_prefix(&file.location)
586                .unwrap()
587                .collect_vec()
588        });
589
590        let partition_keys = stripped_path_parts
591            .map(|path_parts| {
592                path_parts
593                    .into_iter()
594                    .rev()
595                    .skip(1) // get parents only; skip the file itself
596                    .rev()
597                    .map(|s| s.split('=').take(1).collect())
598                    .collect_vec()
599            })
600            .collect_vec();
601
602        match partition_keys.into_iter().all_equal_value() {
603            Ok(v) => Ok(v),
604            Err(None) => Ok(vec![]),
605            Err(Some(diff)) => {
606                let mut sorted_diff = [diff.0, diff.1];
607                sorted_diff.sort();
608                plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
609            }
610        }
611    }
612}
613
614/// Reads data from one or more files as a single table.
615///
616/// Implements [`TableProvider`], a DataFusion data source. The files are read
617/// using an [`ObjectStore`] instance, for example from local files or objects
618/// from AWS S3.
619///
620/// # Reading Directories
621/// For example, given the `table1` directory (or object store prefix)
622///
623/// ```text
624/// table1
625///  ├── file1.parquet
626///  └── file2.parquet
627/// ```
628///
629/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
630/// a single table, merging the schemas if the files have compatible but not
631/// identical schemas.
632///
633/// Given the `table2` directory (or object store prefix)
634///
635/// ```text
636/// table2
637///  ├── date=2024-06-01
638///  │    ├── file3.parquet
639///  │    └── file4.parquet
640///  └── date=2024-06-02
641///       └── file5.parquet
642/// ```
643///
644/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
645/// `file5.parquet` as a single table, again merging schemas if necessary.
646///
647/// Given the Hive-style partitioning structure (e.g., directories named
648/// `date=2024-06-01` and `date=2024-06-02`), `ListingTable` also adds a `date`
649/// column when reading the table:
650/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
651/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
652///
653/// If the query has a predicate like `WHERE date = '2024-06-01'`
654/// only the corresponding directory will be read.
655///
656/// `ListingTable` also supports limit, filter and projection pushdown for formats that
657/// support it, such as Parquet.
658///
659/// # Implementation
660///
661/// `ListingTable` uses [`DataSourceExec`] to read the data. See that struct
662/// for more details.
663///
664/// [`DataSourceExec`]: crate::datasource::source::DataSourceExec
665///
666/// # Example
667///
668/// To read a directory of parquet files using a [`ListingTable`]:
669///
670/// ```no_run
671/// # use datafusion::prelude::SessionContext;
672/// # use datafusion::error::Result;
673/// # use std::sync::Arc;
674/// # use datafusion::datasource::{
675/// #   listing::{
676/// #      ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
677/// #   },
678/// #   file_format::parquet::ParquetFormat,
679/// # };
680/// # #[tokio::main]
681/// # async fn main() -> Result<()> {
682/// let ctx = SessionContext::new();
683/// let session_state = ctx.state();
684/// let table_path = "/path/to/parquet";
685///
686/// // Parse the path
687/// let table_path = ListingTableUrl::parse(table_path)?;
688///
689/// // Create default parquet options
690/// let file_format = ParquetFormat::new();
691/// let listing_options = ListingOptions::new(Arc::new(file_format))
692///   .with_file_extension(".parquet");
693///
694/// // Resolve the schema
695/// let resolved_schema = listing_options
696///    .infer_schema(&session_state, &table_path)
697///    .await?;
698///
699/// let config = ListingTableConfig::new(table_path)
700///   .with_listing_options(listing_options)
701///   .with_schema(resolved_schema);
702///
703/// // Create a new TableProvider
704/// let provider = Arc::new(ListingTable::try_new(config)?);
705///
706/// // This provider can now be read as a dataframe:
707/// let df = ctx.read_table(provider.clone());
708///
709/// // or registered as a named table:
710/// ctx.register_table("my_table", provider);
711///
712/// # Ok(())
713/// # }
714/// ```
715#[derive(Debug)]
716pub struct ListingTable {
717    table_paths: Vec<ListingTableUrl>,
718    /// `file_schema` contains only the columns physically stored in the data files themselves.
719    ///     - Represents the actual fields found in files like Parquet, CSV, etc.
720    ///     - Used when reading the raw data from files
721    file_schema: SchemaRef,
722    /// `table_schema` combines `file_schema` + partition columns
723    ///     - Partition columns are derived from directory paths (not stored in files)
724    ///     - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
725    table_schema: SchemaRef,
726    options: ListingOptions,
727    definition: Option<String>,
728    collected_statistics: FileStatisticsCache,
729    constraints: Constraints,
730    column_defaults: HashMap<String, Expr>,
731}
732
733impl ListingTable {
734    /// Create a new [`ListingTable`] that lists the filesystem to get the files
735    /// to scan. See [`ListingTable`] for an example.
736    ///
737    /// Takes a `ListingTableConfig` as input, which requires an `ObjectStore` and `table_path`.
738    /// The config's `ListingOptions` and `SchemaRef` must already be set (for example via
739    /// [`ListingTableConfig::infer`]); otherwise this function returns an error.
740    /// The schema must be resolved before creating the table and should contain
741    /// only the fields of the files, without the table partitioning columns.
743    ///
744    pub fn try_new(config: ListingTableConfig) -> Result<Self> {
745        let file_schema = config
746            .file_schema
747            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
748
749        let options = config.options.ok_or_else(|| {
750            DataFusionError::Internal("No ListingOptions provided".into())
751        })?;
752
753        // Add the partition columns to the file schema
754        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
755        for (part_col_name, part_col_type) in &options.table_partition_cols {
756            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
757        }
758
759        let table_schema = Arc::new(
760            builder
761                .finish()
762                .with_metadata(file_schema.metadata().clone()),
763        );
764
765        let table = Self {
766            table_paths: config.table_paths,
767            file_schema,
768            table_schema,
769            options,
770            definition: None,
771            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
772            constraints: Constraints::empty(),
773            column_defaults: HashMap::new(),
774        };
775
776        Ok(table)
777    }
778
779    /// Assign constraints
780    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
781        self.constraints = constraints;
782        self
783    }
784
785    /// Assign column defaults
786    pub fn with_column_defaults(
787        mut self,
788        column_defaults: HashMap<String, Expr>,
789    ) -> Self {
790        self.column_defaults = column_defaults;
791        self
792    }
793
794    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
795    ///
796    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
797    /// multiple times in the same session.
798    ///
799    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
800    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
801        self.collected_statistics =
802            cache.unwrap_or(Arc::new(DefaultFileStatisticsCache::default()));
803        self
804    }
805
806    /// Specify the SQL definition for this table, if any
807    pub fn with_definition(mut self, definition: Option<String>) -> Self {
808        self.definition = definition;
809        self
810    }
811
812    /// Get paths ref
813    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
814        &self.table_paths
815    }
816
817    /// Get options ref
818    pub fn options(&self) -> &ListingOptions {
819        &self.options
820    }
821
822    /// If file_sort_order is specified, creates the appropriate physical expressions
823    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
824        create_ordering(&self.table_schema, &self.options.file_sort_order)
825    }
826}
827
828// Expressions can be used for partition pruning if they can be evaluated using
829// only the partition columns, and there is at least one partition column.
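// For example, with partition columns ["year", "month"], a filter such as
// `year = 2024 AND month = 1` can be evaluated from the file paths alone, while
// `year = 2024 AND value > 10` cannot, because `value` is a file column.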
830fn can_be_evaluted_for_partition_pruning(
831    partition_column_names: &[&str],
832    expr: &Expr,
833) -> bool {
834    !partition_column_names.is_empty()
835        && expr_applicable_for_cols(partition_column_names, expr)
836}
837
838#[async_trait]
839impl TableProvider for ListingTable {
840    fn as_any(&self) -> &dyn Any {
841        self
842    }
843
844    fn schema(&self) -> SchemaRef {
845        Arc::clone(&self.table_schema)
846    }
847
848    fn constraints(&self) -> Option<&Constraints> {
849        Some(&self.constraints)
850    }
851
852    fn table_type(&self) -> TableType {
853        TableType::Base
854    }
855
856    async fn scan(
857        &self,
858        state: &dyn Session,
859        projection: Option<&Vec<usize>>,
860        filters: &[Expr],
861        limit: Option<usize>,
862    ) -> Result<Arc<dyn ExecutionPlan>> {
863        // extract types of partition columns
864        let table_partition_cols = self
865            .options
866            .table_partition_cols
867            .iter()
868            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
869            .collect::<Result<Vec<_>>>()?;
870
871        let table_partition_col_names = table_partition_cols
872            .iter()
873            .map(|field| field.name().as_str())
874            .collect::<Vec<_>>();
875        // If a filter can be resolved using only the partition columns, there is no need to
876        // push it down to the scan; otherwise `unhandled` pruning predicates would be generated
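        // For example, with a partition column "date", a predicate like `date = '2024-06-01'`
        // becomes a partition filter applied while listing files, whereas a predicate on a
        // file column (say `value > 10`, used here only as an illustration) stays in `filters`
        // and is handled further below.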
877        let (partition_filters, filters): (Vec<_>, Vec<_>) =
878            filters.iter().cloned().partition(|filter| {
879                can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
880            });
881
882        // We should not limit the number of partitioned files to scan if there are filters and limit
883        // at the same time. This is because the limit should be applied after the filters are applied.
884        let statistic_file_limit = if filters.is_empty() { limit } else { None };
885
886        let (mut partitioned_file_lists, statistics) = self
887            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
888            .await?;
889
890        // if no files need to be read, return an `EmptyExec`
891        if partitioned_file_lists.is_empty() {
892            let projected_schema = project_schema(&self.schema(), projection)?;
893            return Ok(Arc::new(EmptyExec::new(projected_schema)));
894        }
895
896        let output_ordering = self.try_create_output_ordering()?;
897        match state
898            .config_options()
899            .execution
900            .split_file_groups_by_statistics
901            .then(|| {
902                output_ordering.first().map(|output_ordering| {
903                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
904                        &self.table_schema,
905                        &partitioned_file_lists,
906                        output_ordering,
907                        self.options.target_partitions,
908                    )
909                })
910            })
911            .flatten()
912        {
913            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
914            Some(Ok(new_groups)) => {
915                if new_groups.len() <= self.options.target_partitions {
916                    partitioned_file_lists = new_groups;
917                } else {
918                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
919                }
920            }
921            None => {} // no ordering required
922        };
923
924        let filters = match conjunction(filters.to_vec()) {
925            Some(expr) => {
926                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
927                let filters = create_physical_expr(
928                    &expr,
929                    &table_df_schema,
930                    state.execution_props(),
931                )?;
932                Some(filters)
933            }
934            None => None,
935        };
936
937        let Some(object_store_url) =
938            self.table_paths.first().map(ListingTableUrl::object_store)
939        else {
940            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
941        };
942
943        // create the execution plan
944        self.options
945            .format
946            .create_physical_plan(
947                state,
948                FileScanConfigBuilder::new(
949                    object_store_url,
950                    Arc::clone(&self.file_schema),
951                    self.options.format.file_source(),
952                )
953                .with_file_groups(partitioned_file_lists)
954                .with_constraints(self.constraints.clone())
955                .with_statistics(statistics)
956                .with_projection(projection.cloned())
957                .with_limit(limit)
958                .with_output_ordering(output_ordering)
959                .with_table_partition_cols(table_partition_cols)
960                .build(),
961                filters.as_ref(),
962            )
963            .await
964    }
965
966    fn supports_filters_pushdown(
967        &self,
968        filters: &[&Expr],
969    ) -> Result<Vec<TableProviderFilterPushDown>> {
970        let partition_column_names = self
971            .options
972            .table_partition_cols
973            .iter()
974            .map(|col| col.0.as_str())
975            .collect::<Vec<_>>();
976        filters
977            .iter()
978            .map(|filter| {
979                if can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
980                {
981                    // if filter can be handled by partition pruning, it is exact
982                    return Ok(TableProviderFilterPushDown::Exact);
983                }
984
985                // if we can't push it down completely with only the filename-based/path-based
986                // column names, then we should check if we can do parquet predicate pushdown
987                let supports_pushdown = self.options.format.supports_filters_pushdown(
988                    &self.file_schema,
989                    &self.table_schema,
990                    &[filter],
991                )?;
992
993                if supports_pushdown == FilePushdownSupport::Supported {
994                    return Ok(TableProviderFilterPushDown::Exact);
995                }
996
997                Ok(TableProviderFilterPushDown::Inexact)
998            })
999            .collect()
1000    }
1001
1002    fn get_table_definition(&self) -> Option<&str> {
1003        self.definition.as_deref()
1004    }
1005
1006    async fn insert_into(
1007        &self,
1008        state: &dyn Session,
1009        input: Arc<dyn ExecutionPlan>,
1010        insert_op: InsertOp,
1011    ) -> Result<Arc<dyn ExecutionPlan>> {
1012        // Check that the schema of the plan matches the schema of this table.
1013        self.schema()
1014            .logically_equivalent_names_and_types(&input.schema())?;
1015
1016        let table_path = &self.table_paths()[0];
1017        if !table_path.is_collection() {
1018            return plan_err!(
1019                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
1020                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
1021            );
1022        }
1023
1024        // Get the object store for the table path.
1025        let store = state.runtime_env().object_store(table_path)?;
1026
1027        let file_list_stream = pruned_partition_list(
1028            state,
1029            store.as_ref(),
1030            table_path,
1031            &[],
1032            &self.options.file_extension,
1033            &self.options.table_partition_cols,
1034        )
1035        .await?;
1036
1037        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
1038        let keep_partition_by_columns =
1039            state.config_options().execution.keep_partition_by_columns;
1040
1041        // Sink-related options, apart from the format
1042        let config = FileSinkConfig {
1043            original_url: String::default(),
1044            object_store_url: self.table_paths()[0].object_store(),
1045            table_paths: self.table_paths().clone(),
1046            file_group,
1047            output_schema: self.schema(),
1048            table_partition_cols: self.options.table_partition_cols.clone(),
1049            insert_op,
1050            keep_partition_by_columns,
1051            file_extension: self.options().format.get_ext(),
1052        };
1053
1054        let order_requirements = if !self.options().file_sort_order.is_empty() {
1055            // Multiple sort orders in outer vec are equivalent, so we pass only the first one
1056            let orderings = self.try_create_output_ordering()?;
1057            let Some(ordering) = orderings.first() else {
1058                return internal_err!(
1059                    "Expected ListingTable to have a sort order, but none found!"
1060                );
1061            };
1062            // Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
1063            Some(LexRequirement::new(
1064                ordering
1065                    .into_iter()
1066                    .cloned()
1067                    .map(PhysicalSortRequirement::from)
1068                    .collect::<Vec<_>>(),
1069            ))
1070        } else {
1071            None
1072        };
1073
1074        self.options()
1075            .format
1076            .create_writer_physical_plan(input, state, config, order_requirements)
1077            .await
1078    }
1079
1080    fn get_column_default(&self, column: &str) -> Option<&Expr> {
1081        self.column_defaults.get(column)
1082    }
1083}
1084
1085impl ListingTable {
1086    /// Get the list of files for a scan as well as the file level statistics.
1087    /// The list is grouped to let the execution plan know how the files should
1088    /// be distributed to different threads / executors.
1089    async fn list_files_for_scan<'a>(
1090        &'a self,
1091        ctx: &'a dyn Session,
1092        filters: &'a [Expr],
1093        limit: Option<usize>,
1094    ) -> Result<(Vec<FileGroup>, Statistics)> {
1095        let store = if let Some(url) = self.table_paths.first() {
1096            ctx.runtime_env().object_store(url)?
1097        } else {
1098            return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
1099        };
1100        // list files (with partitions)
1101        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
1102            pruned_partition_list(
1103                ctx,
1104                store.as_ref(),
1105                table_path,
1106                filters,
1107                &self.options.file_extension,
1108                &self.options.table_partition_cols,
1109            )
1110        }))
1111        .await?;
1112        let meta_fetch_concurrency =
1113            ctx.config_options().execution.meta_fetch_concurrency;
1114        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
1115        // collect the statistics if required by the config
1116        let files = file_list
1117            .map(|part_file| async {
1118                let part_file = part_file?;
1119                let statistics = if self.options.collect_stat {
1120                    self.do_collect_statistics(ctx, &store, &part_file).await?
1121                } else {
1122                    Arc::new(Statistics::new_unknown(&self.file_schema))
1123                };
1124                Ok(part_file.with_statistics(statistics))
1125            })
1126            .boxed()
1127            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
1128
1129        let (file_group, inexact_stats) =
1130            get_files_with_limit(files, limit, self.options.collect_stat).await?;
1131
1132        let file_groups = file_group.split_files(self.options.target_partitions);
1133        compute_all_files_statistics(
1134            file_groups,
1135            self.schema(),
1136            self.options.collect_stat,
1137            inexact_stats,
1138        )
1139    }
1140
1141    /// Collects statistics for a given partitioned file.
1142    ///
1143    /// This method first checks if the statistics for the given file are already cached.
1144    /// If they are, it returns the cached statistics.
1145    /// If they are not, it infers the statistics from the file and stores them in the cache.
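    ///
    /// Statistics are cached per file location; the file's `ObjectMeta` is passed along
    /// so the cache implementation can detect when the underlying file has changed.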
1146    async fn do_collect_statistics(
1147        &self,
1148        ctx: &dyn Session,
1149        store: &Arc<dyn ObjectStore>,
1150        part_file: &PartitionedFile,
1151    ) -> Result<Arc<Statistics>> {
1152        match self
1153            .collected_statistics
1154            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
1155        {
1156            Some(statistics) => Ok(statistics),
1157            None => {
1158                let statistics = self
1159                    .options
1160                    .format
1161                    .infer_stats(
1162                        ctx,
1163                        store,
1164                        Arc::clone(&self.file_schema),
1165                        &part_file.object_meta,
1166                    )
1167                    .await?;
1168                let statistics = Arc::new(statistics);
1169                self.collected_statistics.put_with_extra(
1170                    &part_file.object_meta.location,
1171                    Arc::clone(&statistics),
1172                    &part_file.object_meta,
1173                );
1174                Ok(statistics)
1175            }
1176        }
1177    }
1178}
1179
1180/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
1181///
1182/// This function collects files from the provided stream until either:
1183/// 1. The stream is exhausted
1184/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
1185///
1186/// # Arguments
1187/// * `files` - A stream of `Result<PartitionedFile>` items to process
1188/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
1189///   once the accumulated number of rows exceeds this limit
1190/// * `collect_stats` - Whether to collect and accumulate statistics from the files
1191///
1192/// # Returns
1193/// A `Result` containing a `FileGroup` with the collected files
1194/// and a boolean indicating whether the statistics are inexact.
1195///
1196/// # Note
1197/// The function will continue processing files if statistics are not available or if the
1198/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
1199/// but files will still be collected.
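///
/// # Example (illustrative)
/// With `limit = Some(100)` and two files reporting exact row counts of 60 each, both
/// files are collected (60, then 60 + 60 = 120 > 100); if more files remain in the
/// stream at that point, the returned flag marks the overall statistics as inexact.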
1200async fn get_files_with_limit(
1201    files: impl Stream<Item = Result<PartitionedFile>>,
1202    limit: Option<usize>,
1203    collect_stats: bool,
1204) -> Result<(FileGroup, bool)> {
1205    let mut file_group = FileGroup::default();
1206    // Fusing the stream allows us to call next safely even once it is finished.
1207    let mut all_files = Box::pin(files.fuse());
1208    enum ProcessingState {
1209        ReadingFiles,
1210        ReachedLimit,
1211    }
1212
1213    let mut state = ProcessingState::ReadingFiles;
1214    let mut num_rows = Precision::Absent;
1215
1216    while let Some(file_result) = all_files.next().await {
1217        // Early exit if we've already reached our limit
1218        if matches!(state, ProcessingState::ReachedLimit) {
1219            break;
1220        }
1221
1222        let file = file_result?;
1223
1224        // Update file statistics regardless of state
1225        if collect_stats {
1226            if let Some(file_stats) = &file.statistics {
1227                num_rows = if file_group.is_empty() {
1228                    // For the first file, just take its row count
1229                    file_stats.num_rows
1230                } else {
1231                    // For subsequent files, accumulate the counts
1232                    num_rows.add(&file_stats.num_rows)
1233                };
1234            }
1235        }
1236
1237        // Always add the file to our group
1238        file_group.push(file);
1239
1240        // Check if we've hit the limit (if one was specified)
1241        if let Some(limit) = limit {
1242            if let Precision::Exact(row_count) = num_rows {
1243                if row_count > limit {
1244                    state = ProcessingState::ReachedLimit;
1245                }
1246            }
1247        }
1248    }
1249    // If we still have files in the stream, it means that the limit kicked
1250// in, and the statistics could have been different had we processed the
1251    // files in a different order.
1252    let inexact_stats = all_files.next().await.is_some();
1253    Ok((file_group, inexact_stats))
1254}
1255
1256#[cfg(test)]
1257mod tests {
1258    use super::*;
1259    use crate::datasource::file_format::csv::CsvFormat;
1260    use crate::datasource::file_format::json::JsonFormat;
1261    #[cfg(feature = "parquet")]
1262    use crate::datasource::file_format::parquet::ParquetFormat;
1263    use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
1264    use crate::execution::options::ArrowReadOptions;
1265    use crate::prelude::*;
1266    use crate::test::{columns, object_store::register_test_store};
1267
1268    use arrow::compute::SortOptions;
1269    use arrow::record_batch::RecordBatch;
1270    use datafusion_common::stats::Precision;
1271    use datafusion_common::test_util::batches_to_string;
1272    use datafusion_common::{assert_contains, ScalarValue};
1273    use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
1274    use datafusion_physical_expr::PhysicalSortExpr;
1275    use datafusion_physical_plan::collect;
1276    use datafusion_physical_plan::ExecutionPlanProperties;
1277
1278    use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
1279    use tempfile::TempDir;
1280    use url::Url;
1281
1282    #[tokio::test]
1283    async fn read_single_file() -> Result<()> {
1284        let ctx = SessionContext::new();
1285
1286        let table = load_table(&ctx, "alltypes_plain.parquet").await?;
1287        let projection = None;
1288        let exec = table
1289            .scan(&ctx.state(), projection, &[], None)
1290            .await
1291            .expect("Scan table");
1292
1293        assert_eq!(exec.children().len(), 0);
1294        assert_eq!(exec.output_partitioning().partition_count(), 1);
1295
1296        // test metadata
1297        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
1298        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
1299
1300        Ok(())
1301    }
1302
1303    #[cfg(feature = "parquet")]
1304    #[tokio::test]
1305    async fn load_table_stats_by_default() -> Result<()> {
1306        use crate::datasource::file_format::parquet::ParquetFormat;
1307
1308        let testdata = crate::test_util::parquet_test_data();
1309        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1310        let table_path = ListingTableUrl::parse(filename).unwrap();
1311
1312        let ctx = SessionContext::new();
1313        let state = ctx.state();
1314
1315        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
1316        let schema = opt.infer_schema(&state, &table_path).await?;
1317        let config = ListingTableConfig::new(table_path)
1318            .with_listing_options(opt)
1319            .with_schema(schema);
1320        let table = ListingTable::try_new(config)?;
1321
1322        let exec = table.scan(&state, None, &[], None).await?;
1323        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
1324        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
1325        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
1326
1327        Ok(())
1328    }
1329
1330    #[cfg(feature = "parquet")]
1331    #[tokio::test]
1332    async fn load_table_stats_when_no_stats() -> Result<()> {
1333        use crate::datasource::file_format::parquet::ParquetFormat;
1334
1335        let testdata = crate::test_util::parquet_test_data();
1336        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1337        let table_path = ListingTableUrl::parse(filename).unwrap();
1338
1339        let ctx = SessionContext::new();
1340        let state = ctx.state();
1341
1342        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
1343            .with_collect_stat(false);
1344        let schema = opt.infer_schema(&state, &table_path).await?;
1345        let config = ListingTableConfig::new(table_path)
1346            .with_listing_options(opt)
1347            .with_schema(schema);
1348        let table = ListingTable::try_new(config)?;
1349
1350        let exec = table.scan(&state, None, &[], None).await?;
1351        assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
1352        assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
1353
1354        Ok(())
1355    }
1356
1357    #[cfg(feature = "parquet")]
1358    #[tokio::test]
1359    async fn test_try_create_output_ordering() {
1360        let testdata = crate::test_util::parquet_test_data();
1361        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1362        let table_path = ListingTableUrl::parse(filename).unwrap();
1363
1364        let ctx = SessionContext::new();
1365        let state = ctx.state();
1366        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
1367        let schema = options.infer_schema(&state, &table_path).await.unwrap();
1368
1369        use crate::datasource::file_format::parquet::ParquetFormat;
1370        use datafusion_physical_plan::expressions::col as physical_col;
1371        use std::ops::Add;
1372
1373        // (file_sort_order, expected_result)
1374        let cases = vec![
1375            (vec![], Ok(vec![])),
1376            // sort expr, but non column
1377            (
1378                vec![vec![
1379                    col("int_col").add(lit(1)).sort(true, true),
1380                ]],
1381                Err("Expected single column reference in sort_order[0][0], got int_col + Int32(1)"),
1382            ),
1383            // ok with one column
1384            (
1385                vec![vec![col("string_col").sort(true, false)]],
1386                Ok(vec![LexOrdering::new(
1387                        vec![PhysicalSortExpr {
1388                            expr: physical_col("string_col", &schema).unwrap(),
1389                            options: SortOptions {
1390                                descending: false,
1391                                nulls_first: false,
1392                            },
1393                        }],
1394                )
1395                ])
1396            ),
1397            // ok with two columns, different options
1398            (
1399                vec![vec![
1400                    col("string_col").sort(true, false),
1401                    col("int_col").sort(false, true),
1402                ]],
1403                Ok(vec![LexOrdering::new(
1404                        vec![
1405                            PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
1406                                        .asc()
1407                                        .nulls_last(),
1408                            PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
1409                                        .desc()
1410                                        .nulls_first()
1411                        ],
1412                )
1413                ])
1414            ),
1415        ];
1416
1417        for (file_sort_order, expected_result) in cases {
1418            let options = options.clone().with_file_sort_order(file_sort_order);
1419
1420            let config = ListingTableConfig::new(table_path.clone())
1421                .with_listing_options(options)
1422                .with_schema(schema.clone());
1423
1424            let table =
1425                ListingTable::try_new(config.clone()).expect("Creating the table");
1426            let ordering_result = table.try_create_output_ordering();
1427
1428            match (expected_result, ordering_result) {
1429                (Ok(expected), Ok(result)) => {
1430                    assert_eq!(expected, result);
1431                }
1432                (Err(expected), Err(result)) => {
1433                    // DataFusionError can't be compared directly; compare rendered messages
1434                    let result = result.to_string();
1435                    let expected = expected.to_string();
1436                    assert_contains!(result, expected);
1437                }
1438                (expected_result, ordering_result) => {
1439                    panic!(
1440                        "expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
1441                    );
1442                }
1443            }
1444        }
1445    }
1446
1447    #[tokio::test]
1448    async fn read_empty_table() -> Result<()> {
1449        let ctx = SessionContext::new();
1450        let path = String::from("table/p1=v1/file.json");
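        // Register a single 100-byte file at this path in the test object store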
1451        register_test_store(&ctx, &[(&path, 100)]);
1452
1453        let format = JsonFormat::default();
1454        let ext = format.get_ext();
1455
1456        let opt = ListingOptions::new(Arc::new(format))
1457            .with_file_extension(ext)
1458            .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
1459            .with_target_partitions(4);
1460
1461        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
1462        let file_schema =
1463            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
1464        let config = ListingTableConfig::new(table_path)
1465            .with_listing_options(opt)
1466            .with_schema(file_schema);
1467        let table = ListingTable::try_new(config)?;
1468
1469        assert_eq!(
1470            columns(&table.schema()),
1471            vec!["a".to_owned(), "p1".to_owned()]
1472        );
1473
1474        // this will filter out the only file in the store
1475        let filter = Expr::not_eq(col("p1"), lit("v1"));
1476
1477        let scan = table
1478            .scan(&ctx.state(), None, &[filter], None)
1479            .await
1480            .expect("Empty execution plan");
1481
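        // With every file pruned by the partition filter, the scan falls back to an
        // EmptyExec that still exposes the full table schema (including partition columns)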
1482        assert!(scan.as_any().is::<EmptyExec>());
1483        assert_eq!(
1484            columns(&scan.schema()),
1485            vec!["a".to_owned(), "p1".to_owned()]
1486        );
1487
1488        Ok(())
1489    }
1490
1491    #[tokio::test]
1492    async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
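        // Each call passes: files in the store, table prefix, target_partitions,
        // expected number of file groups, and an optional file extension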
1493        // more expected partitions than files
1494        assert_list_files_for_scan_grouping(
1495            &[
1496                "bucket/key-prefix/file0",
1497                "bucket/key-prefix/file1",
1498                "bucket/key-prefix/file2",
1499                "bucket/key-prefix/file3",
1500                "bucket/key-prefix/file4",
1501            ],
1502            "test:///bucket/key-prefix/",
1503            12,
1504            5,
1505            Some(""),
1506        )
1507        .await?;
1508
1509        // as many expected partitions as files
1510        assert_list_files_for_scan_grouping(
1511            &[
1512                "bucket/key-prefix/file0",
1513                "bucket/key-prefix/file1",
1514                "bucket/key-prefix/file2",
1515                "bucket/key-prefix/file3",
1516            ],
1517            "test:///bucket/key-prefix/",
1518            4,
1519            4,
1520            Some(""),
1521        )
1522        .await?;
1523
1524        // more files than expected partitions
1525        assert_list_files_for_scan_grouping(
1526            &[
1527                "bucket/key-prefix/file0",
1528                "bucket/key-prefix/file1",
1529                "bucket/key-prefix/file2",
1530                "bucket/key-prefix/file3",
1531                "bucket/key-prefix/file4",
1532            ],
1533            "test:///bucket/key-prefix/",
1534            2,
1535            2,
1536            Some(""),
1537        )
1538        .await?;
1539
1540        // no files => no groups
1541        assert_list_files_for_scan_grouping(
1542            &[],
1543            "test:///bucket/key-prefix/",
1544            2,
1545            0,
1546            Some(""),
1547        )
1548        .await?;
1549
1550        // files that don't match the prefix
1551        assert_list_files_for_scan_grouping(
1552            &[
1553                "bucket/key-prefix/file0",
1554                "bucket/key-prefix/file1",
1555                "bucket/other-prefix/roguefile",
1556            ],
1557            "test:///bucket/key-prefix/",
1558            10,
1559            2,
1560            Some(""),
1561        )
1562        .await?;
1563
1564        // files that don't match the prefix or the default file extension
1565        assert_list_files_for_scan_grouping(
1566            &[
1567                "bucket/key-prefix/file0.json",
1568                "bucket/key-prefix/file1.parquet",
1569                "bucket/other-prefix/roguefile.json",
1570            ],
1571            "test:///bucket/key-prefix/",
1572            10,
1573            1,
1574            None,
1575        )
1576        .await?;
1577        Ok(())
1578    }
1579
1580    #[tokio::test]
1581    async fn test_assert_list_files_for_multi_path() -> Result<()> {
1582        // more expected partitions than files
1583        assert_list_files_for_multi_paths(
1584            &[
1585                "bucket/key1/file0",
1586                "bucket/key1/file1",
1587                "bucket/key1/file2",
1588                "bucket/key2/file3",
1589                "bucket/key2/file4",
1590                "bucket/key3/file5",
1591            ],
1592            &["test:///bucket/key1/", "test:///bucket/key2/"],
1593            12,
1594            5,
1595            Some(""),
1596        )
1597        .await?;
1598
1599        // as many expected partitions as files
1600        assert_list_files_for_multi_paths(
1601            &[
1602                "bucket/key1/file0",
1603                "bucket/key1/file1",
1604                "bucket/key1/file2",
1605                "bucket/key2/file3",
1606                "bucket/key2/file4",
1607                "bucket/key3/file5",
1608            ],
1609            &["test:///bucket/key1/", "test:///bucket/key2/"],
1610            5,
1611            5,
1612            Some(""),
1613        )
1614        .await?;
1615
1616        // more files than expected partitions
1617        assert_list_files_for_multi_paths(
1618            &[
1619                "bucket/key1/file0",
1620                "bucket/key1/file1",
1621                "bucket/key1/file2",
1622                "bucket/key2/file3",
1623                "bucket/key2/file4",
1624                "bucket/key3/file5",
1625            ],
1626            &["test:///bucket/key1/"],
1627            2,
1628            2,
1629            Some(""),
1630        )
1631        .await?;
1632
1633        // no files => no groups
1634        assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some(""))
1635            .await?;
1636
1637        // files that don't match the prefix
1638        assert_list_files_for_multi_paths(
1639            &[
1640                "bucket/key1/file0",
1641                "bucket/key1/file1",
1642                "bucket/key1/file2",
1643                "bucket/key2/file3",
1644                "bucket/key2/file4",
1645                "bucket/key3/file5",
1646            ],
1647            &["test:///bucket/key3/"],
1648            2,
1649            1,
1650            Some(""),
1651        )
1652        .await?;
1653
1654        // files that don't match the prefix or the default file extension
1655        assert_list_files_for_multi_paths(
1656            &[
1657                "bucket/key1/file0.json",
1658                "bucket/key1/file1.csv",
1659                "bucket/key1/file2.json",
1660                "bucket/key2/file3.csv",
1661                "bucket/key2/file4.json",
1662                "bucket/key3/file5.csv",
1663            ],
1664            &["test:///bucket/key1/", "test:///bucket/key3/"],
1665            2,
1666            2,
1667            None,
1668        )
1669        .await?;
1670        Ok(())
1671    }
1672
1673    #[tokio::test]
1674    async fn test_assert_list_files_for_exact_paths() -> Result<()> {
1675        // more expected partitions than files
1676        assert_list_files_for_exact_paths(
1677            &[
1678                "bucket/key1/file0",
1679                "bucket/key1/file1",
1680                "bucket/key1/file2",
1681                "bucket/key2/file3",
1682                "bucket/key2/file4",
1683            ],
1684            12,
1685            5,
1686            Some(""),
1687        )
1688        .await?;
1689
1690        // more files than meta_fetch_concurrency (32)
1691        let files: Vec<String> =
1692            (0..64).map(|i| format!("bucket/key1/file{}", i)).collect();
1693        // Collect references to each string
1694        let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
1695        assert_list_files_for_exact_paths(file_refs.as_slice(), 5, 5, Some("")).await?;
1696
1697        // as many expected partitions as files
1698        assert_list_files_for_exact_paths(
1699            &[
1700                "bucket/key1/file0",
1701                "bucket/key1/file1",
1702                "bucket/key1/file2",
1703                "bucket/key2/file3",
1704                "bucket/key2/file4",
1705            ],
1706            5,
1707            5,
1708            Some(""),
1709        )
1710        .await?;
1711
1712        // more files than expected partitions
1713        assert_list_files_for_exact_paths(
1714            &[
1715                "bucket/key1/file0",
1716                "bucket/key1/file1",
1717                "bucket/key1/file2",
1718                "bucket/key2/file3",
1719                "bucket/key2/file4",
1720            ],
1721            2,
1722            2,
1723            Some(""),
1724        )
1725        .await?;
1726
1727        // no files => no groups
1728        assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;
1729
1730        // files that don't match the default file extension
1731        assert_list_files_for_exact_paths(
1732            &[
1733                "bucket/key1/file0.json",
1734                "bucket/key1/file1.csv",
1735                "bucket/key1/file2.json",
1736                "bucket/key2/file3.csv",
1737                "bucket/key2/file4.json",
1738                "bucket/key3/file5.csv",
1739            ],
1740            2,
1741            2,
1742            None,
1743        )
1744        .await?;
1745        Ok(())
1746    }
1747
1748    async fn load_table(
1749        ctx: &SessionContext,
1750        name: &str,
1751    ) -> Result<Arc<dyn TableProvider>> {
1752        let testdata = crate::test_util::parquet_test_data();
1753        let filename = format!("{testdata}/{name}");
1754        let table_path = ListingTableUrl::parse(filename).unwrap();
1755
1756        let config = ListingTableConfig::new(table_path)
1757            .infer(&ctx.state())
1758            .await?;
1759        let table = ListingTable::try_new(config)?;
1760        Ok(Arc::new(table))
1761    }
1762
1763    /// Check that the files listed by the table match the specified `output_partitioning`
1764    /// when the object store contains `files`.
1765    async fn assert_list_files_for_scan_grouping(
1766        files: &[&str],
1767        table_prefix: &str,
1768        target_partitions: usize,
1769        output_partitioning: usize,
1770        file_ext: Option<&str>,
1771    ) -> Result<()> {
1772        let ctx = SessionContext::new();
1773        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
1774
1775        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
1776            .with_file_extension_opt(file_ext)
1777            .with_target_partitions(target_partitions);
1778
1779        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1780
1781        let table_path = ListingTableUrl::parse(table_prefix).unwrap();
1782        let config = ListingTableConfig::new(table_path)
1783            .with_listing_options(opt)
1784            .with_schema(Arc::new(schema));
1785
1786        let table = ListingTable::try_new(config)?;
1787
1788        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1789
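        // Each entry in `file_list` is one file group, i.e. one output partition of the scan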
1790        assert_eq!(file_list.len(), output_partitioning);
1791
1792        Ok(())
1793    }
1794
1795    /// Check that the files listed by the table match the specified `output_partitioning`
1796    /// when the object store contains `files`.
1797    async fn assert_list_files_for_multi_paths(
1798        files: &[&str],
1799        table_prefix: &[&str],
1800        target_partitions: usize,
1801        output_partitioning: usize,
1802        file_ext: Option<&str>,
1803    ) -> Result<()> {
1804        let ctx = SessionContext::new();
1805        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
1806
1807        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
1808            .with_file_extension_opt(file_ext)
1809            .with_target_partitions(target_partitions);
1810
1811        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1812
1813        let table_paths = table_prefix
1814            .iter()
1815            .map(|t| ListingTableUrl::parse(t).unwrap())
1816            .collect();
1817        let config = ListingTableConfig::new_with_multi_paths(table_paths)
1818            .with_listing_options(opt)
1819            .with_schema(Arc::new(schema));
1820
1821        let table = ListingTable::try_new(config)?;
1822
1823        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1824
1825        assert_eq!(file_list.len(), output_partitioning);
1826
1827        Ok(())
1828    }
1829
1830    /// Check that the files listed by the table match the specified `output_partitioning`
1831    /// when the object store contains `files`, and validate that file metadata is fetched
1832    /// concurrently
1833    async fn assert_list_files_for_exact_paths(
1834        files: &[&str],
1835        target_partitions: usize,
1836        output_partitioning: usize,
1837        file_ext: Option<&str>,
1838    ) -> Result<()> {
1839        let ctx = SessionContext::new();
1840        let (store, _) = make_test_store_and_state(
1841            &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
1842        );
1843
1844        let meta_fetch_concurrency = ctx
1845            .state()
1846            .config_options()
1847            .execution
1848            .meta_fetch_concurrency;
1849        let expected_concurrency = files.len().min(meta_fetch_concurrency);
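        // Wrap the store so the test can verify that object metadata (HEAD) requests
        // are issued concurrently, up to `meta_fetch_concurrency`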
1850        let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);
1851
1852        let url = Url::parse("test://").unwrap();
1853        ctx.register_object_store(&url, head_blocking_store.clone());
1854
1855        let format = JsonFormat::default();
1856
1857        let opt = ListingOptions::new(Arc::new(format))
1858            .with_file_extension_opt(file_ext)
1859            .with_target_partitions(target_partitions);
1860
1861        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1862
1863        let table_paths = files
1864            .iter()
1865            .map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
1866            .collect();
1867        let config = ListingTableConfig::new_with_multi_paths(table_paths)
1868            .with_listing_options(opt)
1869            .with_schema(Arc::new(schema));
1870
1871        let table = ListingTable::try_new(config)?;
1872
1873        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1874
1875        assert_eq!(file_list.len(), output_partitioning);
1876
1877        Ok(())
1878    }
1879
1880    #[tokio::test]
1881    async fn test_insert_into_append_new_json_files() -> Result<()> {
1882        let mut config_map: HashMap<String, String> = HashMap::new();
1883        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
1884        config_map.insert(
1885            "datafusion.execution.soft_max_rows_per_output_file".into(),
1886            "10".into(),
1887        );
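        // 20 rows are inserted with a 10-row soft cap per output file,
        // so each insert is expected to create 2 files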
1888        helper_test_append_new_files_to_table(
1889            JsonFormat::default().get_ext(),
1890            FileCompressionType::UNCOMPRESSED,
1891            Some(config_map),
1892            2,
1893        )
1894        .await?;
1895        Ok(())
1896    }
1897
1898    #[tokio::test]
1899    async fn test_insert_into_append_new_csv_files() -> Result<()> {
1900        let mut config_map: HashMap<String, String> = HashMap::new();
1901        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
1902        config_map.insert(
1903            "datafusion.execution.soft_max_rows_per_output_file".into(),
1904            "10".into(),
1905        );
1906        helper_test_append_new_files_to_table(
1907            CsvFormat::default().get_ext(),
1908            FileCompressionType::UNCOMPRESSED,
1909            Some(config_map),
1910            2,
1911        )
1912        .await?;
1913        Ok(())
1914    }
1915
1916    #[cfg(feature = "parquet")]
1917    #[tokio::test]
1918    async fn test_insert_into_append_2_new_parquet_files_defaults() -> Result<()> {
1919        let mut config_map: HashMap<String, String> = HashMap::new();
1920        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
1921        config_map.insert(
1922            "datafusion.execution.soft_max_rows_per_output_file".into(),
1923            "10".into(),
1924        );
1925        helper_test_append_new_files_to_table(
1926            ParquetFormat::default().get_ext(),
1927            FileCompressionType::UNCOMPRESSED,
1928            Some(config_map),
1929            2,
1930        )
1931        .await?;
1932        Ok(())
1933    }
1934
1935    #[cfg(feature = "parquet")]
1936    #[tokio::test]
1937    async fn test_insert_into_append_1_new_parquet_files_defaults() -> Result<()> {
1938        let mut config_map: HashMap<String, String> = HashMap::new();
1939        config_map.insert("datafusion.execution.batch_size".into(), "20".into());
1940        config_map.insert(
1941            "datafusion.execution.soft_max_rows_per_output_file".into(),
1942            "20".into(),
1943        );
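        // With a 20-row cap, all 20 inserted rows fit into a single output file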
1944        helper_test_append_new_files_to_table(
1945            ParquetFormat::default().get_ext(),
1946            FileCompressionType::UNCOMPRESSED,
1947            Some(config_map),
1948            1,
1949        )
1950        .await?;
1951        Ok(())
1952    }
1953
1954    #[tokio::test]
1955    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
1956        helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
1957            .await?;
1958        Ok(())
1959    }
1960
1961    #[tokio::test]
1962    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
1963        helper_test_insert_into_sql(
1964            "csv",
1965            FileCompressionType::UNCOMPRESSED,
1966            "",
1967            Some(HashMap::from([("has_header".into(), "true".into())])),
1968        )
1969        .await?;
1970        Ok(())
1971    }
1972
1973    #[tokio::test]
1974    async fn test_insert_into_sql_json_defaults() -> Result<()> {
1975        helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
1976            .await?;
1977        Ok(())
1978    }
1979
1980    #[tokio::test]
1981    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
1982        helper_test_insert_into_sql(
1983            "parquet",
1984            FileCompressionType::UNCOMPRESSED,
1985            "",
1986            None,
1987        )
1988        .await?;
1989        Ok(())
1990    }
1991
1992    #[tokio::test]
1993    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
1994        let mut config_map: HashMap<String, String> = HashMap::new();
1995        config_map.insert(
1996            "datafusion.execution.parquet.compression".into(),
1997            "zstd(5)".into(),
1998        );
1999        config_map.insert(
2000            "datafusion.execution.parquet.dictionary_enabled".into(),
2001            "false".into(),
2002        );
2003        config_map.insert(
2004            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
2005            "100".into(),
2006        );
2007        config_map.insert(
2008            "datafusion.execution.parquet.statistics_enabled".into(),
2009            "none".into(),
2010        );
2011        config_map.insert(
2012            "datafusion.execution.parquet.max_statistics_size".into(),
2013            "10".into(),
2014        );
2015        config_map.insert(
2016            "datafusion.execution.parquet.max_row_group_size".into(),
2017            "5".into(),
2018        );
2019        config_map.insert(
2020            "datafusion.execution.parquet.created_by".into(),
2021            "datafusion test".into(),
2022        );
2023        config_map.insert(
2024            "datafusion.execution.parquet.column_index_truncate_length".into(),
2025            "50".into(),
2026        );
2027        config_map.insert(
2028            "datafusion.execution.parquet.data_page_row_count_limit".into(),
2029            "50".into(),
2030        );
2031        config_map.insert(
2032            "datafusion.execution.parquet.bloom_filter_on_write".into(),
2033            "true".into(),
2034        );
2035        config_map.insert(
2036            "datafusion.execution.parquet.bloom_filter_fpp".into(),
2037            "0.01".into(),
2038        );
2039        config_map.insert(
2040            "datafusion.execution.parquet.bloom_filter_ndv".into(),
2041            "1000".into(),
2042        );
2043        config_map.insert(
2044            "datafusion.execution.parquet.writer_version".into(),
2045            "2.0".into(),
2046        );
2047        config_map.insert(
2048            "datafusion.execution.parquet.write_batch_size".into(),
2049            "5".into(),
2050        );
2051        helper_test_insert_into_sql(
2052            "parquet",
2053            FileCompressionType::UNCOMPRESSED,
2054            "",
2055            Some(config_map),
2056        )
2057        .await?;
2058        Ok(())
2059    }
2060
    #[cfg(feature = "parquet")]
2061    #[tokio::test]
2062    async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
2063        let mut config_map: HashMap<String, String> = HashMap::new();
2064        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
2065        config_map.insert(
2066            "datafusion.execution.soft_max_rows_per_output_file".into(),
2067            "10".into(),
2068        );
2069        config_map.insert(
2070            "datafusion.execution.parquet.compression".into(),
2071            "zstd(5)".into(),
2072        );
2073        config_map.insert(
2074            "datafusion.execution.parquet.dictionary_enabled".into(),
2075            "false".into(),
2076        );
2077        config_map.insert(
2078            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
2079            "100".into(),
2080        );
2081        config_map.insert(
2082            "datafusion.execution.parquet.statistics_enabled".into(),
2083            "none".into(),
2084        );
2085        config_map.insert(
2086            "datafusion.execution.parquet.max_statistics_size".into(),
2087            "10".into(),
2088        );
2089        config_map.insert(
2090            "datafusion.execution.parquet.max_row_group_size".into(),
2091            "5".into(),
2092        );
2093        config_map.insert(
2094            "datafusion.execution.parquet.created_by".into(),
2095            "datafusion test".into(),
2096        );
2097        config_map.insert(
2098            "datafusion.execution.parquet.column_index_truncate_length".into(),
2099            "50".into(),
2100        );
2101        config_map.insert(
2102            "datafusion.execution.parquet.data_page_row_count_limit".into(),
2103            "50".into(),
2104        );
2105        config_map.insert(
2106            "datafusion.execution.parquet.encoding".into(),
2107            "delta_binary_packed".into(),
2108        );
2109        config_map.insert(
2110            "datafusion.execution.parquet.bloom_filter_on_write".into(),
2111            "true".into(),
2112        );
2113        config_map.insert(
2114            "datafusion.execution.parquet.bloom_filter_fpp".into(),
2115            "0.01".into(),
2116        );
2117        config_map.insert(
2118            "datafusion.execution.parquet.bloom_filter_ndv".into(),
2119            "1000".into(),
2120        );
2121        config_map.insert(
2122            "datafusion.execution.parquet.writer_version".into(),
2123            "2.0".into(),
2124        );
2125        config_map.insert(
2126            "datafusion.execution.parquet.write_batch_size".into(),
2127            "5".into(),
2128        );
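        // Note: this overrides the batch_size of 10 set at the top of this test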
2129        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
2130        helper_test_append_new_files_to_table(
2131            ParquetFormat::default().get_ext(),
2132            FileCompressionType::UNCOMPRESSED,
2133            Some(config_map),
2134            2,
2135        )
2136        .await?;
2137        Ok(())
2138    }
2139
    #[cfg(feature = "parquet")]
2140    #[tokio::test]
2141    async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
2142    ) -> Result<()> {
2143        let mut config_map: HashMap<String, String> = HashMap::new();
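        // "zstd" without an explicit level is invalid; a level such as "zstd(4)" is required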
2144        config_map.insert(
2145            "datafusion.execution.parquet.compression".into(),
2146            "zstd".into(),
2147        );
2148        let e = helper_test_append_new_files_to_table(
2149            ParquetFormat::default().get_ext(),
2150            FileCompressionType::UNCOMPRESSED,
2151            Some(config_map),
2152            2,
2153        )
2154        .await
2155        .expect_err("Example should fail!");
2156        assert_eq!(e.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");
2157
2158        Ok(())
2159    }
2160
2161    async fn helper_test_append_new_files_to_table(
2162        file_type_ext: String,
2163        file_compression_type: FileCompressionType,
2164        session_config_map: Option<HashMap<String, String>>,
2165        expected_n_files_per_insert: usize,
2166    ) -> Result<()> {
2167        // Create the initial context, schema, and batch.
2168        let session_ctx = match session_config_map {
2169            Some(cfg) => {
2170                let config = SessionConfig::from_string_hash_map(&cfg)?;
2171                SessionContext::new_with_config(config)
2172            }
2173            None => SessionContext::new(),
2174        };
2175
2176        // Create a new schema with one field called "column1" of type Int32
2177        let schema = Arc::new(Schema::new(vec![Field::new(
2178            "column1",
2179            DataType::Int32,
2180            false,
2181        )]));
2182
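        // Filter on `column1 >= 0`; every value in the test batch satisfies it,
        // so no rows are dropped before the insert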
2183        let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
2184            Box::new(Expr::Column("column1".into())),
2185            Operator::GtEq,
2186            Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
2187        ));
2188
2189        // Create a new batch of data to insert into the table
2190        let batch = RecordBatch::try_new(
2191            schema.clone(),
2192            vec![Arc::new(arrow::array::Int32Array::from(vec![
2193                1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
2194            ]))],
2195        )?;
2196
2197        // Register the appropriate table depending on the file type we want to test
2198        let tmp_dir = TempDir::new()?;
2199        match file_type_ext.as_str() {
2200            "csv" => {
2201                session_ctx
2202                    .register_csv(
2203                        "t",
2204                        tmp_dir.path().to_str().unwrap(),
2205                        CsvReadOptions::new()
2206                            .schema(schema.as_ref())
2207                            .file_compression_type(file_compression_type),
2208                    )
2209                    .await?;
2210            }
2211            "json" => {
2212                session_ctx
2213                    .register_json(
2214                        "t",
2215                        tmp_dir.path().to_str().unwrap(),
2216                        NdJsonReadOptions::default()
2217                            .schema(schema.as_ref())
2218                            .file_compression_type(file_compression_type),
2219                    )
2220                    .await?;
2221            }
2222            #[cfg(feature = "parquet")]
2223            "parquet" => {
2224                session_ctx
2225                    .register_parquet(
2226                        "t",
2227                        tmp_dir.path().to_str().unwrap(),
2228                        ParquetReadOptions::default().schema(schema.as_ref()),
2229                    )
2230                    .await?;
2231            }
2232            #[cfg(feature = "avro")]
2233            "avro" => {
2234                session_ctx
2235                    .register_avro(
2236                        "t",
2237                        tmp_dir.path().to_str().unwrap(),
2238                        AvroReadOptions::default().schema(schema.as_ref()),
2239                    )
2240                    .await?;
2241            }
2242            "arrow" => {
2243                session_ctx
2244                    .register_arrow(
2245                        "t",
2246                        tmp_dir.path().to_str().unwrap(),
2247                        ArrowReadOptions::default().schema(schema.as_ref()),
2248                    )
2249                    .await?;
2250            }
2251            _ => panic!("Unrecognized file extension {file_type_ext}"),
2252        }
2253
2254        // Create and register the source table with the provided schema and the data to insert
2255        let source_table = Arc::new(MemTable::try_new(
2256            schema.clone(),
2257            vec![vec![batch.clone(), batch.clone()]],
2258        )?);
2259        session_ctx.register_table("source", source_table.clone())?;
2260        // Convert the source table into a provider so that it can be used in a query
2261        let source = provider_as_source(source_table);
2262        let target = session_ctx.table_provider("t").await?;
2263        let target = Arc::new(DefaultTableSource::new(target));
2264        // Create a table scan logical plan to read from the source table
2265        let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
2266            .filter(filter_predicate)?
2267            .build()?;
2268        // Since the logical plan contains a filter, increasing parallelism is helpful,
2269        // so the final plan will have 8 partitions.
2270        // Create an insert plan to insert the source data into the initial table
2271        let insert_into_table =
2272            LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
2273                .build()?;
2274        // Create a physical plan from the insert plan
2275        let plan = session_ctx
2276            .state()
2277            .create_physical_plan(&insert_into_table)
2278            .await?;
2279        // Execute the physical plan and collect the results
2280        let res = collect(plan, session_ctx.task_ctx()).await?;
2281        // The insert reports the number of rows written; in this case 20 (two batches of 10 rows).
2282
2283        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
2284            +-------+
2285            | count |
2286            +-------+
2287            | 20    |
2288            +-------+
2289        "###);}
2290
2291        // Read the records in the table
2292        let batches = session_ctx
2293            .sql("select count(*) as count from t")
2294            .await?
2295            .collect()
2296            .await?;
2297
2298        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
2299            +-------+
2300            | count |
2301            +-------+
2302            | 20    |
2303            +-------+
2304        "###);}
2305
2306        // Assert that `expected_n_files_per_insert` files were added to the table.
2307        let num_files = tmp_dir.path().read_dir()?.count();
2308        assert_eq!(num_files, expected_n_files_per_insert);
2309
2310        // Create a physical plan from the insert plan
2311        let plan = session_ctx
2312            .state()
2313            .create_physical_plan(&insert_into_table)
2314            .await?;
2315
2316        // Again, execute the physical plan and collect the results
2317        let res = collect(plan, session_ctx.task_ctx()).await?;
2318
2319        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
2320            +-------+
2321            | count |
2322            +-------+
2323            | 20    |
2324            +-------+
2325        "###);}
2326
2327        // Read the contents of the table
2328        let batches = session_ctx
2329            .sql("select count(*) AS count from t")
2330            .await?
2331            .collect()
2332            .await?;
2333
2334        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
2335            +-------+
2336            | count |
2337            +-------+
2338            | 40    |
2339            +-------+
2340        "###);}
2341
2342        // Assert that another `expected_n_files_per_insert` files were added to the table.
2343        let num_files = tmp_dir.path().read_dir()?.count();
2344        assert_eq!(num_files, expected_n_files_per_insert * 2);
2345
2346        // Return Ok once both inserts and all assertions have succeeded
2347        Ok(())
2348    }
2349
2350    /// Tests INSERT INTO end to end via SQL:
2351    /// CREATE EXTERNAL TABLE followed by INSERT INTO statements
2352    async fn helper_test_insert_into_sql(
2353        file_type: &str,
2354        // TODO test with create statement options such as compression
2355        _file_compression_type: FileCompressionType,
2356        external_table_options: &str,
2357        session_config_map: Option<HashMap<String, String>>,
2358    ) -> Result<()> {
2359        // Create the initial context
2360        let session_ctx = match session_config_map {
2361            Some(cfg) => {
2362                let config = SessionConfig::from_string_hash_map(&cfg)?;
2363                SessionContext::new_with_config(config)
2364            }
2365            None => SessionContext::new(),
2366        };
2367
2368        // create table
2369        let tmp_dir = TempDir::new()?;
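        // `into_path` persists the temporary directory so it is not removed while the
        // external table still points at it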
2370        let tmp_path = tmp_dir.into_path();
2371        let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
2372        session_ctx
2373            .sql(&format!(
2374                "create external table foo(a varchar, b varchar, c int) \
2375                        stored as {file_type} \
2376                        location '{str_path}' \
2377                        {external_table_options}"
2378            ))
2379            .await?
2380            .collect()
2381            .await?;
2382
2383        // insert data
2384        session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
2385            .await?
2386            .collect()
2387            .await?;
2388
2389        // check the inserted rows
2390        let batches = session_ctx
2391            .sql("select * from foo")
2392            .await?
2393            .collect()
2394            .await?;
2395
2396        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
2397            +-----+-----+---+
2398            | a   | b   | c |
2399            +-----+-----+---+
2400            | foo | bar | 1 |
2401            | foo | bar | 2 |
2402            | foo | bar | 3 |
2403            +-----+-----+---+
2404        "###);}
2405
2406        Ok(())
2407    }
2408
2409    #[tokio::test]
2410    async fn test_infer_options_compressed_csv() -> Result<()> {
2411        let testdata = crate::test_util::arrow_test_data();
2412        let filename = format!("{}/csv/aggregate_test_100.csv.gz", testdata);
2413        let table_path = ListingTableUrl::parse(filename).unwrap();
2414
2415        let ctx = SessionContext::new();
2416
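        // Infer the listing options (format and compression) from the `.csv.gz` extension,
        // then infer the schema from the decompressed file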
2417        let config = ListingTableConfig::new(table_path);
2418        let config_with_opts = config.infer_options(&ctx.state()).await?;
2419        let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;
2420
2421        let schema = config_with_schema.file_schema.unwrap();
2422
2423        assert_eq!(schema.fields.len(), 13);
2424
2425        Ok(())
2426    }
2427}