datafusion/datasource/listing/table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The table implementation.
19
20use std::collections::HashMap;
21use std::{any::Any, str::FromStr, sync::Arc};
22
23use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
24use super::{ListingTableUrl, PartitionedFile};
25
26use crate::datasource::{
27    create_ordering,
28    file_format::{
29        file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
30    },
31    get_statistics_with_limit,
32    physical_plan::FileSinkConfig,
33};
34use crate::execution::context::SessionState;
35use datafusion_catalog::TableProvider;
36use datafusion_common::{config_err, DataFusionError, Result};
37use datafusion_datasource::file_scan_config::FileScanConfig;
38use datafusion_expr::dml::InsertOp;
39use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
40use datafusion_expr::{SortExpr, TableType};
41use datafusion_physical_plan::empty::EmptyExec;
42use datafusion_physical_plan::{ExecutionPlan, Statistics};
43
44use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
45use datafusion_common::{
46    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
47    SchemaExt, ToDFSchema,
48};
49use datafusion_execution::cache::{
50    cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
51};
52use datafusion_physical_expr::{
53    create_physical_expr, LexOrdering, PhysicalSortRequirement,
54};
55
56use async_trait::async_trait;
57use datafusion_catalog::Session;
58use datafusion_physical_expr_common::sort_expr::LexRequirement;
59use futures::{future, stream, StreamExt, TryStreamExt};
60use itertools::Itertools;
61use object_store::ObjectStore;
62
63/// Configuration for creating a [`ListingTable`]
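///
/// A config can be built either by explicitly providing the schema and options,
/// or by inferring them from the files at the path (see [`Self::infer`]). A
/// minimal sketch of the explicit form (the format and path below are just
/// placeholders):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion::datasource::{
/// #     listing::{ListingOptions, ListingTableConfig, ListingTableUrl},
/// #     file_format::parquet::ParquetFormat,
/// # };
/// let table_path = ListingTableUrl::parse("file:///path/to/data/").unwrap();
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// // Build a config with an explicit schema and explicit listing options
/// let config = ListingTableConfig::new(table_path)
///     .with_listing_options(listing_options)
///     .with_schema(schema);
/// ```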
64#[derive(Debug, Clone)]
65pub struct ListingTableConfig {
66    /// Paths on the `ObjectStore` for creating `ListingTable`.
67    /// They should share the same schema and object store.
68    pub table_paths: Vec<ListingTableUrl>,
69    /// Optional `SchemaRef` for the `ListingTable` to be created.
70    pub file_schema: Option<SchemaRef>,
71    /// Optional `ListingOptions` for the `ListingTable` to be created.
72    pub options: Option<ListingOptions>,
73}
74
75impl ListingTableConfig {
76    /// Creates new [`ListingTableConfig`].
77    ///
78    /// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
79    /// the suffix of the first element of the provided `table_paths`.
80    pub fn new(table_path: ListingTableUrl) -> Self {
81        let table_paths = vec![table_path];
82        Self {
83            table_paths,
84            file_schema: None,
85            options: None,
86        }
87    }
88
89    /// Creates new [`ListingTableConfig`] with multiple table paths.
90    ///
91    /// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
92    /// the suffix of the first element of the provided `table_paths`.
93    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
94        Self {
95            table_paths,
96            file_schema: None,
97            options: None,
98        }
99    }
100    /// Add `schema` to [`ListingTableConfig`]
101    pub fn with_schema(self, schema: SchemaRef) -> Self {
102        Self {
103            table_paths: self.table_paths,
104            file_schema: Some(schema),
105            options: self.options,
106        }
107    }
108
109    /// Add `listing_options` to [`ListingTableConfig`]
110    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
111        Self {
112            table_paths: self.table_paths,
113            file_schema: self.file_schema,
114            options: Some(listing_options),
115        }
116    }
117
118    /// Returns a tuple of `(file_extension, optional compression_extension)`
119    ///
120    /// For example, a path ending with `blah.test.csv.gz` returns `("csv", Some("gz"))`,
121    /// while a path ending with `blah.test.csv` returns `("csv", None)`.
122    fn infer_file_extension_and_compression_type(
123        path: &str,
124    ) -> Result<(String, Option<String>)> {
125        let mut exts = path.rsplit('.');
126
127        let splitted = exts.next().unwrap_or("");
128
129        let file_compression_type = FileCompressionType::from_str(splitted)
130            .unwrap_or(FileCompressionType::UNCOMPRESSED);
131
132        if file_compression_type.is_compressed() {
133            let splitted2 = exts.next().unwrap_or("");
134            Ok((splitted2.to_string(), Some(splitted.to_string())))
135        } else {
136            Ok((splitted.to_string(), None))
137        }
138    }
139
140    /// Infer `ListingOptions` based on `table_path` suffix.
141    pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
142        let store = if let Some(url) = self.table_paths.first() {
143            state.runtime_env().object_store(url)?
144        } else {
145            return Ok(self);
146        };
147
148        let file = self
149            .table_paths
150            .first()
151            .unwrap()
152            .list_all_files(state, store.as_ref(), "")
153            .await?
154            .next()
155            .await
156            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
157
158        let (file_extension, maybe_compression_type) =
159            ListingTableConfig::infer_file_extension_and_compression_type(
160                file.location.as_ref(),
161            )?;
162
163        let mut format_options = HashMap::new();
164        if let Some(ref compression_type) = maybe_compression_type {
165            format_options
166                .insert("format.compression".to_string(), compression_type.clone());
167        }
168        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
169        let file_format = state
170            .get_file_format_factory(&file_extension)
171            .ok_or(config_datafusion_err!(
172                "No file_format found with extension {file_extension}"
173            ))?
174            .create(state, &format_options)?;
175
176        let listing_file_extension =
177            if let Some(compression_type) = maybe_compression_type {
178                format!("{}.{}", &file_extension, &compression_type)
179            } else {
180                file_extension
181            };
182
183        let listing_options = ListingOptions::new(file_format)
184            .with_file_extension(listing_file_extension)
185            .with_target_partitions(state.config().target_partitions());
186
187        Ok(Self {
188            table_paths: self.table_paths,
189            file_schema: self.file_schema,
190            options: Some(listing_options),
191        })
192    }
193
194    /// Infer the [`SchemaRef`] based on the `table_path` suffix. Requires `self.options` to be set before calling this method.
195    pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
196        match self.options {
197            Some(options) => {
198                let schema = if let Some(url) = self.table_paths.first() {
199                    options.infer_schema(state, url).await?
200                } else {
201                    Arc::new(Schema::empty())
202                };
203
204                Ok(Self {
205                    table_paths: self.table_paths,
206                    file_schema: Some(schema),
207                    options: Some(options),
208                })
209            }
210            None => internal_err!("No `ListingOptions` set for inferring schema"),
211        }
212    }
213
214    /// Convenience wrapper for calling `infer_options` and `infer_schema`
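    ///
    /// # Example
    ///
    /// A minimal sketch (the path below is hypothetical):
    ///
    /// ```no_run
    /// # use datafusion::prelude::SessionContext;
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingTableUrl};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let state = ctx.state();
    /// let table_path = ListingTableUrl::parse("/path/to/data/")?;
    /// // Equivalent to calling `infer_options` followed by `infer_schema`
    /// let config = ListingTableConfig::new(table_path).infer(&state).await?;
    /// # Ok(())
    /// # }
    /// ```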
215    pub async fn infer(self, state: &dyn Session) -> Result<Self> {
216        self.infer_options(state).await?.infer_schema(state).await
217    }
218
219    /// Infer the partition columns from the path. Requires `self.options` to be set before calling this method.
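    ///
    /// # Example
    ///
    /// A minimal sketch (the layout below is hypothetical): for files stored as
    /// `/data/year=2024/month=01/file.parquet`, this infers `year` and `month`
    /// as dictionary-encoded partition columns.
    ///
    /// ```no_run
    /// # use datafusion::prelude::SessionContext;
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingTableUrl};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let state = ctx.state();
    /// let table_path = ListingTableUrl::parse("/data/")?;
    /// let config = ListingTableConfig::new(table_path)
    ///     .infer_options(&state)
    ///     .await?
    ///     .infer_partitions_from_path(&state)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```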
220    pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result<Self> {
221        match self.options {
222            Some(options) => {
223                let Some(url) = self.table_paths.first() else {
224                    return config_err!("No table path found");
225                };
226                let partitions = options
227                    .infer_partitions(state, url)
228                    .await?
229                    .into_iter()
230                    .map(|col_name| {
231                        (
232                            col_name,
233                            DataType::Dictionary(
234                                Box::new(DataType::UInt16),
235                                Box::new(DataType::Utf8),
236                            ),
237                        )
238                    })
239                    .collect::<Vec<_>>();
240                let options = options.with_table_partition_cols(partitions);
241                Ok(Self {
242                    table_paths: self.table_paths,
243                    file_schema: self.file_schema,
244                    options: Some(options),
245                })
246            }
247            None => config_err!("No `ListingOptions` set for inferring partitions"),
248        }
249    }
250}
251
252/// Options for creating a [`ListingTable`]
253#[derive(Clone, Debug)]
254pub struct ListingOptions {
255    /// A suffix on which files should be filtered (leave empty to
256    /// keep all files on the path)
257    pub file_extension: String,
258    /// The file format
259    pub format: Arc<dyn FileFormat>,
260    /// The expected partition column names in the folder structure.
261    /// See [Self::with_table_partition_cols] for details
262    pub table_partition_cols: Vec<(String, DataType)>,
263    /// Set true to try to guess statistics from the files.
264    /// This can add a lot of overhead as it will usually require files
265    /// to be opened and at least partially parsed.
266    pub collect_stat: bool,
267    /// Group files so that the number of partitions does not exceed
268    /// this limit
269    pub target_partitions: usize,
270    /// Optional pre-known sort order(s). Must be `SortExpr`s.
271    ///
272    /// DataFusion may take advantage of this ordering to omit sorts
273    /// or use more efficient algorithms. Currently sortedness must be
274    /// provided if it is known by some external mechanism, but may in
275    /// the future be automatically determined, for example using
276    /// parquet metadata.
277    ///
278    /// See <https://github.com/apache/datafusion/issues/4177>
279    /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
280    ///       where each ordering consists of an individual lexicographic
281    ///       ordering (encapsulated by a `Vec<Expr>`). If there aren't
282    ///       multiple equivalent orderings, the outer `Vec` will have a
283    ///       single element.
284    pub file_sort_order: Vec<Vec<SortExpr>>,
285}
286
287impl ListingOptions {
288    /// Creates an options instance with the given format
289    /// Default values:
290    /// - use default file extension filter
291    /// - no input partition to discover
292    /// - one target partition
293    /// - stat collection
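    ///
    /// For example (a minimal illustration of the defaults listed above):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    ///
    /// // The defaults from the list above
    /// assert_eq!(listing_options.target_partitions, 1);
    /// assert!(listing_options.collect_stat);
    /// assert!(listing_options.table_partition_cols.is_empty());
    /// assert!(listing_options.file_sort_order.is_empty());
    /// ```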
294    pub fn new(format: Arc<dyn FileFormat>) -> Self {
295        Self {
296            file_extension: format.get_ext(),
297            format,
298            table_partition_cols: vec![],
299            collect_stat: true,
300            target_partitions: 1,
301            file_sort_order: vec![],
302        }
303    }
304
305    /// Set file extension on [`ListingOptions`] and returns self.
306    ///
307    /// # Example
308    /// ```
309    /// # use std::sync::Arc;
310    /// # use datafusion::prelude::SessionContext;
311    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
312    ///
313    /// let listing_options = ListingOptions::new(Arc::new(
314    ///     ParquetFormat::default()
315    ///   ))
316    ///   .with_file_extension(".parquet");
317    ///
318    /// assert_eq!(listing_options.file_extension, ".parquet");
319    /// ```
320    pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
321        self.file_extension = file_extension.into();
322        self
323    }
324
325    /// Optionally set file extension on [`ListingOptions`] and returns self.
326    ///
327    /// If `file_extension` is `None`, the file extension will not be changed
328    ///
329    /// # Example
330    /// ```
331    /// # use std::sync::Arc;
332    /// # use datafusion::prelude::SessionContext;
333    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
334    /// let extension = Some(".parquet");
335    /// let listing_options = ListingOptions::new(Arc::new(
336    ///     ParquetFormat::default()
337    ///   ))
338    ///   .with_file_extension_opt(extension);
339    ///
340    /// assert_eq!(listing_options.file_extension, ".parquet");
341    /// ```
342    pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
343    where
344        S: Into<String>,
345    {
346        if let Some(file_extension) = file_extension {
347            self.file_extension = file_extension.into();
348        }
349        self
350    }
351
352    /// Set `table partition columns` on [`ListingOptions`] and returns self.
353    ///
354    /// "partition columns," used to support [Hive Partitioning], are
355    /// columns added to the data that is read, based on the folder
356    /// structure where the data resides.
357    ///
358    /// For example, given the following files in your filesystem:
359    ///
360    /// ```text
361    /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet
362    /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet
363    /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet
364    /// ```
365    ///
366    /// A [`ListingTable`] created at `/mnt/nyctaxi/` with partition
367    /// columns "year" and "month" will include new `year` and `month`
368    /// columns while reading the files. The `year` column would have
369    /// value `2022` and the `month` column would have value `01` for
370    /// the rows read from
371    /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet`
372    ///
373    /// # Notes
374    ///
375    /// - If only one level (e.g. `year` in the example above) is
376    ///   specified, the other levels are ignored but the files are
377    ///   still read.
378    ///
379    /// - Files that don't follow this partitioning scheme will be
380    ///   ignored.
381    ///
382    /// - Since the columns have the same value for all rows read from
383    ///   each individual file (such as dates), they are typically
384    ///   dictionary encoded for efficiency. You may use
385    ///   [`wrap_partition_type_in_dict`] to request a
386    ///   dictionary-encoded type.
387    ///
388    /// - The partition columns are solely extracted from the file path. In particular, they are NOT part of the parquet files themselves.
389    ///
390    /// # Example
391    ///
392    /// ```
393    /// # use std::sync::Arc;
394    /// # use arrow::datatypes::DataType;
395    /// # use datafusion::prelude::col;
396    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
397    ///
398    /// // listing options for files with paths such as `/mnt/data/col_a=x/col_b=y/data.parquet`
399    /// // `col_a` and `col_b` will be included in the data read from those files
400    /// let listing_options = ListingOptions::new(Arc::new(
401    ///     ParquetFormat::default()
402    ///   ))
403    ///   .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
404    ///       ("col_b".to_string(), DataType::Utf8)]);
405    ///
406    /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
407    ///     ("col_b".to_string(), DataType::Utf8)]);
408    /// ```
409    ///
410    /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
411    /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
412    pub fn with_table_partition_cols(
413        mut self,
414        table_partition_cols: Vec<(String, DataType)>,
415    ) -> Self {
416        self.table_partition_cols = table_partition_cols;
417        self
418    }
419
420    /// Set stat collection on [`ListingOptions`] and returns self.
421    ///
422    /// ```
423    /// # use std::sync::Arc;
424    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
425    ///
426    /// let listing_options = ListingOptions::new(Arc::new(
427    ///     ParquetFormat::default()
428    ///   ))
429    ///   .with_collect_stat(true);
430    ///
431    /// assert_eq!(listing_options.collect_stat, true);
432    /// ```
433    pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
434        self.collect_stat = collect_stat;
435        self
436    }
437
438    /// Set number of target partitions on [`ListingOptions`] and returns self.
439    ///
440    /// ```
441    /// # use std::sync::Arc;
442    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
443    ///
444    /// let listing_options = ListingOptions::new(Arc::new(
445    ///     ParquetFormat::default()
446    ///   ))
447    ///   .with_target_partitions(8);
448    ///
449    /// assert_eq!(listing_options.target_partitions, 8);
450    /// ```
451    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
452        self.target_partitions = target_partitions;
453        self
454    }
455
456    /// Set file sort order on [`ListingOptions`] and returns self.
457    ///
458    /// ```
459    /// # use std::sync::Arc;
460    /// # use datafusion::prelude::col;
461    /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
462    ///
463    ///  // Tell datafusion that the files are sorted by column "a"
464    ///  let file_sort_order = vec![vec![
465    ///    col("a").sort(true, true)
466    ///  ]];
467    ///
468    /// let listing_options = ListingOptions::new(Arc::new(
469    ///     ParquetFormat::default()
470    ///   ))
471    ///   .with_file_sort_order(file_sort_order.clone());
472    ///
473    /// assert_eq!(listing_options.file_sort_order, file_sort_order);
474    /// ```
475    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
476        self.file_sort_order = file_sort_order;
477        self
478    }
479
480    /// Infer the schema of the files at the given path on the provided object store.
481    /// The inferred schema does not include the partitioning columns.
482    ///
483    /// This method is not called by the table itself, but before creating it.
484    /// This way, when creating the logical plan, we can decide to resolve the schema
485    /// locally or ask a remote service to do it (e.g. a scheduler).
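    ///
    /// # Example
    ///
    /// A minimal sketch (the path below is hypothetical):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use datafusion::prelude::SessionContext;
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::{listing::{ListingOptions, ListingTableUrl}, file_format::parquet::ParquetFormat};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let state = ctx.state();
    /// let table_path = ListingTableUrl::parse("/path/to/parquet/")?;
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    /// // The returned schema contains only the file columns (no partition columns)
    /// let schema = listing_options.infer_schema(&state, &table_path).await?;
    /// # Ok(())
    /// # }
    /// ```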
486    pub async fn infer_schema<'a>(
487        &'a self,
488        state: &dyn Session,
489        table_path: &'a ListingTableUrl,
490    ) -> Result<SchemaRef> {
491        let store = state.runtime_env().object_store(table_path)?;
492
493        let files: Vec<_> = table_path
494            .list_all_files(state, store.as_ref(), &self.file_extension)
495            .await?
496            // Empty files cannot affect the schema but may error when we try to read them
497            .try_filter(|object_meta| future::ready(object_meta.size > 0))
498            .try_collect()
499            .await?;
500
501        let schema = self.format.infer_schema(state, &store, &files).await?;
502
503        Ok(schema)
504    }
505
506    /// Infers the partition columns stored in `LOCATION` and compares
507    /// them with the columns provided in `PARTITIONED BY` to help prevent
508    /// accidental corruption of partitioned tables.
509    ///
510    /// Allows specifying partial partitions.
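    ///
    /// For example, assuming files stored under `/table/year=2022/month=01/`,
    /// declaring `PARTITIONED BY (year)` is accepted as a partial prefix of the
    /// inferred partitioning, while `PARTITIONED BY (month)` is rejected because
    /// it does not match the first inferred partition column.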
511    pub async fn validate_partitions(
512        &self,
513        state: &dyn Session,
514        table_path: &ListingTableUrl,
515    ) -> Result<()> {
516        if self.table_partition_cols.is_empty() {
517            return Ok(());
518        }
519
520        if !table_path.is_collection() {
521            return plan_err!(
522                "Can't create a partitioned table backed by a single file, \
523                perhaps the URL is missing a trailing slash?"
524            );
525        }
526
527        let inferred = self.infer_partitions(state, table_path).await?;
528
529        // no partitioned files found on disk
530        if inferred.is_empty() {
531            return Ok(());
532        }
533
534        let table_partition_names = self
535            .table_partition_cols
536            .iter()
537            .map(|(col_name, _)| col_name.clone())
538            .collect_vec();
539
540        if inferred.len() < table_partition_names.len() {
541            return plan_err!(
542                "Inferred partitions to be {:?}, but got {:?}",
543                inferred,
544                table_partition_names
545            );
546        }
547
548        // match prefix to allow creating tables with partial partitions
549        for (idx, col) in table_partition_names.iter().enumerate() {
550            if &inferred[idx] != col {
551                return plan_err!(
552                    "Inferred partitions to be {:?}, but got {:?}",
553                    inferred,
554                    table_partition_names
555                );
556            }
557        }
558
559        Ok(())
560    }
561
562    /// Infer the partitioning at the given path on the provided object store.
563    /// For performance reasons, it doesn't read all the files on disk
564    /// and therefore may fail to detect invalid partitioning.
565    pub(crate) async fn infer_partitions(
566        &self,
567        state: &dyn Session,
568        table_path: &ListingTableUrl,
569    ) -> Result<Vec<String>> {
570        let store = state.runtime_env().object_store(table_path)?;
571
572        // only use 10 files for inference
573        // This can fail to detect inconsistent partition keys
574        // A DFS traversal approach of the store can help here
575        let files: Vec<_> = table_path
576            .list_all_files(state, store.as_ref(), &self.file_extension)
577            .await?
578            .take(10)
579            .try_collect()
580            .await?;
581
582        let stripped_path_parts = files.iter().map(|file| {
583            table_path
584                .strip_prefix(&file.location)
585                .unwrap()
586                .collect_vec()
587        });
588
589        let partition_keys = stripped_path_parts
590            .map(|path_parts| {
591                path_parts
592                    .into_iter()
593                    .rev()
594                    .skip(1) // get parents only; skip the file itself
595                    .rev()
596                    .map(|s| s.split('=').take(1).collect())
597                    .collect_vec()
598            })
599            .collect_vec();
600
601        match partition_keys.into_iter().all_equal_value() {
602            Ok(v) => Ok(v),
603            Err(None) => Ok(vec![]),
604            Err(Some(diff)) => {
605                let mut sorted_diff = [diff.0, diff.1];
606                sorted_diff.sort();
607                plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
608            }
609        }
610    }
611}
612
613/// Reads data from one or more files as a single table.
614///
615/// Implements [`TableProvider`], a DataFusion data source. The files are read
616/// using an [`ObjectStore`] instance, for example from local files or objects
617/// from AWS S3.
618///
619/// # Reading Directories
620/// For example, given the `table1` directory (or object store prefix)
621///
622/// ```text
623/// table1
624///  ├── file1.parquet
625///  └── file2.parquet
626/// ```
627///
628/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
629/// a single table, merging the schemas if the files have compatible but not
630/// identical schemas.
631///
632/// Given the `table2` directory (or object store prefix)
633///
634/// ```text
635/// table2
636///  ├── date=2024-06-01
637///  │    ├── file3.parquet
638///  │    └── file4.parquet
639///  └── date=2024-06-02
640///       └── file5.parquet
641/// ```
642///
643/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
644/// `file5.parquet` as a single table, again merging schemas if necessary.
645///
646/// Given the Hive-style partitioning structure (e.g., directories named
647/// `date=2024-06-01` and `date=2024-06-02`), `ListingTable` also adds a `date`
648/// column when reading the table:
649/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
650/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
651///
652/// If the query has a predicate like `WHERE date = '2024-06-01'`
653/// only the corresponding directory will be read.
654///
655/// `ListingTable` also supports limit, filter, and projection pushdown for formats
656/// that support it, such as Parquet.
657///
658/// # Implementation
659///
660/// `ListingTable` uses [`DataSourceExec`] to read the data. See that struct
661/// for more details.
662///
663/// [`DataSourceExec`]: crate::datasource::source::DataSourceExec
664///
665/// # Example
666///
667/// To read a directory of parquet files using a [`ListingTable`]:
668///
669/// ```no_run
670/// # use datafusion::prelude::SessionContext;
671/// # use datafusion::error::Result;
672/// # use std::sync::Arc;
673/// # use datafusion::datasource::{
674/// #   listing::{
675/// #      ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
676/// #   },
677/// #   file_format::parquet::ParquetFormat,
678/// # };
679/// # #[tokio::main]
680/// # async fn main() -> Result<()> {
681/// let ctx = SessionContext::new();
682/// let session_state = ctx.state();
683/// let table_path = "/path/to/parquet";
684///
685/// // Parse the path
686/// let table_path = ListingTableUrl::parse(table_path)?;
687///
688/// // Create default parquet options
689/// let file_format = ParquetFormat::new();
690/// let listing_options = ListingOptions::new(Arc::new(file_format))
691///   .with_file_extension(".parquet");
692///
693/// // Resolve the schema
694/// let resolved_schema = listing_options
695///    .infer_schema(&session_state, &table_path)
696///    .await?;
697///
698/// let config = ListingTableConfig::new(table_path)
699///   .with_listing_options(listing_options)
700///   .with_schema(resolved_schema);
701///
702/// // Create a new TableProvider
703/// let provider = Arc::new(ListingTable::try_new(config)?);
704///
705/// // This provider can now be read as a dataframe:
706/// let df = ctx.read_table(provider.clone());
707///
708/// // or registered as a named table:
709/// ctx.register_table("my_table", provider);
710///
711/// # Ok(())
712/// # }
713/// ```
714#[derive(Debug)]
715pub struct ListingTable {
716    table_paths: Vec<ListingTableUrl>,
717    /// File fields only
718    file_schema: SchemaRef,
719    /// File fields + partition columns
720    table_schema: SchemaRef,
721    options: ListingOptions,
722    definition: Option<String>,
723    collected_statistics: FileStatisticsCache,
724    constraints: Constraints,
725    column_defaults: HashMap<String, Expr>,
726}
727
728impl ListingTable {
729    /// Create a new [`ListingTable`] that lists the filesystem (or object store)
730    /// to get the files to scan. See [`ListingTable`] for an example.
731    ///
732    /// Takes a `ListingTableConfig` as input which requires an `ObjectStore` and `table_path`.
733    /// `ListingOptions` and `SchemaRef` are optional. If they are not
734    /// provided, the file type is inferred based on the file suffix.
735    /// If the schema is provided, then it must be resolved before creating the table
736    /// and should contain the fields of the file without the table
737    /// partitioning columns.
738    ///
739    pub fn try_new(config: ListingTableConfig) -> Result<Self> {
740        let file_schema = config
741            .file_schema
742            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
743
744        let options = config.options.ok_or_else(|| {
745            DataFusionError::Internal("No ListingOptions provided".into())
746        })?;
747
748        // Add the partition columns to the file schema
749        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
750        for (part_col_name, part_col_type) in &options.table_partition_cols {
751            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
752        }
753
754        let table_schema = Arc::new(
755            builder
756                .finish()
757                .with_metadata(file_schema.metadata().clone()),
758        );
759
760        let table = Self {
761            table_paths: config.table_paths,
762            file_schema,
763            table_schema,
764            options,
765            definition: None,
766            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
767            constraints: Constraints::empty(),
768            column_defaults: HashMap::new(),
769        };
770
771        Ok(table)
772    }
773
774    /// Assign constraints
775    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
776        self.constraints = constraints;
777        self
778    }
779
780    /// Assign column defaults
781    pub fn with_column_defaults(
782        mut self,
783        column_defaults: HashMap<String, Expr>,
784    ) -> Self {
785        self.column_defaults = column_defaults;
786        self
787    }
788
789    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
790    ///
791    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
792    /// multiple times in the same session.
793    ///
794    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
795    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
796        self.collected_statistics =
797            cache.unwrap_or(Arc::new(DefaultFileStatisticsCache::default()));
798        self
799    }
800
801    /// Specify the SQL definition for this table, if any
802    pub fn with_definition(mut self, definition: Option<String>) -> Self {
803        self.definition = definition;
804        self
805    }
806
807    /// Get paths ref
808    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
809        &self.table_paths
810    }
811
812    /// Get options ref
813    pub fn options(&self) -> &ListingOptions {
814        &self.options
815    }
816
817    /// If file_sort_order is specified, creates the appropriate physical expressions
818    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
819        create_ordering(&self.table_schema, &self.options.file_sort_order)
820    }
821}
822
823// Expressions can be used for partition pruning if they can be evaluated using
824// only the partition columns and there are partition columns.
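// For example, with partition columns ["year", "month"], a filter like
// `year = 2024 AND month = 1` can be evaluated for partition pruning,
// while `c1 > 5` (a file column) cannot.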
825fn can_be_evaluted_for_partition_pruning(
826    partition_column_names: &[&str],
827    expr: &Expr,
828) -> bool {
829    !partition_column_names.is_empty()
830        && expr_applicable_for_cols(partition_column_names, expr)
831}
832
833#[async_trait]
834impl TableProvider for ListingTable {
835    fn as_any(&self) -> &dyn Any {
836        self
837    }
838
839    fn schema(&self) -> SchemaRef {
840        Arc::clone(&self.table_schema)
841    }
842
843    fn constraints(&self) -> Option<&Constraints> {
844        Some(&self.constraints)
845    }
846
847    fn table_type(&self) -> TableType {
848        TableType::Base
849    }
850
851    async fn scan(
852        &self,
853        state: &dyn Session,
854        projection: Option<&Vec<usize>>,
855        filters: &[Expr],
856        limit: Option<usize>,
857    ) -> Result<Arc<dyn ExecutionPlan>> {
858        // extract types of partition columns
859        let table_partition_cols = self
860            .options
861            .table_partition_cols
862            .iter()
863            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
864            .collect::<Result<Vec<_>>>()?;
865
866        let table_partition_col_names = table_partition_cols
867            .iter()
868            .map(|field| field.name().as_str())
869            .collect::<Vec<_>>();
870        // If a filter can be resolved using only partition columns, there is no need to
871        // push it down to the table scan; otherwise, `unhandled` pruning predicates will be generated
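        // For example, with a partition column `date`, a filter such as `date = '2024-06-01'`
        // is fully handled here while listing the files and is not passed on to the file scan.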
872        let (partition_filters, filters): (Vec<_>, Vec<_>) =
873            filters.iter().cloned().partition(|filter| {
874                can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
875            });
876        // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
877        let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
878
879        // We should not limit the number of partitioned files to scan if there are filters and limit
880        // at the same time. This is because the limit should be applied after the filters are applied.
881        let statistic_file_limit = if filters.is_empty() { limit } else { None };
882
883        let (mut partitioned_file_lists, statistics) = self
884            .list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
885            .await?;
886
887        // if no files need to be read, return an `EmptyExec`
888        if partitioned_file_lists.is_empty() {
889            let projected_schema = project_schema(&self.schema(), projection)?;
890            return Ok(Arc::new(EmptyExec::new(projected_schema)));
891        }
892
893        let output_ordering = self.try_create_output_ordering()?;
894        match state
895            .config_options()
896            .execution
897            .split_file_groups_by_statistics
898            .then(|| {
899                output_ordering.first().map(|output_ordering| {
900                    FileScanConfig::split_groups_by_statistics(
901                        &self.table_schema,
902                        &partitioned_file_lists,
903                        output_ordering,
904                    )
905                })
906            })
907            .flatten()
908        {
909            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
910            Some(Ok(new_groups)) => {
911                if new_groups.len() <= self.options.target_partitions {
912                    partitioned_file_lists = new_groups;
913                } else {
914                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
915                }
916            }
917            None => {} // no ordering required
918        };
919
920        let filters = match conjunction(filters.to_vec()) {
921            Some(expr) => {
922                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
923                let filters = create_physical_expr(
924                    &expr,
925                    &table_df_schema,
926                    state.execution_props(),
927                )?;
928                Some(filters)
929            }
930            None => None,
931        };
932
933        let Some(object_store_url) =
934            self.table_paths.first().map(ListingTableUrl::object_store)
935        else {
936            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
937        };
938
939        // create the execution plan
940        self.options
941            .format
942            .create_physical_plan(
943                session_state,
944                FileScanConfig::new(
945                    object_store_url,
946                    Arc::clone(&self.file_schema),
947                    self.options.format.file_source(),
948                )
949                .with_file_groups(partitioned_file_lists)
950                .with_constraints(self.constraints.clone())
951                .with_statistics(statistics)
952                .with_projection(projection.cloned())
953                .with_limit(limit)
954                .with_output_ordering(output_ordering)
955                .with_table_partition_cols(table_partition_cols),
956                filters.as_ref(),
957            )
958            .await
959    }
960
961    fn supports_filters_pushdown(
962        &self,
963        filters: &[&Expr],
964    ) -> Result<Vec<TableProviderFilterPushDown>> {
965        let partition_column_names = self
966            .options
967            .table_partition_cols
968            .iter()
969            .map(|col| col.0.as_str())
970            .collect::<Vec<_>>();
971        filters
972            .iter()
973            .map(|filter| {
974                if can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
975                {
976                    // if filter can be handled by partition pruning, it is exact
977                    return Ok(TableProviderFilterPushDown::Exact);
978                }
979
980                // if we can't push it down completely with only the filename-based/path-based
981                // column names, then we should check if we can do parquet predicate pushdown
982                let supports_pushdown = self.options.format.supports_filters_pushdown(
983                    &self.file_schema,
984                    &self.table_schema,
985                    &[filter],
986                )?;
987
988                if supports_pushdown == FilePushdownSupport::Supported {
989                    return Ok(TableProviderFilterPushDown::Exact);
990                }
991
992                Ok(TableProviderFilterPushDown::Inexact)
993            })
994            .collect()
995    }
996
997    fn get_table_definition(&self) -> Option<&str> {
998        self.definition.as_deref()
999    }
1000
1001    async fn insert_into(
1002        &self,
1003        state: &dyn Session,
1004        input: Arc<dyn ExecutionPlan>,
1005        insert_op: InsertOp,
1006    ) -> Result<Arc<dyn ExecutionPlan>> {
1007        // Check that the schema of the plan matches the schema of this table.
1008        self.schema()
1009            .logically_equivalent_names_and_types(&input.schema())?;
1010
1011        let table_path = &self.table_paths()[0];
1012        if !table_path.is_collection() {
1013            return plan_err!(
1014                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
1015                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
1016            );
1017        }
1018
1019        // Get the object store for the table path.
1020        let store = state.runtime_env().object_store(table_path)?;
1021
1022        // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
1023        let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
1024        let file_list_stream = pruned_partition_list(
1025            session_state,
1026            store.as_ref(),
1027            table_path,
1028            &[],
1029            &self.options.file_extension,
1030            &self.options.table_partition_cols,
1031        )
1032        .await?;
1033
1034        let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
1035        let keep_partition_by_columns =
1036            state.config_options().execution.keep_partition_by_columns;
1037
1038        // Sink-related options, apart from the format
1039        let config = FileSinkConfig {
1040            object_store_url: self.table_paths()[0].object_store(),
1041            table_paths: self.table_paths().clone(),
1042            file_groups,
1043            output_schema: self.schema(),
1044            table_partition_cols: self.options.table_partition_cols.clone(),
1045            insert_op,
1046            keep_partition_by_columns,
1047            file_extension: self.options().format.get_ext(),
1048        };
1049
1050        let order_requirements = if !self.options().file_sort_order.is_empty() {
1051            // Multiple sort orders in outer vec are equivalent, so we pass only the first one
1052            let orderings = self.try_create_output_ordering()?;
1053            let Some(ordering) = orderings.first() else {
1054                return internal_err!(
1055                    "Expected ListingTable to have a sort order, but none found!"
1056                );
1057            };
1058            // Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
1059            Some(LexRequirement::new(
1060                ordering
1061                    .into_iter()
1062                    .cloned()
1063                    .map(PhysicalSortRequirement::from)
1064                    .collect::<Vec<_>>(),
1065            ))
1066        } else {
1067            None
1068        };
1069
1070        self.options()
1071            .format
1072            .create_writer_physical_plan(input, session_state, config, order_requirements)
1073            .await
1074    }
1075
1076    fn get_column_default(&self, column: &str) -> Option<&Expr> {
1077        self.column_defaults.get(column)
1078    }
1079}
1080
1081impl ListingTable {
1082    /// Get the list of files for a scan as well as the file level statistics.
1083    /// The list is grouped to let the execution plan know how the files should
1084    /// be distributed to different threads / executors.
1085    async fn list_files_for_scan<'a>(
1086        &'a self,
1087        ctx: &'a dyn Session,
1088        filters: &'a [Expr],
1089        limit: Option<usize>,
1090    ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
1091        let store = if let Some(url) = self.table_paths.first() {
1092            ctx.runtime_env().object_store(url)?
1093        } else {
1094            return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
1095        };
1096        // list files (with partitions)
1097        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
1098            pruned_partition_list(
1099                ctx,
1100                store.as_ref(),
1101                table_path,
1102                filters,
1103                &self.options.file_extension,
1104                &self.options.table_partition_cols,
1105            )
1106        }))
1107        .await?;
1108        let file_list = stream::iter(file_list).flatten();
1109        // collect the statistics if required by the config
1110        let files = file_list
1111            .map(|part_file| async {
1112                let part_file = part_file?;
1113                if self.options.collect_stat {
1114                    let statistics =
1115                        self.do_collect_statistics(ctx, &store, &part_file).await?;
1116                    Ok((part_file, statistics))
1117                } else {
1118                    Ok((
1119                        part_file,
1120                        Arc::new(Statistics::new_unknown(&self.file_schema)),
1121                    ))
1122                }
1123            })
1124            .boxed()
1125            .buffered(ctx.config_options().execution.meta_fetch_concurrency);
1126
1127        let (files, statistics) = get_statistics_with_limit(
1128            files,
1129            self.schema(),
1130            limit,
1131            self.options.collect_stat,
1132        )
1133        .await?;
1134
1135        Ok((
1136            split_files(files, self.options.target_partitions),
1137            statistics,
1138        ))
1139    }
1140
1141    /// Collects statistics for a given partitioned file.
1142    ///
1143    /// This method first checks if the statistics for the given file are already cached.
1144    /// If they are, it returns the cached statistics.
1145    /// If they are not, it infers the statistics from the file and stores them in the cache.
1146    async fn do_collect_statistics(
1147        &self,
1148        ctx: &dyn Session,
1149        store: &Arc<dyn ObjectStore>,
1150        part_file: &PartitionedFile,
1151    ) -> Result<Arc<Statistics>> {
1152        match self
1153            .collected_statistics
1154            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
1155        {
1156            Some(statistics) => Ok(statistics),
1157            None => {
1158                let statistics = self
1159                    .options
1160                    .format
1161                    .infer_stats(
1162                        ctx,
1163                        store,
1164                        Arc::clone(&self.file_schema),
1165                        &part_file.object_meta,
1166                    )
1167                    .await?;
1168                let statistics = Arc::new(statistics);
1169                self.collected_statistics.put_with_extra(
1170                    &part_file.object_meta.location,
1171                    Arc::clone(&statistics),
1172                    &part_file.object_meta,
1173                );
1174                Ok(statistics)
1175            }
1176        }
1177    }
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182    use super::*;
1183    use crate::datasource::file_format::avro::AvroFormat;
1184    use crate::datasource::file_format::csv::CsvFormat;
1185    use crate::datasource::file_format::json::JsonFormat;
1186    #[cfg(feature = "parquet")]
1187    use crate::datasource::file_format::parquet::ParquetFormat;
1188    use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
1189    use crate::execution::options::ArrowReadOptions;
1190    use crate::prelude::*;
1191    use crate::{
1192        assert_batches_eq,
1193        test::{columns, object_store::register_test_store},
1194    };
1195    use datafusion_physical_plan::collect;
1196
1197    use arrow::compute::SortOptions;
1198    use arrow::record_batch::RecordBatch;
1199    use datafusion_common::stats::Precision;
1200    use datafusion_common::{assert_contains, ScalarValue};
1201    use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
1202    use datafusion_physical_expr::PhysicalSortExpr;
1203    use datafusion_physical_plan::ExecutionPlanProperties;
1204
1205    use tempfile::TempDir;
1206
1207    #[tokio::test]
1208    async fn read_single_file() -> Result<()> {
1209        let ctx = SessionContext::new();
1210
1211        let table = load_table(&ctx, "alltypes_plain.parquet").await?;
1212        let projection = None;
1213        let exec = table
1214            .scan(&ctx.state(), projection, &[], None)
1215            .await
1216            .expect("Scan table");
1217
1218        assert_eq!(exec.children().len(), 0);
1219        assert_eq!(exec.output_partitioning().partition_count(), 1);
1220
1221        // test metadata
1222        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
1223        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
1224
1225        Ok(())
1226    }
1227
1228    #[cfg(feature = "parquet")]
1229    #[tokio::test]
1230    async fn load_table_stats_by_default() -> Result<()> {
1231        use crate::datasource::file_format::parquet::ParquetFormat;
1232
1233        let testdata = crate::test_util::parquet_test_data();
1234        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1235        let table_path = ListingTableUrl::parse(filename).unwrap();
1236
1237        let ctx = SessionContext::new();
1238        let state = ctx.state();
1239
1240        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
1241        let schema = opt.infer_schema(&state, &table_path).await?;
1242        let config = ListingTableConfig::new(table_path)
1243            .with_listing_options(opt)
1244            .with_schema(schema);
1245        let table = ListingTable::try_new(config)?;
1246
1247        let exec = table.scan(&state, None, &[], None).await?;
1248        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
1249        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
1250        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
1251
1252        Ok(())
1253    }
1254
1255    #[cfg(feature = "parquet")]
1256    #[tokio::test]
1257    async fn load_table_stats_when_no_stats() -> Result<()> {
1258        use crate::datasource::file_format::parquet::ParquetFormat;
1259
1260        let testdata = crate::test_util::parquet_test_data();
1261        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1262        let table_path = ListingTableUrl::parse(filename).unwrap();
1263
1264        let ctx = SessionContext::new();
1265        let state = ctx.state();
1266
1267        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
1268            .with_collect_stat(false);
1269        let schema = opt.infer_schema(&state, &table_path).await?;
1270        let config = ListingTableConfig::new(table_path)
1271            .with_listing_options(opt)
1272            .with_schema(schema);
1273        let table = ListingTable::try_new(config)?;
1274
1275        let exec = table.scan(&state, None, &[], None).await?;
1276        assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
1277        assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
1278
1279        Ok(())
1280    }
1281
1282    #[cfg(feature = "parquet")]
1283    #[tokio::test]
1284    async fn test_try_create_output_ordering() {
1285        let testdata = crate::test_util::parquet_test_data();
1286        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1287        let table_path = ListingTableUrl::parse(filename).unwrap();
1288
1289        let ctx = SessionContext::new();
1290        let state = ctx.state();
1291        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
1292        let schema = options.infer_schema(&state, &table_path).await.unwrap();
1293
1294        use crate::datasource::file_format::parquet::ParquetFormat;
1295        use datafusion_physical_plan::expressions::col as physical_col;
1296        use std::ops::Add;
1297
1298        // (file_sort_order, expected_result)
1299        let cases = vec![
1300            (vec![], Ok(vec![])),
1301            // sort expr, but non column
1302            (
1303                vec![vec![
1304                    col("int_col").add(lit(1)).sort(true, true),
1305                ]],
1306                Err("Expected single column references in output_ordering, got int_col + Int32(1)"),
1307            ),
1308            // ok with one column
1309            (
1310                vec![vec![col("string_col").sort(true, false)]],
1311                Ok(vec![LexOrdering::new(
1312                        vec![PhysicalSortExpr {
1313                            expr: physical_col("string_col", &schema).unwrap(),
1314                            options: SortOptions {
1315                                descending: false,
1316                                nulls_first: false,
1317                            },
1318                        }],
1319                )
1320                ])
1321            ),
1322            // ok with two columns, different options
1323            (
1324                vec![vec![
1325                    col("string_col").sort(true, false),
1326                    col("int_col").sort(false, true),
1327                ]],
1328                Ok(vec![LexOrdering::new(
1329                        vec![
1330                            PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
1331                                        .asc()
1332                                        .nulls_last(),
1333                            PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
1334                                        .desc()
1335                                        .nulls_first()
1336                        ],
1337                )
1338                ])
1339            ),
1340        ];
1341
1342        for (file_sort_order, expected_result) in cases {
1343            let options = options.clone().with_file_sort_order(file_sort_order);
1344
1345            let config = ListingTableConfig::new(table_path.clone())
1346                .with_listing_options(options)
1347                .with_schema(schema.clone());
1348
1349            let table =
1350                ListingTable::try_new(config.clone()).expect("Creating the table");
1351            let ordering_result = table.try_create_output_ordering();
1352
1353            match (expected_result, ordering_result) {
1354                (Ok(expected), Ok(result)) => {
1355                    assert_eq!(expected, result);
1356                }
1357                (Err(expected), Err(result)) => {
1358                    // can't compare the DataFusionError directly
1359                    let result = result.to_string();
1360                    let expected = expected.to_string();
1361                    assert_contains!(result.to_string(), expected);
1362                }
1363                (expected_result, ordering_result) => {
1364                    panic!(
1365                        "expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
1366                    );
1367                }
1368            }
1369        }
1370    }
1371
1372    #[tokio::test]
1373    async fn read_empty_table() -> Result<()> {
1374        let ctx = SessionContext::new();
1375        let path = String::from("table/p1=v1/file.avro");
1376        register_test_store(&ctx, &[(&path, 100)]);
1377
1378        let opt = ListingOptions::new(Arc::new(AvroFormat {}))
1379            .with_file_extension(AvroFormat.get_ext())
1380            .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
1381            .with_target_partitions(4);
1382
1383        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
1384        let file_schema =
1385            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
1386        let config = ListingTableConfig::new(table_path)
1387            .with_listing_options(opt)
1388            .with_schema(file_schema);
1389        let table = ListingTable::try_new(config)?;
1390
1391        assert_eq!(
1392            columns(&table.schema()),
1393            vec!["a".to_owned(), "p1".to_owned()]
1394        );
1395
1396        // this will filter out the only file in the store
1397        let filter = Expr::not_eq(col("p1"), lit("v1"));
1398
1399        let scan = table
1400            .scan(&ctx.state(), None, &[filter], None)
1401            .await
1402            .expect("Empty execution plan");
1403
1404        assert!(scan.as_any().is::<EmptyExec>());
1405        assert_eq!(
1406            columns(&scan.schema()),
1407            vec!["a".to_owned(), "p1".to_owned()]
1408        );
1409
1410        Ok(())
1411    }
1412
    #[tokio::test]
    async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
        // more expected partitions than files
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "test:///bucket/key-prefix/",
            12,
            5,
            Some(""),
        )
        .await?;

        // as many expected partitions as files
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
            ],
            "test:///bucket/key-prefix/",
            4,
            4,
            Some(""),
        )
        .await?;

        // more files than expected partitions
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "test:///bucket/key-prefix/",
            2,
            2,
            Some(""),
        )
        .await?;

        // no files => no groups
        assert_list_files_for_scan_grouping(
            &[],
            "test:///bucket/key-prefix/",
            2,
            0,
            Some(""),
        )
        .await?;

        // files that don't match the prefix
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/other-prefix/roguefile",
            ],
            "test:///bucket/key-prefix/",
            10,
            2,
            Some(""),
        )
        .await?;

        // files that don't match the prefix or the default file extension
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0.avro",
                "bucket/key-prefix/file1.parquet",
                "bucket/other-prefix/roguefile.avro",
            ],
            "test:///bucket/key-prefix/",
            10,
            1,
            None,
        )
        .await?;
        Ok(())
    }

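    /// Same as `test_assert_list_files_for_scan_grouping`, but the table is built from
    /// multiple table paths and only files under the listed prefixes are considered.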
    #[tokio::test]
    async fn test_assert_list_files_for_multi_path() -> Result<()> {
        // more expected partitions than files
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key1/", "test:///bucket/key2/"],
            12,
            5,
            Some(""),
        )
        .await?;

        // as many expected partitions as files
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key1/", "test:///bucket/key2/"],
            5,
            5,
            Some(""),
        )
        .await?;

        // more files than expected partitions
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key1/"],
            2,
            2,
            Some(""),
        )
        .await?;

        // no files => no groups
        assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some(""))
            .await?;

        // files that don't match the prefix
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key3/"],
            2,
            1,
            Some(""),
        )
        .await?;

        // files that don't match the prefix or the default file extension
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0.avro",
                "bucket/key1/file1.csv",
                "bucket/key1/file2.avro",
                "bucket/key2/file3.csv",
                "bucket/key2/file4.avro",
                "bucket/key3/file5.csv",
            ],
            &["test:///bucket/key1/", "test:///bucket/key3/"],
            2,
            2,
            None,
        )
        .await?;
        Ok(())
    }

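    /// Loads a file from the parquet test data directory as a `ListingTable` provider,
    /// inferring both the listing options and the schema from the file itself.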
    async fn load_table(
        ctx: &SessionContext,
        name: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{testdata}/{name}");
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let config = ListingTableConfig::new(table_path)
            .infer(&ctx.state())
            .await?;
        let table = ListingTable::try_new(config)?;
        Ok(Arc::new(table))
    }

    /// Check that the files listed by the table match the specified `output_partitioning`
    /// when the object store contains `files`.
    async fn assert_list_files_for_scan_grouping(
        files: &[&str],
        table_prefix: &str,
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let format = AvroFormat {};

        let opt = ListingOptions::new(Arc::new(format))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_path = ListingTableUrl::parse(table_prefix).unwrap();
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

    /// Check that the files listed by the table, built from multiple table paths, match
    /// the specified `output_partitioning` when the object store contains `files`.
    async fn assert_list_files_for_multi_paths(
        files: &[&str],
        table_prefix: &[&str],
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let format = AvroFormat {};

        let opt = ListingOptions::new(Arc::new(format))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_paths = table_prefix
            .iter()
            .map(|t| ListingTableUrl::parse(t).unwrap())
            .collect();
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

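    // The append tests below insert 20 rows (two batches of 10) into an empty listing
    // table; `batch_size` and `soft_max_rows_per_output_file` control how many output
    // files each insert produces.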
    #[tokio::test]
    async fn test_insert_into_append_new_json_files() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        helper_test_append_new_files_to_table(
            JsonFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_csv_files() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        helper_test_append_new_files_to_table(
            CsvFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_2_new_parquet_files_defaults() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_1_new_parquet_files_defaults() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "20".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "20".into(),
        );
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            1,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
        helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
        helper_test_insert_into_sql(
            "csv",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(HashMap::from([("has_header".into(), "true".into())])),
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_json_defaults() -> Result<()> {
        helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.statistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_on_write".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(config_map),
        )
        .await?;
        Ok(())
    }

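    // Parquet writer options supplied through the session config (compression, bloom
    // filters, writer version, etc.) should be accepted when appending new files.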
    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.statistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.encoding".into(),
            "delta_binary_packed".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_on_write".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

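    // `zstd` without an explicit level is rejected, so the insert is expected to fail
    // with a configuration error.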
    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
    ) -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd".into(),
        );
        let e = helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await
        .expect_err("Example should fail!");
        assert_eq!(e.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");

        Ok(())
    }

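    /// Registers an empty table of the given file type in a temporary directory, appends
    /// the same 20 source rows to it twice, and checks the reported row counts as well as
    /// the number of files written per insert.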
    async fn helper_test_append_new_files_to_table(
        file_type_ext: String,
        file_compression_type: FileCompressionType,
        session_config_map: Option<HashMap<String, String>>,
        expected_n_files_per_insert: usize,
    ) -> Result<()> {
        // Create the initial context, schema, and batch.
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(&cfg)?;
                SessionContext::new_with_config(config)
            }
            None => SessionContext::new(),
        };

        // Create a new schema with one field called "column1" of type Int32
        let schema = Arc::new(Schema::new(vec![Field::new(
            "column1",
            DataType::Int32,
            false,
        )]));

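        // Build a filter `column1 >= 0`; every value inserted below satisfies it, so the
        // filter does not reduce the number of rows written.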
        let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
            Box::new(Expr::Column("column1".into())),
            Operator::GtEq,
            Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
        ));

        // Create a new batch of data to insert into the table
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(arrow::array::Int32Array::from(vec![
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
            ]))],
        )?;

        // Register the appropriate table depending on the file type we want to test
        let tmp_dir = TempDir::new()?;
        match file_type_ext.as_str() {
            "csv" => {
                session_ctx
                    .register_csv(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        CsvReadOptions::new()
                            .schema(schema.as_ref())
                            .file_compression_type(file_compression_type),
                    )
                    .await?;
            }
            "json" => {
                session_ctx
                    .register_json(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        NdJsonReadOptions::default()
                            .schema(schema.as_ref())
                            .file_compression_type(file_compression_type),
                    )
                    .await?;
            }
            "parquet" => {
                session_ctx
                    .register_parquet(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        ParquetReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            "avro" => {
                session_ctx
                    .register_avro(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        AvroReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            "arrow" => {
                session_ctx
                    .register_arrow(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        ArrowReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            _ => panic!("Unrecognized file extension {file_type_ext}"),
        }

        // Create and register the source table with the provided schema and the data to insert
        let source_table = Arc::new(MemTable::try_new(
            schema.clone(),
            vec![vec![batch.clone(), batch.clone()]],
        )?);
        session_ctx.register_table("source", source_table.clone())?;
        // Convert the source table into a provider so that it can be used in a query
        let source = provider_as_source(source_table);
        let target = session_ctx.table_provider("t").await?;
        let target = Arc::new(DefaultTableSource::new(target));
        // Create a table scan logical plan to read from the source table
        let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
            .filter(filter_predicate)?
            .build()?;
        // Since the logical plan contains a filter, increasing parallelism is helpful.
        // Therefore, we will have 8 partitions in the final plan.
        // Create an insert plan to insert the source data into the initial table
        let insert_into_table =
            LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
                .build()?;
        // Create a physical plan from the insert plan
        let plan = session_ctx
            .state()
            .create_physical_plan(&insert_into_table)
            .await?;
        // Execute the physical plan and collect the results
        let res = collect(plan, session_ctx.task_ctx()).await?;
        // Insert returns the number of rows written, in our case this is 20.
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 20    |",
            "+-------+",
        ];

        // Assert that the insert reported the expected row count.
        assert_batches_eq!(expected, &res);

        // Read the records in the table
        let batches = session_ctx
            .sql("select count(*) as count from t")
            .await?
            .collect()
            .await?;
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 20    |",
            "+-------+",
        ];

        // Assert that the batches read from the table match the expected result.
        assert_batches_eq!(expected, &batches);

        // Assert that `expected_n_files_per_insert` files were added to the table.
        let num_files = tmp_dir.path().read_dir()?.count();
        assert_eq!(num_files, expected_n_files_per_insert);

        // Create a physical plan from the insert plan
        let plan = session_ctx
            .state()
            .create_physical_plan(&insert_into_table)
            .await?;

        // Again, execute the physical plan and collect the results
        let res = collect(plan, session_ctx.task_ctx()).await?;
        // Insert returns the number of rows written, in our case this is again 20.
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 20    |",
            "+-------+",
        ];

        // Assert that the insert reported the expected row count.
        assert_batches_eq!(expected, &res);

        // Read the contents of the table
        let batches = session_ctx
            .sql("select count(*) AS count from t")
            .await?
            .collect()
            .await?;

        // Define the expected result after the second append.
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 40    |",
            "+-------+",
        ];

        // Assert that the batches read from the table after the second append match the expected result.
        assert_batches_eq!(expected, &batches);

        // Assert that another `expected_n_files_per_insert` files were added to the table.
        let num_files = tmp_dir.path().read_dir()?.count();
        assert_eq!(num_files, expected_n_files_per_insert * 2);

        // Return Ok if the test ran to completion without error.
        Ok(())
    }

    /// Tests INSERT INTO with end-to-end SQL:
    /// CREATE EXTERNAL TABLE + INSERT INTO statements.
    async fn helper_test_insert_into_sql(
        file_type: &str,
        // TODO test with create statement options such as compression
        _file_compression_type: FileCompressionType,
        external_table_options: &str,
        session_config_map: Option<HashMap<String, String>>,
    ) -> Result<()> {
        // Create the initial context
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(&cfg)?;
                SessionContext::new_with_config(config)
            }
            None => SessionContext::new(),
        };

        // create table
        let tmp_dir = TempDir::new()?;
        let tmp_path = tmp_dir.into_path();
        let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
        session_ctx
            .sql(&format!(
                "create external table foo(a varchar, b varchar, c int) \
                        stored as {file_type} \
                        location '{str_path}' \
                        {external_table_options}"
            ))
            .await?
            .collect()
            .await?;

        // insert data
        session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
            .await?
            .collect()
            .await?;

        // check the inserted rows
        let batches = session_ctx
            .sql("select * from foo")
            .await?
            .collect()
            .await?;

        let expected = [
            "+-----+-----+---+",
            "| a   | b   | c |",
            "+-----+-----+---+",
            "| foo | bar | 1 |",
            "| foo | bar | 2 |",
            "| foo | bar | 3 |",
            "+-----+-----+---+",
        ];
        assert_batches_eq!(expected, &batches);

        Ok(())
    }

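    /// Inferring options for a gzip-compressed CSV file should handle the compression and
    /// still produce the full 13-column schema of `aggregate_test_100`.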
    #[tokio::test]
    async fn test_infer_options_compressed_csv() -> Result<()> {
        let testdata = crate::test_util::arrow_test_data();
        let filename = format!("{}/csv/aggregate_test_100.csv.gz", testdata);
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let ctx = SessionContext::new();

        let config = ListingTableConfig::new(table_path);
        let config_with_opts = config.infer_options(&ctx.state()).await?;
        let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;

        let schema = config_with_schema.file_schema.unwrap();

        assert_eq!(schema.fields.len(), 13);

        Ok(())
    }
}