Skip to main content

datafusion_datasource/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Module containing traits and helper methods for the various file formats.
//! See `write.rs` for write-related helper methods.
20
21use std::any::Any;
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::Arc;
25
26use crate::file::FileSource;
27use crate::file_compression_type::FileCompressionType;
28use crate::file_scan_config::FileScanConfig;
29use crate::file_sink_config::FileSinkConfig;
30
31use arrow::datatypes::SchemaRef;
32use datafusion_common::file_options::file_type::FileType;
33use datafusion_common::{GetExt, Result, Statistics, internal_err, not_impl_err};
34use datafusion_physical_expr::LexRequirement;
35use datafusion_physical_expr_common::sort_expr::LexOrdering;
36use datafusion_physical_plan::ExecutionPlan;
37use datafusion_session::Session;
38
39use async_trait::async_trait;
40use object_store::{ObjectMeta, ObjectStore};
41
/// Default maximum number of records scanned when inferring a schema.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1_000;
44
45/// Metadata fetched from a file, including statistics and ordering.
46///
47/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
48/// provide all metadata in a single read, avoiding duplicate I/O operations.
49#[derive(Debug, Clone)]
50#[non_exhaustive]
51pub struct FileMeta {
52    /// Statistics for the file (row counts, byte sizes, column statistics).
53    pub statistics: Statistics,
54    /// The ordering (sort order) of the file, if known.
55    pub ordering: Option<LexOrdering>,
56}
57
58impl FileMeta {
59    /// Creates a new `FileMeta` with the given statistics and no ordering.
60    pub fn new(statistics: Statistics) -> Self {
61        Self {
62            statistics,
63            ordering: None,
64        }
65    }
66
67    /// Sets the ordering for this file metadata.
68    pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
69        self.ordering = ordering;
70        self
71    }
72}
73
74/// This trait abstracts all the file format specific implementations
75/// from the [`TableProvider`]. This helps code re-utilization across
76/// providers that support the same file formats.
77///
78/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
79#[async_trait]
80pub trait FileFormat: Any + Send + Sync + fmt::Debug {
81    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
82    fn get_ext(&self) -> String;
83
84    /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv
85    fn get_ext_with_compression(
86        &self,
87        _file_compression_type: &FileCompressionType,
88    ) -> Result<String>;
89
90    /// Returns whether this instance uses compression if applicable
91    fn compression_type(&self) -> Option<FileCompressionType>;
92
93    /// Infer the common schema of the provided objects. The objects will usually
94    /// be analysed up to a given number of records or files (as specified in the
95    /// format config) then give the estimated common schema. This might fail if
96    /// the files have schemas that cannot be merged.
97    async fn infer_schema(
98        &self,
99        state: &dyn Session,
100        store: &Arc<dyn ObjectStore>,
101        objects: &[ObjectMeta],
102    ) -> Result<SchemaRef>;
103
104    /// Infer the statistics for the provided object. The cost and accuracy of the
105    /// estimated statistics might vary greatly between file formats.
106    ///
107    /// `table_schema` is the (combined) schema of the overall table
108    /// and may be a superset of the schema contained in this file.
109    ///
110    /// TODO: should the file source return statistics for only columns referred to in the table schema?
111    async fn infer_stats(
112        &self,
113        state: &dyn Session,
114        store: &Arc<dyn ObjectStore>,
115        table_schema: SchemaRef,
116        object: &ObjectMeta,
117    ) -> Result<Statistics>;
118
119    /// Infer the ordering (sort order) for the provided object from file metadata.
120    ///
121    /// Returns `Ok(None)` if the file format does not support ordering inference
122    /// or if the file does not have ordering information.
123    ///
124    /// `table_schema` is the (combined) schema of the overall table
125    /// and may be a superset of the schema contained in this file.
126    ///
127    /// The default implementation returns `Ok(None)`.
128    async fn infer_ordering(
129        &self,
130        _state: &dyn Session,
131        _store: &Arc<dyn ObjectStore>,
132        _table_schema: SchemaRef,
133        _object: &ObjectMeta,
134    ) -> Result<Option<LexOrdering>> {
135        Ok(None)
136    }
137
138    /// Infer both statistics and ordering from a single metadata read.
139    ///
140    /// This is more efficient than calling [`Self::infer_stats`] and
141    /// [`Self::infer_ordering`] separately when both are needed, as it avoids
142    /// reading file metadata twice.
143    ///
144    /// The default implementation calls both methods separately. File formats
145    /// that can extract both from a single read should override this method.
146    async fn infer_stats_and_ordering(
147        &self,
148        state: &dyn Session,
149        store: &Arc<dyn ObjectStore>,
150        table_schema: SchemaRef,
151        object: &ObjectMeta,
152    ) -> Result<FileMeta> {
153        let statistics = self
154            .infer_stats(state, store, Arc::clone(&table_schema), object)
155            .await?;
156        let ordering = self
157            .infer_ordering(state, store, table_schema, object)
158            .await?;
159        Ok(FileMeta {
160            statistics,
161            ordering,
162        })
163    }
164
165    /// Take a list of files and convert it to the appropriate executor
166    /// according to this file format.
167    async fn create_physical_plan(
168        &self,
169        state: &dyn Session,
170        conf: FileScanConfig,
171    ) -> Result<Arc<dyn ExecutionPlan>>;
172
173    /// Take a list of files and the configuration to convert it to the
174    /// appropriate writer executor according to this file format.
175    async fn create_writer_physical_plan(
176        &self,
177        _input: Arc<dyn ExecutionPlan>,
178        _state: &dyn Session,
179        _conf: FileSinkConfig,
180        _order_requirements: Option<LexRequirement>,
181    ) -> Result<Arc<dyn ExecutionPlan>> {
182        not_impl_err!("Writer not implemented for this format")
183    }
184
185    /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
186    ///
187    /// # Arguments
188    /// * `table_schema` - The table schema to use for the FileSource (includes partition columns)
189    fn file_source(&self, table_schema: crate::TableSchema) -> Arc<dyn FileSource>;
190}
191
192impl dyn FileFormat {
193    pub fn is<T: FileFormat>(&self) -> bool {
194        (self as &dyn Any).is::<T>()
195    }
196
197    pub fn downcast_ref<T: FileFormat>(&self) -> Option<&T> {
198        (self as &dyn Any).downcast_ref()
199    }
200}
201
202/// Factory for creating [`FileFormat`] instances based on session and command level options
203///
204/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
205pub trait FileFormatFactory: Any + Sync + Send + GetExt + fmt::Debug {
206    /// Initialize a [FileFormat] and configure based on session and command level options
207    fn create(
208        &self,
209        state: &dyn Session,
210        format_options: &HashMap<String, String>,
211    ) -> Result<Arc<dyn FileFormat>>;
212
213    /// Initialize a [FileFormat] with all options set to default values
214    fn default(&self) -> Arc<dyn FileFormat>;
215}
216
217impl dyn FileFormatFactory {
218    pub fn is<T: FileFormatFactory>(&self) -> bool {
219        (self as &dyn Any).is::<T>()
220    }
221
222    pub fn downcast_ref<T: FileFormatFactory>(&self) -> Option<&T> {
223        (self as &dyn Any).downcast_ref()
224    }
225}
226
227/// A container of [FileFormatFactory] which also implements [FileType].
228/// This enables converting a dyn FileFormat to a dyn FileType.
229/// The former trait is a superset of the latter trait, which includes execution time
230/// relevant methods. [FileType] is only used in logical planning and only implements
231/// the subset of methods required during logical planning.
232#[derive(Debug)]
233pub struct DefaultFileType {
234    file_format_factory: Arc<dyn FileFormatFactory>,
235}
236
237impl DefaultFileType {
238    /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
239    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
240        Self {
241            file_format_factory,
242        }
243    }
244
245    /// get a reference to the inner [FileFormatFactory] struct
246    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
247        &self.file_format_factory
248    }
249}
250
251impl FileType for DefaultFileType {
252    fn as_any(&self) -> &dyn Any {
253        self
254    }
255}
256
257impl fmt::Display for DefaultFileType {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        write!(f, "{:?}", self.file_format_factory)
260    }
261}
262
263impl GetExt for DefaultFileType {
264    fn get_ext(&self) -> String {
265        self.file_format_factory.get_ext()
266    }
267}
268
269/// Converts a [FileFormatFactory] to a [FileType]
270pub fn format_as_file_type(
271    file_format_factory: Arc<dyn FileFormatFactory>,
272) -> Arc<dyn FileType> {
273    Arc::new(DefaultFileType {
274        file_format_factory,
275    })
276}
277
278/// Converts a [FileType] to a [FileFormatFactory].
279/// Returns an error if the [FileType] cannot be
280/// downcasted to a [DefaultFileType].
281pub fn file_type_to_format(
282    file_type: &Arc<dyn FileType>,
283) -> Result<Arc<dyn FileFormatFactory>> {
284    match file_type
285        .as_ref()
286        .as_any()
287        .downcast_ref::<DefaultFileType>()
288    {
289        Some(source) => Ok(Arc::clone(&source.file_format_factory)),
290        _ => internal_err!("FileType was not DefaultFileType"),
291    }
292}