datafusion_catalog_listing/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::options::ListingOptions;
19use arrow::datatypes::{DataType, Schema, SchemaRef};
20use datafusion_catalog::Session;
21use datafusion_common::{config_err, internal_err};
22use datafusion_datasource::ListingTableUrl;
23use datafusion_datasource::file_compression_type::FileCompressionType;
24#[expect(deprecated)]
25use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
26use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
27use std::str::FromStr;
28use std::sync::Arc;
29
/// Indicates the source of the schema for a [`crate::ListingTable`]
///
/// The value starts out as [`SchemaSource::Unset`] and is updated by the
/// configuration builder as a schema is either supplied or inferred.
// `PartialEq` is required for `assert_eq!` in tests. `Eq` and `Hash` are
// derived as well: the enum is fieldless, so they are free and let the value
// be used as a map/set key or in `Eq`-bounded generic code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SchemaSource {
    /// Schema is not yet set (initial state)
    #[default]
    Unset,
    /// Schema was inferred from first table_path
    Inferred,
    /// Schema was specified explicitly via with_schema
    Specified,
}
42
/// Configuration for creating a [`crate::ListingTable`]
///
/// A configuration is built up with the consuming `with_*` / `infer_*`
/// methods; each of them takes `self` by value and returns the updated
/// configuration.
///
/// # Schema Evolution Support
///
/// This configuration supports schema evolution through the optional
/// [`PhysicalExprAdapterFactory`]. You might want to override the default factory when you need:
///
/// - **Type coercion requirements**: When you need custom logic for converting between
///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
/// - **Column mapping**: You need to map columns with a legacy name to a new name
/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
    /// Paths on the `ObjectStore` for creating [`crate::ListingTable`].
    /// They should share the same schema and object store.
    pub table_paths: Vec<ListingTableUrl>,
    /// Optional `SchemaRef` for the to-be-created [`crate::ListingTable`].
    ///
    /// `None` until a schema is supplied or inferred.
    ///
    /// See details on [`ListingTableConfig::with_schema`]
    pub file_schema: Option<SchemaRef>,
    /// Optional [`ListingOptions`] for the to-be-created [`crate::ListingTable`].
    ///
    /// See details on [`ListingTableConfig::with_listing_options`]
    pub options: Option<ListingOptions>,
    /// Tracks the source of the schema information
    ///
    /// Set to [`SchemaSource::Specified`] by [`ListingTableConfig::with_schema`]
    /// and to [`SchemaSource::Inferred`] when
    /// [`ListingTableConfig::infer_schema`] has to produce the schema itself.
    pub(crate) schema_source: SchemaSource,
    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
    ///
    /// See details on [`ListingTableConfig::with_expr_adapter_factory`]
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
72
73impl ListingTableConfig {
74    /// Creates new [`ListingTableConfig`] for reading the specified URL
75    pub fn new(table_path: ListingTableUrl) -> Self {
76        Self {
77            table_paths: vec![table_path],
78            ..Default::default()
79        }
80    }
81
82    /// Creates new [`ListingTableConfig`] with multiple table paths.
83    ///
84    ///  See `ListingTableConfigExt::infer_options` for details on what happens with multiple paths
85    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
86        Self {
87            table_paths,
88            ..Default::default()
89        }
90    }
91
92    /// Returns the source of the schema for this configuration
93    pub fn schema_source(&self) -> SchemaSource {
94        self.schema_source
95    }
96    /// Set the `schema` for the overall [`crate::ListingTable`]
97    ///
98    /// [`crate::ListingTable`] will automatically coerce, when possible, the schema
99    /// for individual files to match this schema.
100    ///
101    /// If a schema is not provided, it is inferred using
102    /// [`Self::infer_schema`].
103    ///
104    /// If the schema is provided, it must contain only the fields in the file
105    /// without the table partitioning columns.
106    ///
107    /// # Example: Specifying Table Schema
108    /// ```rust
109    /// # use std::sync::Arc;
110    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
111    /// # use datafusion_datasource::ListingTableUrl;
112    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
113    /// # use arrow::datatypes::{Schema, Field, DataType};
114    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
115    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
116    /// let schema = Arc::new(Schema::new(vec![
117    ///     Field::new("id", DataType::Int64, false),
118    ///     Field::new("name", DataType::Utf8, true),
119    /// ]));
120    ///
121    /// let config = ListingTableConfig::new(table_paths)
122    ///     .with_listing_options(listing_options)  // Set options first
123    ///     .with_schema(schema);                    // Then set schema
124    /// ```
125    pub fn with_schema(self, schema: SchemaRef) -> Self {
126        // Note: We preserve existing options state, but downstream code may expect
127        // options to be set. Consider calling with_listing_options() or infer_options()
128        // before operations that require options to be present.
129        debug_assert!(
130            self.options.is_some() || cfg!(test),
131            "ListingTableConfig::with_schema called without options set. \
132             Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
133        );
134
135        Self {
136            file_schema: Some(schema),
137            schema_source: SchemaSource::Specified,
138            ..self
139        }
140    }
141
142    /// Add `listing_options` to [`ListingTableConfig`]
143    ///
144    /// If not provided, format and other options are inferred via
145    /// `ListingTableConfigExt::infer_options`.
146    ///
147    /// # Example: Configuring Parquet Files with Custom Options
148    /// ```rust
149    /// # use std::sync::Arc;
150    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
151    /// # use datafusion_datasource::ListingTableUrl;
152    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
153    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
154    /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
155    ///     .with_file_extension(".parquet")
156    ///     .with_collect_stat(true);
157    ///
158    /// let config = ListingTableConfig::new(table_paths).with_listing_options(options);
159    /// // Configure file format and options
160    /// ```
161    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
162        // Note: This method properly sets options, but be aware that downstream
163        // methods like infer_schema() and try_new() require both schema and options
164        // to be set to function correctly.
165        debug_assert!(
166            !self.table_paths.is_empty() || cfg!(test),
167            "ListingTableConfig::with_listing_options called without table_paths set. \
168             Consider calling new() or new_with_multi_paths() first to establish table paths."
169        );
170
171        Self {
172            options: Some(listing_options),
173            ..self
174        }
175    }
176
177    /// Returns a tuple of `(file_extension, optional compression_extension)`
178    ///
179    /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
180    /// For example a path ending with blah.test.csv returns `("csv", None)`
181    pub fn infer_file_extension_and_compression_type(
182        path: &str,
183    ) -> datafusion_common::Result<(String, Option<String>)> {
184        let mut exts = path.rsplit('.');
185
186        let split = exts.next().unwrap_or("");
187
188        let file_compression_type = FileCompressionType::from_str(split)
189            .unwrap_or(FileCompressionType::UNCOMPRESSED);
190
191        if file_compression_type.is_compressed() {
192            let split2 = exts.next().unwrap_or("");
193            Ok((split2.to_string(), Some(split.to_string())))
194        } else {
195            Ok((split.to_string(), None))
196        }
197    }
198
199    /// Infer the [`SchemaRef`] based on `table_path`s.
200    ///
201    /// This method infers the table schema using the first `table_path`.
202    /// See [`ListingOptions::infer_schema`] for more details
203    ///
204    /// # Errors
205    /// * if `self.options` is not set. See [`Self::with_listing_options`]
206    pub async fn infer_schema(
207        self,
208        state: &dyn Session,
209    ) -> datafusion_common::Result<Self> {
210        match self.options {
211            Some(options) => {
212                let ListingTableConfig {
213                    table_paths,
214                    file_schema,
215                    options: _,
216                    schema_source,
217                    expr_adapter_factory,
218                } = self;
219
220                let (schema, new_schema_source) = match file_schema {
221                    Some(schema) => (schema, schema_source), // Keep existing source if schema exists
222                    None => {
223                        if let Some(url) = table_paths.first() {
224                            (
225                                options.infer_schema(state, url).await?,
226                                SchemaSource::Inferred,
227                            )
228                        } else {
229                            (Arc::new(Schema::empty()), SchemaSource::Inferred)
230                        }
231                    }
232                };
233
234                Ok(Self {
235                    table_paths,
236                    file_schema: Some(schema),
237                    options: Some(options),
238                    schema_source: new_schema_source,
239                    expr_adapter_factory,
240                })
241            }
242            None => internal_err!("No `ListingOptions` set for inferring schema"),
243        }
244    }
245
246    /// Infer the partition columns from `table_paths`.
247    ///
248    /// # Errors
249    /// * if `self.options` is not set. See [`Self::with_listing_options`]
250    pub async fn infer_partitions_from_path(
251        self,
252        state: &dyn Session,
253    ) -> datafusion_common::Result<Self> {
254        match self.options {
255            Some(options) => {
256                let Some(url) = self.table_paths.first() else {
257                    return config_err!("No table path found");
258                };
259                let partitions = options
260                    .infer_partitions(state, url)
261                    .await?
262                    .into_iter()
263                    .map(|col_name| {
264                        (
265                            col_name,
266                            DataType::Dictionary(
267                                Box::new(DataType::UInt16),
268                                Box::new(DataType::Utf8),
269                            ),
270                        )
271                    })
272                    .collect::<Vec<_>>();
273                let options = options.with_table_partition_cols(partitions);
274                Ok(Self {
275                    table_paths: self.table_paths,
276                    file_schema: self.file_schema,
277                    options: Some(options),
278                    schema_source: self.schema_source,
279                    expr_adapter_factory: self.expr_adapter_factory,
280                })
281            }
282            None => config_err!("No `ListingOptions` set for inferring schema"),
283        }
284    }
285
286    /// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`]
287    ///
288    /// The expression adapter factory is used to create physical expression adapters that can
289    /// handle schema evolution and type conversions when evaluating expressions
290    /// with different schemas than the table schema.
291    pub fn with_expr_adapter_factory(
292        self,
293        expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
294    ) -> Self {
295        Self {
296            expr_adapter_factory: Some(expr_adapter_factory),
297            ..self
298        }
299    }
300
    /// Deprecated: Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
    ///
    /// `SchemaAdapterFactory` has been removed. Use [`Self::with_expr_adapter_factory`]
    /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
    ///
    /// This method is a no-op and returns `self` unchanged; the supplied
    /// factory is ignored.
    #[deprecated(
        since = "52.0.0",
        note = "SchemaAdapterFactory has been removed. Use with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
    )]
    // The signature still names the deprecated `SchemaAdapterFactory` type,
    // so the deprecation lint is expected to fire on this item.
    #[expect(deprecated)]
    pub fn with_schema_adapter_factory(
        self,
        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Self {
        // No-op - the argument is intentionally unused and the configuration
        // is handed back as it came in.
        self
    }
319}