// datafusion_datasource/file_format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods for the various file formats
//! See write.rs for write related helper methods

21use std::any::Any;
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::Arc;
25
26use crate::file::FileSource;
27use crate::file_compression_type::FileCompressionType;
28use crate::file_scan_config::FileScanConfig;
29use crate::file_sink_config::FileSinkConfig;
30
31use arrow::datatypes::SchemaRef;
32use datafusion_common::file_options::file_type::FileType;
33use datafusion_common::{GetExt, Result, Statistics, internal_err, not_impl_err};
34use datafusion_physical_expr::LexRequirement;
35use datafusion_physical_expr_common::sort_expr::LexOrdering;
36use datafusion_physical_plan::ExecutionPlan;
37use datafusion_session::Session;
38
39use async_trait::async_trait;
40use object_store::{ObjectMeta, ObjectStore};
41
/// Default maximum number of records to scan when inferring a schema.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
44
/// Metadata fetched from a file, including statistics and ordering.
///
/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
/// provide all metadata in a single read, avoiding duplicate I/O operations.
///
/// Marked `#[non_exhaustive]` so fields can be added later without breaking
/// downstream crates; construct it via [`FileMeta::new`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileMeta {
    /// Statistics for the file (row counts, byte sizes, column statistics).
    pub statistics: Statistics,
    /// The ordering (sort order) of the file, if known.
    pub ordering: Option<LexOrdering>,
}
57
58impl FileMeta {
59 /// Creates a new `FileMeta` with the given statistics and no ordering.
60 pub fn new(statistics: Statistics) -> Self {
61 Self {
62 statistics,
63 ordering: None,
64 }
65 }
66
67 /// Sets the ordering for this file metadata.
68 pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
69 self.ordering = ordering;
70 self
71 }
72}
73
74/// This trait abstracts all the file format specific implementations
75/// from the [`TableProvider`]. This helps code re-utilization across
76/// providers that support the same file formats.
77///
78/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
79#[async_trait]
80pub trait FileFormat: Send + Sync + fmt::Debug {
81 /// Returns the table provider as [`Any`] so that it can be
82 /// downcast to a specific implementation.
83 fn as_any(&self) -> &dyn Any;
84
85 /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
86 fn get_ext(&self) -> String;
87
88 /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv
89 fn get_ext_with_compression(
90 &self,
91 _file_compression_type: &FileCompressionType,
92 ) -> Result<String>;
93
94 /// Returns whether this instance uses compression if applicable
95 fn compression_type(&self) -> Option<FileCompressionType>;
96
97 /// Infer the common schema of the provided objects. The objects will usually
98 /// be analysed up to a given number of records or files (as specified in the
99 /// format config) then give the estimated common schema. This might fail if
100 /// the files have schemas that cannot be merged.
101 async fn infer_schema(
102 &self,
103 state: &dyn Session,
104 store: &Arc<dyn ObjectStore>,
105 objects: &[ObjectMeta],
106 ) -> Result<SchemaRef>;
107
108 /// Infer the statistics for the provided object. The cost and accuracy of the
109 /// estimated statistics might vary greatly between file formats.
110 ///
111 /// `table_schema` is the (combined) schema of the overall table
112 /// and may be a superset of the schema contained in this file.
113 ///
114 /// TODO: should the file source return statistics for only columns referred to in the table schema?
115 async fn infer_stats(
116 &self,
117 state: &dyn Session,
118 store: &Arc<dyn ObjectStore>,
119 table_schema: SchemaRef,
120 object: &ObjectMeta,
121 ) -> Result<Statistics>;
122
123 /// Infer the ordering (sort order) for the provided object from file metadata.
124 ///
125 /// Returns `Ok(None)` if the file format does not support ordering inference
126 /// or if the file does not have ordering information.
127 ///
128 /// `table_schema` is the (combined) schema of the overall table
129 /// and may be a superset of the schema contained in this file.
130 ///
131 /// The default implementation returns `Ok(None)`.
132 async fn infer_ordering(
133 &self,
134 _state: &dyn Session,
135 _store: &Arc<dyn ObjectStore>,
136 _table_schema: SchemaRef,
137 _object: &ObjectMeta,
138 ) -> Result<Option<LexOrdering>> {
139 Ok(None)
140 }
141
142 /// Infer both statistics and ordering from a single metadata read.
143 ///
144 /// This is more efficient than calling [`Self::infer_stats`] and
145 /// [`Self::infer_ordering`] separately when both are needed, as it avoids
146 /// reading file metadata twice.
147 ///
148 /// The default implementation calls both methods separately. File formats
149 /// that can extract both from a single read should override this method.
150 async fn infer_stats_and_ordering(
151 &self,
152 state: &dyn Session,
153 store: &Arc<dyn ObjectStore>,
154 table_schema: SchemaRef,
155 object: &ObjectMeta,
156 ) -> Result<FileMeta> {
157 let statistics = self
158 .infer_stats(state, store, Arc::clone(&table_schema), object)
159 .await?;
160 let ordering = self
161 .infer_ordering(state, store, table_schema, object)
162 .await?;
163 Ok(FileMeta {
164 statistics,
165 ordering,
166 })
167 }
168
169 /// Take a list of files and convert it to the appropriate executor
170 /// according to this file format.
171 async fn create_physical_plan(
172 &self,
173 state: &dyn Session,
174 conf: FileScanConfig,
175 ) -> Result<Arc<dyn ExecutionPlan>>;
176
177 /// Take a list of files and the configuration to convert it to the
178 /// appropriate writer executor according to this file format.
179 async fn create_writer_physical_plan(
180 &self,
181 _input: Arc<dyn ExecutionPlan>,
182 _state: &dyn Session,
183 _conf: FileSinkConfig,
184 _order_requirements: Option<LexRequirement>,
185 ) -> Result<Arc<dyn ExecutionPlan>> {
186 not_impl_err!("Writer not implemented for this format")
187 }
188
189 /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
190 ///
191 /// # Arguments
192 /// * `table_schema` - The table schema to use for the FileSource (includes partition columns)
193 fn file_source(&self, table_schema: crate::TableSchema) -> Arc<dyn FileSource>;
194}
195
/// Factory for creating [`FileFormat`] instances based on session and command level options
///
/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
    /// Initialize a [FileFormat] and configure based on session and command level options
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>>;

    /// Initialize a [FileFormat] with all options set to default values
    ///
    /// NOTE: despite the name, this is an inherent trait method and is
    /// unrelated to the [`Default`] trait.
    fn default(&self) -> Arc<dyn FileFormat>;

    /// Returns the factory as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;
}
214
/// A container of [FileFormatFactory] which also implements [FileType].
/// This enables converting a dyn FileFormat to a dyn FileType.
/// The former trait is a superset of the latter trait, which includes execution time
/// relevant methods. [FileType] is only used in logical planning and only implements
/// the subset of methods required during logical planning.
#[derive(Debug)]
pub struct DefaultFileType {
    // The wrapped factory; recovered by downcasting in `file_type_to_format`.
    file_format_factory: Arc<dyn FileFormatFactory>,
}
224
225impl DefaultFileType {
226 /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
227 pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
228 Self {
229 file_format_factory,
230 }
231 }
232
233 /// get a reference to the inner [FileFormatFactory] struct
234 pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
235 &self.file_format_factory
236 }
237}
238
impl FileType for DefaultFileType {
    fn as_any(&self) -> &dyn Any {
        // Expose `self` for downcasting; `file_type_to_format` relies on this
        // to recover the concrete DefaultFileType from a dyn FileType.
        self
    }
}
244
245impl fmt::Display for DefaultFileType {
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 write!(f, "{:?}", self.file_format_factory)
248 }
249}
250
impl GetExt for DefaultFileType {
    fn get_ext(&self) -> String {
        // Delegate to the wrapped factory's extension.
        self.file_format_factory.get_ext()
    }
}
256
257/// Converts a [FileFormatFactory] to a [FileType]
258pub fn format_as_file_type(
259 file_format_factory: Arc<dyn FileFormatFactory>,
260) -> Arc<dyn FileType> {
261 Arc::new(DefaultFileType {
262 file_format_factory,
263 })
264}
265
266/// Converts a [FileType] to a [FileFormatFactory].
267/// Returns an error if the [FileType] cannot be
268/// downcasted to a [DefaultFileType].
269pub fn file_type_to_format(
270 file_type: &Arc<dyn FileType>,
271) -> Result<Arc<dyn FileFormatFactory>> {
272 match file_type
273 .as_ref()
274 .as_any()
275 .downcast_ref::<DefaultFileType>()
276 {
277 Some(source) => Ok(Arc::clone(&source.file_format_factory)),
278 _ => internal_err!("FileType was not DefaultFileType"),
279 }
280}