datafusion_catalog_listing/options.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use arrow::datatypes::{DataType, SchemaRef};
19use datafusion_catalog::Session;
20use datafusion_common::plan_err;
21use datafusion_datasource::ListingTableUrl;
22use datafusion_datasource::file_format::FileFormat;
23use datafusion_execution::config::SessionConfig;
24use datafusion_expr::SortExpr;
25use futures::StreamExt;
26use futures::TryStreamExt;
27use itertools::Itertools;
28use std::sync::Arc;
29
30/// Options for creating a [`crate::ListingTable`]
31#[derive(Clone, Debug)]
32pub struct ListingOptions {
33 /// A suffix on which files should be filtered (leave empty to
34 /// keep all files on the path)
35 pub file_extension: String,
36 /// The file format
37 pub format: Arc<dyn FileFormat>,
38 /// The expected partition column names in the folder structure.
39 /// See [Self::with_table_partition_cols] for details
40 pub table_partition_cols: Vec<(String, DataType)>,
41 /// Set true to try to guess statistics from the files.
42 /// This can add a lot of overhead as it will usually require files
43 /// to be opened and at least partially parsed.
44 pub collect_stat: bool,
45 /// Group files to avoid that the number of partitions exceeds
46 /// this limit
47 pub target_partitions: usize,
48 /// Optional pre-known sort order(s). Must be `SortExpr`s.
49 ///
50 /// DataFusion may take advantage of this ordering to omit sorts
51 /// or use more efficient algorithms. Currently sortedness must be
52 /// provided if it is known by some external mechanism, but may in
53 /// the future be automatically determined, for example using
54 /// parquet metadata.
55 ///
56 /// See <https://github.com/apache/datafusion/issues/4177>
57 ///
58 /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
59 /// where each ordering consists of an individual lexicographic
60 /// ordering (encapsulated by a `Vec<Expr>`). If there aren't
61 /// multiple equivalent orderings, the outer `Vec` will have a
62 /// single element.
63 pub file_sort_order: Vec<Vec<SortExpr>>,
64}
65
66impl ListingOptions {
67 /// Creates an options instance with the given format
68 /// Default values:
69 /// - use default file extension filter
70 /// - no input partition to discover
71 /// - one target partition
72 /// - do not collect statistics
73 pub fn new(format: Arc<dyn FileFormat>) -> Self {
74 Self {
75 file_extension: format.get_ext(),
76 format,
77 table_partition_cols: vec![],
78 collect_stat: false,
79 target_partitions: 1,
80 file_sort_order: vec![],
81 }
82 }
83
84 /// Set options from [`SessionConfig`] and returns self.
85 ///
86 /// Currently this sets `target_partitions` and `collect_stat`
87 /// but if more options are added in the future that need to be coordinated
88 /// they will be synchronized through this method.
89 pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
90 self = self.with_target_partitions(config.target_partitions());
91 self = self.with_collect_stat(config.collect_statistics());
92 self
93 }
94
95 /// Set file extension on [`ListingOptions`] and returns self.
96 ///
97 /// # Example
98 /// ```
99 /// # use std::sync::Arc;
100 /// # use datafusion_catalog_listing::ListingOptions;
101 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
102 ///
103 /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
104 /// .with_file_extension(".parquet");
105 ///
106 /// assert_eq!(listing_options.file_extension, ".parquet");
107 /// ```
108 pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
109 self.file_extension = file_extension.into();
110 self
111 }
112
113 /// Optionally set file extension on [`ListingOptions`] and returns self.
114 ///
115 /// If `file_extension` is `None`, the file extension will not be changed
116 ///
117 /// # Example
118 /// ```
119 /// # use std::sync::Arc;
120 /// # use datafusion_catalog_listing::ListingOptions;
121 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
122 ///
123 /// let extension = Some(".parquet");
124 /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
125 /// .with_file_extension_opt(extension);
126 ///
127 /// assert_eq!(listing_options.file_extension, ".parquet");
128 /// ```
129 pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
130 where
131 S: Into<String>,
132 {
133 if let Some(file_extension) = file_extension {
134 self.file_extension = file_extension.into();
135 }
136 self
137 }
138
139 /// Set `table partition columns` on [`ListingOptions`] and returns self.
140 ///
141 /// "partition columns," used to support [Hive Partitioning], are
142 /// columns added to the data that is read, based on the folder
143 /// structure where the data resides.
144 ///
145 /// For example, give the following files in your filesystem:
146 ///
147 /// ```text
148 /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet
149 /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet
150 /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet
151 /// ```
152 ///
153 /// A [`crate::ListingTable`] created at `/mnt/nyctaxi/` with partition
154 /// columns "year" and "month" will include new `year` and `month`
155 /// columns while reading the files. The `year` column would have
156 /// value `2022` and the `month` column would have value `01` for
157 /// the rows read from
158 /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet`
159 ///
160 ///# Notes
161 ///
162 /// - If only one level (e.g. `year` in the example above) is
163 /// specified, the other levels are ignored but the files are
164 /// still read.
165 ///
166 /// - Files that don't follow this partitioning scheme will be
167 /// ignored.
168 ///
169 /// - Since the columns have the same value for all rows read from
170 /// each individual file (such as dates), they are typically
171 /// dictionary encoded for efficiency. You may use
172 /// [`wrap_partition_type_in_dict`] to request a
173 /// dictionary-encoded type.
174 ///
175 /// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// # use std::sync::Arc;
181 /// # use arrow::datatypes::DataType;
182 /// # use datafusion_expr::col;
183 /// # use datafusion_catalog_listing::ListingOptions;
184 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
185 ///
186 /// // listing options for files with paths such as `/mnt/data/col_a=x/col_b=y/data.parquet`
187 /// // `col_a` and `col_b` will be included in the data read from those files
188 /// let listing_options = ListingOptions::new(Arc::new(
189 /// ParquetFormat::default()
190 /// ))
191 /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
192 /// ("col_b".to_string(), DataType::Utf8)]);
193 ///
194 /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
195 /// ("col_b".to_string(), DataType::Utf8)]);
196 /// ```
197 ///
198 /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
199 /// [`wrap_partition_type_in_dict`]: datafusion_datasource::file_scan_config::wrap_partition_type_in_dict
200 pub fn with_table_partition_cols(
201 mut self,
202 table_partition_cols: Vec<(String, DataType)>,
203 ) -> Self {
204 self.table_partition_cols = table_partition_cols;
205 self
206 }
207
208 /// Set stat collection on [`ListingOptions`] and returns self.
209 ///
210 /// ```
211 /// # use std::sync::Arc;
212 /// # use datafusion_catalog_listing::ListingOptions;
213 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
214 ///
215 /// let listing_options =
216 /// ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
217 ///
218 /// assert_eq!(listing_options.collect_stat, true);
219 /// ```
220 pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
221 self.collect_stat = collect_stat;
222 self
223 }
224
225 /// Set number of target partitions on [`ListingOptions`] and returns self.
226 ///
227 /// ```
228 /// # use std::sync::Arc;
229 /// # use datafusion_catalog_listing::ListingOptions;
230 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
231 ///
232 /// let listing_options =
233 /// ListingOptions::new(Arc::new(ParquetFormat::default())).with_target_partitions(8);
234 ///
235 /// assert_eq!(listing_options.target_partitions, 8);
236 /// ```
237 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
238 self.target_partitions = target_partitions;
239 self
240 }
241
242 /// Set file sort order on [`ListingOptions`] and returns self.
243 ///
244 /// ```
245 /// # use std::sync::Arc;
246 /// # use datafusion_expr::col;
247 /// # use datafusion_catalog_listing::ListingOptions;
248 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
249 ///
250 /// // Tell datafusion that the files are sorted by column "a"
251 /// let file_sort_order = vec![vec![col("a").sort(true, true)]];
252 ///
253 /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
254 /// .with_file_sort_order(file_sort_order.clone());
255 ///
256 /// assert_eq!(listing_options.file_sort_order, file_sort_order);
257 /// ```
258 pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
259 self.file_sort_order = file_sort_order;
260 self
261 }
262
263 /// Infer the schema of the files at the given path on the provided object store.
264 ///
265 /// If the table_path contains one or more files (i.e. it is a directory /
266 /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`].
267 ///
268 /// Returns a `Plan` error if `table_path` contains no files at all (e.g. an
269 /// empty or non-existent directory), since an inferred schema with zero
270 /// columns produces confusing "column not found" errors at query time.
271 /// Callers that need to support empty locations must declare an explicit
272 /// schema instead of relying on inference. Locations that contain files
273 /// which all happen to be 0-byte are still accepted — the empty files are
274 /// filtered out before format-specific inference runs.
275 ///
276 /// Note: The inferred schema does not include any partitioning columns.
277 ///
278 /// This method is called as part of creating a [`crate::ListingTable`].
279 pub async fn infer_schema<'a>(
280 &'a self,
281 state: &dyn Session,
282 table_path: &'a ListingTableUrl,
283 ) -> datafusion_common::Result<SchemaRef> {
284 let store = state.runtime_env().object_store(table_path)?;
285
286 let all_files: Vec<_> = table_path
287 .list_all_files(state, store.as_ref(), &self.file_extension)
288 .await?
289 .try_collect()
290 .await?;
291
292 if all_files.is_empty() {
293 return plan_err!(
294 "No files found at {}. \
295 Cannot infer schema from an empty location; either add data files \
296 or declare an explicit schema for the table.",
297 table_path
298 );
299 }
300
301 // Empty files cannot affect schema but may throw when trying to read for it
302 let files: Vec<_> = all_files
303 .into_iter()
304 .filter(|object_meta| object_meta.size > 0)
305 .collect();
306
307 let schema = self.format.infer_schema(state, &store, &files).await?;
308
309 Ok(schema)
310 }
311
312 /// Infers the partition columns stored in `LOCATION` and compares
313 /// them with the columns provided in `PARTITIONED BY` to help prevent
314 /// accidental corrupts of partitioned tables.
315 ///
316 /// Allows specifying partial partitions.
317 pub async fn validate_partitions(
318 &self,
319 state: &dyn Session,
320 table_path: &ListingTableUrl,
321 ) -> datafusion_common::Result<()> {
322 if self.table_partition_cols.is_empty() {
323 return Ok(());
324 }
325
326 if !table_path.is_collection() {
327 return plan_err!(
328 "Can't create a partitioned table backed by a single file, \
329 perhaps the URL is missing a trailing slash?"
330 );
331 }
332
333 let inferred = self.infer_partitions(state, table_path).await?;
334
335 // no partitioned files found on disk
336 if inferred.is_empty() {
337 return Ok(());
338 }
339
340 let table_partition_names = self
341 .table_partition_cols
342 .iter()
343 .map(|(col_name, _)| col_name.clone())
344 .collect_vec();
345
346 if inferred.len() < table_partition_names.len() {
347 return plan_err!(
348 "Inferred partitions to be {:?}, but got {:?}",
349 inferred,
350 table_partition_names
351 );
352 }
353
354 // match prefix to allow creating tables with partial partitions
355 for (idx, col) in table_partition_names.iter().enumerate() {
356 if &inferred[idx] != col {
357 return plan_err!(
358 "Inferred partitions to be {:?}, but got {:?}",
359 inferred,
360 table_partition_names
361 );
362 }
363 }
364
365 Ok(())
366 }
367
368 /// Infer the partitioning at the given path on the provided object store.
369 /// For performance reasons, it doesn't read all the files on disk
370 /// and therefore may fail to detect invalid partitioning.
371 pub async fn infer_partitions(
372 &self,
373 state: &dyn Session,
374 table_path: &ListingTableUrl,
375 ) -> datafusion_common::Result<Vec<String>> {
376 let store = state.runtime_env().object_store(table_path)?;
377
378 // only use 10 files for inference
379 // This can fail to detect inconsistent partition keys
380 // A DFS traversal approach of the store can help here
381 let files: Vec<_> = table_path
382 .list_all_files(state, store.as_ref(), &self.file_extension)
383 .await?
384 .take(10)
385 .try_collect()
386 .await?;
387
388 let stripped_path_parts = files.iter().map(|file| {
389 table_path
390 .strip_prefix(&file.location)
391 .unwrap()
392 .collect_vec()
393 });
394
395 let partition_keys = stripped_path_parts
396 .map(|path_parts| {
397 path_parts
398 .into_iter()
399 .rev()
400 .skip(1) // get parents only; skip the file itself
401 .rev()
402 // Partitions are expected to follow the format "column_name=value", so we
403 // should ignore any path part that cannot be parsed into the expected format
404 .filter(|s| s.contains('='))
405 .map(|s| s.split('=').take(1).collect())
406 .collect_vec()
407 })
408 .collect_vec();
409
410 match partition_keys.into_iter().all_equal_value() {
411 Ok(v) => Ok(v),
412 Err(None) => Ok(vec![]),
413 Err(Some(diff)) => {
414 let mut sorted_diff = [diff.0, diff.1];
415 sorted_diff.sort();
416 plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
417 }
418 }
419 }
420}