datafusion_catalog_listing/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::options::ListingOptions;
19use arrow::datatypes::{DataType, Schema, SchemaRef};
20use datafusion_catalog::Session;
21use datafusion_common::{config_err, internal_err};
22use datafusion_datasource::ListingTableUrl;
23use datafusion_datasource::file_compression_type::FileCompressionType;
24#[expect(deprecated)]
25use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
26use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
27use std::str::FromStr;
28use std::sync::Arc;
29
/// Indicates the source of the schema for a [`crate::ListingTable`]
///
/// A freshly created [`ListingTableConfig`] starts out as [`SchemaSource::Unset`].
/// [`ListingTableConfig::with_schema`] switches it to [`SchemaSource::Specified`],
/// while [`ListingTableConfig::infer_schema`] reports [`SchemaSource::Inferred`]
/// whenever it had to derive the schema itself.
// `PartialEq` is required for `assert_eq!` in tests. `Eq` and `Hash` are derived too:
// the enum is fieldless, so both are trivially correct and allow it to be used as a
// map/set key or inside other types that derive `Eq`/`Hash`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SchemaSource {
    /// Schema is not yet set (initial state)
    #[default]
    Unset,
    /// Schema was inferred from the first `table_path`
    Inferred,
    /// Schema was specified explicitly via `with_schema`
    Specified,
}
42
43/// Configuration for creating a [`crate::ListingTable`]
44///
45/// # Schema Evolution Support
46///
47/// This configuration supports schema evolution through the optional
48/// [`PhysicalExprAdapterFactory`]. You might want to override the default factory when you need:
49///
50/// - **Type coercion requirements**: When you need custom logic for converting between
51/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
52/// - **Column mapping**: You need to map columns with a legacy name to a new name
53/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
54#[derive(Debug, Clone, Default)]
55pub struct ListingTableConfig {
56 /// Paths on the `ObjectStore` for creating [`crate::ListingTable`].
57 /// They should share the same schema and object store.
58 pub table_paths: Vec<ListingTableUrl>,
59 /// Optional `SchemaRef` for the to be created [`crate::ListingTable`].
60 ///
61 /// See details on [`ListingTableConfig::with_schema`]
62 pub file_schema: Option<SchemaRef>,
63 /// Optional [`ListingOptions`] for the to be created [`crate::ListingTable`].
64 ///
65 /// See details on [`ListingTableConfig::with_listing_options`]
66 pub options: Option<ListingOptions>,
67 /// Tracks the source of the schema information
68 pub(crate) schema_source: SchemaSource,
69 /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
70 pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
71}
72
73impl ListingTableConfig {
74 /// Creates new [`ListingTableConfig`] for reading the specified URL
75 pub fn new(table_path: ListingTableUrl) -> Self {
76 Self {
77 table_paths: vec![table_path],
78 ..Default::default()
79 }
80 }
81
82 /// Creates new [`ListingTableConfig`] with multiple table paths.
83 ///
84 /// See `ListingTableConfigExt::infer_options` for details on what happens with multiple paths
85 pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
86 Self {
87 table_paths,
88 ..Default::default()
89 }
90 }
91
92 /// Returns the source of the schema for this configuration
93 pub fn schema_source(&self) -> SchemaSource {
94 self.schema_source
95 }
96 /// Set the `schema` for the overall [`crate::ListingTable`]
97 ///
98 /// [`crate::ListingTable`] will automatically coerce, when possible, the schema
99 /// for individual files to match this schema.
100 ///
101 /// If a schema is not provided, it is inferred using
102 /// [`Self::infer_schema`].
103 ///
104 /// If the schema is provided, it must contain only the fields in the file
105 /// without the table partitioning columns.
106 ///
107 /// # Example: Specifying Table Schema
108 /// ```rust
109 /// # use std::sync::Arc;
110 /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
111 /// # use datafusion_datasource::ListingTableUrl;
112 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
113 /// # use arrow::datatypes::{Schema, Field, DataType};
114 /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
115 /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
116 /// let schema = Arc::new(Schema::new(vec![
117 /// Field::new("id", DataType::Int64, false),
118 /// Field::new("name", DataType::Utf8, true),
119 /// ]));
120 ///
121 /// let config = ListingTableConfig::new(table_paths)
122 /// .with_listing_options(listing_options) // Set options first
123 /// .with_schema(schema); // Then set schema
124 /// ```
125 pub fn with_schema(self, schema: SchemaRef) -> Self {
126 // Note: We preserve existing options state, but downstream code may expect
127 // options to be set. Consider calling with_listing_options() or infer_options()
128 // before operations that require options to be present.
129 debug_assert!(
130 self.options.is_some() || cfg!(test),
131 "ListingTableConfig::with_schema called without options set. \
132 Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
133 );
134
135 Self {
136 file_schema: Some(schema),
137 schema_source: SchemaSource::Specified,
138 ..self
139 }
140 }
141
142 /// Add `listing_options` to [`ListingTableConfig`]
143 ///
144 /// If not provided, format and other options are inferred via
145 /// `ListingTableConfigExt::infer_options`.
146 ///
147 /// # Example: Configuring Parquet Files with Custom Options
148 /// ```rust
149 /// # use std::sync::Arc;
150 /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
151 /// # use datafusion_datasource::ListingTableUrl;
152 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
153 /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
154 /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
155 /// .with_file_extension(".parquet")
156 /// .with_collect_stat(true);
157 ///
158 /// let config = ListingTableConfig::new(table_paths).with_listing_options(options);
159 /// // Configure file format and options
160 /// ```
161 pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
162 // Note: This method properly sets options, but be aware that downstream
163 // methods like infer_schema() and try_new() require both schema and options
164 // to be set to function correctly.
165 debug_assert!(
166 !self.table_paths.is_empty() || cfg!(test),
167 "ListingTableConfig::with_listing_options called without table_paths set. \
168 Consider calling new() or new_with_multi_paths() first to establish table paths."
169 );
170
171 Self {
172 options: Some(listing_options),
173 ..self
174 }
175 }
176
177 /// Returns a tuple of `(file_extension, optional compression_extension)`
178 ///
179 /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
180 /// For example a path ending with blah.test.csv returns `("csv", None)`
181 pub fn infer_file_extension_and_compression_type(
182 path: &str,
183 ) -> datafusion_common::Result<(String, Option<String>)> {
184 let mut exts = path.rsplit('.');
185
186 let split = exts.next().unwrap_or("");
187
188 let file_compression_type = FileCompressionType::from_str(split)
189 .unwrap_or(FileCompressionType::UNCOMPRESSED);
190
191 if file_compression_type.is_compressed() {
192 let split2 = exts.next().unwrap_or("");
193 Ok((split2.to_string(), Some(split.to_string())))
194 } else {
195 Ok((split.to_string(), None))
196 }
197 }
198
199 /// Infer the [`SchemaRef`] based on `table_path`s.
200 ///
201 /// This method infers the table schema using the first `table_path`.
202 /// See [`ListingOptions::infer_schema`] for more details
203 ///
204 /// # Errors
205 /// * if `self.options` is not set. See [`Self::with_listing_options`]
206 pub async fn infer_schema(
207 self,
208 state: &dyn Session,
209 ) -> datafusion_common::Result<Self> {
210 match self.options {
211 Some(options) => {
212 let ListingTableConfig {
213 table_paths,
214 file_schema,
215 options: _,
216 schema_source,
217 expr_adapter_factory,
218 } = self;
219
220 let (schema, new_schema_source) = match file_schema {
221 Some(schema) => (schema, schema_source), // Keep existing source if schema exists
222 None => {
223 if let Some(url) = table_paths.first() {
224 (
225 options.infer_schema(state, url).await?,
226 SchemaSource::Inferred,
227 )
228 } else {
229 (Arc::new(Schema::empty()), SchemaSource::Inferred)
230 }
231 }
232 };
233
234 Ok(Self {
235 table_paths,
236 file_schema: Some(schema),
237 options: Some(options),
238 schema_source: new_schema_source,
239 expr_adapter_factory,
240 })
241 }
242 None => internal_err!("No `ListingOptions` set for inferring schema"),
243 }
244 }
245
246 /// Infer the partition columns from `table_paths`.
247 ///
248 /// # Errors
249 /// * if `self.options` is not set. See [`Self::with_listing_options`]
250 pub async fn infer_partitions_from_path(
251 self,
252 state: &dyn Session,
253 ) -> datafusion_common::Result<Self> {
254 match self.options {
255 Some(options) => {
256 let Some(url) = self.table_paths.first() else {
257 return config_err!("No table path found");
258 };
259 let partitions = options
260 .infer_partitions(state, url)
261 .await?
262 .into_iter()
263 .map(|col_name| {
264 (
265 col_name,
266 DataType::Dictionary(
267 Box::new(DataType::UInt16),
268 Box::new(DataType::Utf8),
269 ),
270 )
271 })
272 .collect::<Vec<_>>();
273 let options = options.with_table_partition_cols(partitions);
274 Ok(Self {
275 table_paths: self.table_paths,
276 file_schema: self.file_schema,
277 options: Some(options),
278 schema_source: self.schema_source,
279 expr_adapter_factory: self.expr_adapter_factory,
280 })
281 }
282 None => config_err!("No `ListingOptions` set for inferring schema"),
283 }
284 }
285
286 /// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`]
287 ///
288 /// The expression adapter factory is used to create physical expression adapters that can
289 /// handle schema evolution and type conversions when evaluating expressions
290 /// with different schemas than the table schema.
291 pub fn with_expr_adapter_factory(
292 self,
293 expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
294 ) -> Self {
295 Self {
296 expr_adapter_factory: Some(expr_adapter_factory),
297 ..self
298 }
299 }
300
301 /// Deprecated: Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
302 ///
303 /// `SchemaAdapterFactory` has been removed. Use [`Self::with_expr_adapter_factory`]
304 /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
305 ///
306 /// This method is a no-op and returns `self` unchanged.
307 #[deprecated(
308 since = "52.0.0",
309 note = "SchemaAdapterFactory has been removed. Use with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
310 )]
311 #[expect(deprecated)]
312 pub fn with_schema_adapter_factory(
313 self,
314 _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
315 ) -> Self {
316 // No-op - just return self unchanged
317 self
318 }
319}