Skip to main content

datafusion_catalog_listing/
options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::datatypes::{DataType, SchemaRef};
19use datafusion_catalog::Session;
20use datafusion_common::plan_err;
21use datafusion_datasource::ListingTableUrl;
22use datafusion_datasource::file_format::FileFormat;
23use datafusion_execution::config::SessionConfig;
24use datafusion_expr::SortExpr;
25use futures::StreamExt;
26use futures::TryStreamExt;
27use itertools::Itertools;
28use std::sync::Arc;
29
30/// Options for creating a [`crate::ListingTable`]
31#[derive(Clone, Debug)]
32pub struct ListingOptions {
33    /// A suffix on which files should be filtered (leave empty to
34    /// keep all files on the path)
35    pub file_extension: String,
36    /// The file format
37    pub format: Arc<dyn FileFormat>,
38    /// The expected partition column names in the folder structure.
39    /// See [Self::with_table_partition_cols] for details
40    pub table_partition_cols: Vec<(String, DataType)>,
41    /// Set true to try to guess statistics from the files.
42    /// This can add a lot of overhead as it will usually require files
43    /// to be opened and at least partially parsed.
44    pub collect_stat: bool,
45    /// Group files to avoid that the number of partitions exceeds
46    /// this limit
47    pub target_partitions: usize,
48    /// Optional pre-known sort order(s). Must be `SortExpr`s.
49    ///
50    /// DataFusion may take advantage of this ordering to omit sorts
51    /// or use more efficient algorithms. Currently sortedness must be
52    /// provided if it is known by some external mechanism, but may in
53    /// the future be automatically determined, for example using
54    /// parquet metadata.
55    ///
56    /// See <https://github.com/apache/datafusion/issues/4177>
57    ///
58    /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
59    ///       where each ordering consists of an individual lexicographic
60    ///       ordering (encapsulated by a `Vec<Expr>`). If there aren't
61    ///       multiple equivalent orderings, the outer `Vec` will have a
62    ///       single element.
63    pub file_sort_order: Vec<Vec<SortExpr>>,
64}
65
66impl ListingOptions {
67    /// Creates an options instance with the given format
68    /// Default values:
69    /// - use default file extension filter
70    /// - no input partition to discover
71    /// - one target partition
72    /// - do not collect statistics
73    pub fn new(format: Arc<dyn FileFormat>) -> Self {
74        Self {
75            file_extension: format.get_ext(),
76            format,
77            table_partition_cols: vec![],
78            collect_stat: false,
79            target_partitions: 1,
80            file_sort_order: vec![],
81        }
82    }
83
84    /// Set options from [`SessionConfig`] and returns self.
85    ///
86    /// Currently this sets `target_partitions` and `collect_stat`
87    /// but if more options are added in the future that need to be coordinated
88    /// they will be synchronized through this method.
89    pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
90        self = self.with_target_partitions(config.target_partitions());
91        self = self.with_collect_stat(config.collect_statistics());
92        self
93    }
94
95    /// Set file extension on [`ListingOptions`] and returns self.
96    ///
97    /// # Example
98    /// ```
99    /// # use std::sync::Arc;
100    /// # use datafusion_catalog_listing::ListingOptions;
101    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
102    ///
103    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
104    ///     .with_file_extension(".parquet");
105    ///
106    /// assert_eq!(listing_options.file_extension, ".parquet");
107    /// ```
108    pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
109        self.file_extension = file_extension.into();
110        self
111    }
112
113    /// Optionally set file extension on [`ListingOptions`] and returns self.
114    ///
115    /// If `file_extension` is `None`, the file extension will not be changed
116    ///
117    /// # Example
118    /// ```
119    /// # use std::sync::Arc;
120    /// # use datafusion_catalog_listing::ListingOptions;
121    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
122    ///
123    /// let extension = Some(".parquet");
124    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
125    ///     .with_file_extension_opt(extension);
126    ///
127    /// assert_eq!(listing_options.file_extension, ".parquet");
128    /// ```
129    pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
130    where
131        S: Into<String>,
132    {
133        if let Some(file_extension) = file_extension {
134            self.file_extension = file_extension.into();
135        }
136        self
137    }
138
139    /// Set `table partition columns` on [`ListingOptions`] and returns self.
140    ///
141    /// "partition columns," used to support [Hive Partitioning], are
142    /// columns added to the data that is read, based on the folder
143    /// structure where the data resides.
144    ///
145    /// For example, give the following files in your filesystem:
146    ///
147    /// ```text
148    /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet
149    /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet
150    /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet
151    /// ```
152    ///
153    /// A [`crate::ListingTable`] created at `/mnt/nyctaxi/` with partition
154    /// columns "year" and "month" will include new `year` and `month`
155    /// columns while reading the files. The `year` column would have
156    /// value `2022` and the `month` column would have value `01` for
157    /// the rows read from
158    /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet`
159    ///
160    ///# Notes
161    ///
162    /// - If only one level (e.g. `year` in the example above) is
163    ///   specified, the other levels are ignored but the files are
164    ///   still read.
165    ///
166    /// - Files that don't follow this partitioning scheme will be
167    ///   ignored.
168    ///
169    /// - Since the columns have the same value for all rows read from
170    ///   each individual file (such as dates), they are typically
171    ///   dictionary encoded for efficiency. You may use
172    ///   [`wrap_partition_type_in_dict`] to request a
173    ///   dictionary-encoded type.
174    ///
175    /// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself.
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// # use std::sync::Arc;
181    /// # use arrow::datatypes::DataType;
182    /// # use datafusion_expr::col;
183    /// # use datafusion_catalog_listing::ListingOptions;
184    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
185    ///
186    /// // listing options for files with paths such as  `/mnt/data/col_a=x/col_b=y/data.parquet`
187    /// // `col_a` and `col_b` will be included in the data read from those files
188    /// let listing_options = ListingOptions::new(Arc::new(
189    ///     ParquetFormat::default()
190    ///   ))
191    ///   .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
192    ///       ("col_b".to_string(), DataType::Utf8)]);
193    ///
194    /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
195    ///     ("col_b".to_string(), DataType::Utf8)]);
196    /// ```
197    ///
198    /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
199    /// [`wrap_partition_type_in_dict`]: datafusion_datasource::file_scan_config::wrap_partition_type_in_dict
200    pub fn with_table_partition_cols(
201        mut self,
202        table_partition_cols: Vec<(String, DataType)>,
203    ) -> Self {
204        self.table_partition_cols = table_partition_cols;
205        self
206    }
207
208    /// Set stat collection on [`ListingOptions`] and returns self.
209    ///
210    /// ```
211    /// # use std::sync::Arc;
212    /// # use datafusion_catalog_listing::ListingOptions;
213    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
214    ///
215    /// let listing_options =
216    ///     ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
217    ///
218    /// assert_eq!(listing_options.collect_stat, true);
219    /// ```
220    pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
221        self.collect_stat = collect_stat;
222        self
223    }
224
225    /// Set number of target partitions on [`ListingOptions`] and returns self.
226    ///
227    /// ```
228    /// # use std::sync::Arc;
229    /// # use datafusion_catalog_listing::ListingOptions;
230    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
231    ///
232    /// let listing_options =
233    ///     ListingOptions::new(Arc::new(ParquetFormat::default())).with_target_partitions(8);
234    ///
235    /// assert_eq!(listing_options.target_partitions, 8);
236    /// ```
237    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
238        self.target_partitions = target_partitions;
239        self
240    }
241
242    /// Set file sort order on [`ListingOptions`] and returns self.
243    ///
244    /// ```
245    /// # use std::sync::Arc;
246    /// # use datafusion_expr::col;
247    /// # use datafusion_catalog_listing::ListingOptions;
248    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
249    ///
250    /// // Tell datafusion that the files are sorted by column "a"
251    /// let file_sort_order = vec![vec![col("a").sort(true, true)]];
252    ///
253    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
254    ///     .with_file_sort_order(file_sort_order.clone());
255    ///
256    /// assert_eq!(listing_options.file_sort_order, file_sort_order);
257    /// ```
258    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
259        self.file_sort_order = file_sort_order;
260        self
261    }
262
263    /// Infer the schema of the files at the given path on the provided object store.
264    ///
265    /// If the table_path contains one or more files (i.e. it is a directory /
266    /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`].
267    ///
268    /// Returns a `Plan` error if `table_path` contains no files at all (e.g. an
269    /// empty or non-existent directory), since an inferred schema with zero
270    /// columns produces confusing "column not found" errors at query time.
271    /// Callers that need to support empty locations must declare an explicit
272    /// schema instead of relying on inference. Locations that contain files
273    /// which all happen to be 0-byte are still accepted — the empty files are
274    /// filtered out before format-specific inference runs.
275    ///
276    /// Note: The inferred schema does not include any partitioning columns.
277    ///
278    /// This method is called as part of creating a [`crate::ListingTable`].
279    pub async fn infer_schema<'a>(
280        &'a self,
281        state: &dyn Session,
282        table_path: &'a ListingTableUrl,
283    ) -> datafusion_common::Result<SchemaRef> {
284        let store = state.runtime_env().object_store(table_path)?;
285
286        let all_files: Vec<_> = table_path
287            .list_all_files(state, store.as_ref(), &self.file_extension)
288            .await?
289            .try_collect()
290            .await?;
291
292        if all_files.is_empty() {
293            return plan_err!(
294                "No files found at {}. \
295                 Cannot infer schema from an empty location; either add data files \
296                 or declare an explicit schema for the table.",
297                table_path
298            );
299        }
300
301        // Empty files cannot affect schema but may throw when trying to read for it
302        let files: Vec<_> = all_files
303            .into_iter()
304            .filter(|object_meta| object_meta.size > 0)
305            .collect();
306
307        let schema = self.format.infer_schema(state, &store, &files).await?;
308
309        Ok(schema)
310    }
311
312    /// Infers the partition columns stored in `LOCATION` and compares
313    /// them with the columns provided in `PARTITIONED BY` to help prevent
314    /// accidental corrupts of partitioned tables.
315    ///
316    /// Allows specifying partial partitions.
317    pub async fn validate_partitions(
318        &self,
319        state: &dyn Session,
320        table_path: &ListingTableUrl,
321    ) -> datafusion_common::Result<()> {
322        if self.table_partition_cols.is_empty() {
323            return Ok(());
324        }
325
326        if !table_path.is_collection() {
327            return plan_err!(
328                "Can't create a partitioned table backed by a single file, \
329                perhaps the URL is missing a trailing slash?"
330            );
331        }
332
333        let inferred = self.infer_partitions(state, table_path).await?;
334
335        // no partitioned files found on disk
336        if inferred.is_empty() {
337            return Ok(());
338        }
339
340        let table_partition_names = self
341            .table_partition_cols
342            .iter()
343            .map(|(col_name, _)| col_name.clone())
344            .collect_vec();
345
346        if inferred.len() < table_partition_names.len() {
347            return plan_err!(
348                "Inferred partitions to be {:?}, but got {:?}",
349                inferred,
350                table_partition_names
351            );
352        }
353
354        // match prefix to allow creating tables with partial partitions
355        for (idx, col) in table_partition_names.iter().enumerate() {
356            if &inferred[idx] != col {
357                return plan_err!(
358                    "Inferred partitions to be {:?}, but got {:?}",
359                    inferred,
360                    table_partition_names
361                );
362            }
363        }
364
365        Ok(())
366    }
367
368    /// Infer the partitioning at the given path on the provided object store.
369    /// For performance reasons, it doesn't read all the files on disk
370    /// and therefore may fail to detect invalid partitioning.
371    pub async fn infer_partitions(
372        &self,
373        state: &dyn Session,
374        table_path: &ListingTableUrl,
375    ) -> datafusion_common::Result<Vec<String>> {
376        let store = state.runtime_env().object_store(table_path)?;
377
378        // only use 10 files for inference
379        // This can fail to detect inconsistent partition keys
380        // A DFS traversal approach of the store can help here
381        let files: Vec<_> = table_path
382            .list_all_files(state, store.as_ref(), &self.file_extension)
383            .await?
384            .take(10)
385            .try_collect()
386            .await?;
387
388        let stripped_path_parts = files.iter().map(|file| {
389            table_path
390                .strip_prefix(&file.location)
391                .unwrap()
392                .collect_vec()
393        });
394
395        let partition_keys = stripped_path_parts
396            .map(|path_parts| {
397                path_parts
398                    .into_iter()
399                    .rev()
400                    .skip(1) // get parents only; skip the file itself
401                    .rev()
402                    // Partitions are expected to follow the format "column_name=value", so we
403                    // should ignore any path part that cannot be parsed into the expected format
404                    .filter(|s| s.contains('='))
405                    .map(|s| s.split('=').take(1).collect())
406                    .collect_vec()
407            })
408            .collect_vec();
409
410        match partition_keys.into_iter().all_equal_value() {
411            Ok(v) => Ok(v),
412            Err(None) => Ok(vec![]),
413            Err(Some(diff)) => {
414                let mut sorted_diff = [diff.0, diff.1];
415                sorted_diff.sort();
416                plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
417            }
418        }
419    }
420}