datafusion_datasource/file_format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods for the various file formats.
//! See `write.rs` for write-related helper methods.

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use crate::file::FileSource;
use crate::file_compression_type::FileCompressionType;
use crate::file_scan_config::FileScanConfig;
use crate::file_sink_config::FileSinkConfig;

use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::ExecutionPlan;

use object_store::{ObjectMeta, ObjectStore};

/// Default maximum number of records to scan when inferring the schema
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

/// This trait abstracts the file-format-specific implementations away
/// from the [`TableProvider`], enabling code reuse across providers
/// that support the same file formats.
///
/// [`TableProvider`]: datafusion_catalog::TableProvider
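///
/// # Example
///
/// A minimal sketch, assuming a hypothetical `EchoFormat` used only for
/// illustration (this is not a format shipped with DataFusion); bodies that
/// would need real I/O are left as `todo!()`:
///
/// ```ignore
/// #[derive(Debug)]
/// struct EchoFormat;
///
/// #[async_trait]
/// impl FileFormat for EchoFormat {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn get_ext(&self) -> String {
///         "echo".to_string()
///     }
///
///     fn get_ext_with_compression(
///         &self,
///         _file_compression_type: &FileCompressionType,
///     ) -> Result<String> {
///         // Sketch: ignore compression in the extension
///         Ok(self.get_ext())
///     }
///
///     async fn infer_schema(
///         &self,
///         _state: &dyn Session,
///         _store: &Arc<dyn ObjectStore>,
///         _objects: &[ObjectMeta],
///     ) -> Result<SchemaRef> {
///         todo!("read file metadata and build a merged schema")
///     }
///
///     async fn infer_stats(
///         &self,
///         _state: &dyn Session,
///         _store: &Arc<dyn ObjectStore>,
///         table_schema: SchemaRef,
///         _object: &ObjectMeta,
///     ) -> Result<Statistics> {
///         // Cheapest possible answer: report unknown statistics
///         Ok(Statistics::new_unknown(&table_schema))
///     }
///
///     async fn create_physical_plan(
///         &self,
///         _state: &dyn Session,
///         _conf: FileScanConfig,
///         _filters: Option<&Arc<dyn PhysicalExpr>>,
///     ) -> Result<Arc<dyn ExecutionPlan>> {
///         todo!("build the scan execution plan for this format")
///     }
///
///     fn file_source(&self) -> Arc<dyn FileSource> {
///         todo!("return this format's `FileSource` implementation")
///     }
/// }
/// ```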
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
    /// Returns this [`FileFormat`] as [`Any`](std::any::Any) so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
    fn get_ext(&self) -> String;

    /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv
    fn get_ext_with_compression(
        &self,
        _file_compression_type: &FileCompressionType,
    ) -> Result<String>;

    /// Infer the common schema of the provided objects. The objects are usually
    /// analysed up to a given number of records or files (as specified in the
    /// format config), and the estimated common schema is returned. This may
    /// fail if the files have schemas that cannot be merged.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef>;

    /// Infer the statistics for the provided object. The cost and accuracy of the
    /// estimated statistics might vary greatly between file formats.
    ///
    /// `table_schema` is the (combined) schema of the overall table
    /// and may be a superset of the schema contained in this file.
    ///
    /// TODO: should the file source return statistics for only columns referred to in the table schema?
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics>;

    /// Take a list of files and convert it to the appropriate executor
    /// according to this file format.
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
        filters: Option<&Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Take a list of files and the configuration to convert it to the
    /// appropriate writer executor according to this file format.
    async fn create_writer_physical_plan(
        &self,
        _input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        _conf: FileSinkConfig,
        _order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Writer not implemented for this format")
    }

    /// Check whether this file format supports pushing down the provided filters for
    /// the given schemas. Initially added to support the Parquet format's ability to do this.
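    ///
    /// # Example
    ///
    /// A hedged sketch of an override for a hypothetical format that only
    /// pushes down binary comparison expressions (illustrative only; this is
    /// not how the Parquet format implements it):
    ///
    /// ```ignore
    /// fn supports_filters_pushdown(
    ///     &self,
    ///     _file_schema: &Schema,
    ///     _table_schema: &Schema,
    ///     filters: &[&Expr],
    /// ) -> Result<FilePushdownSupport> {
    ///     if filters.iter().all(|f| matches!(f, Expr::BinaryExpr(_))) {
    ///         Ok(FilePushdownSupport::Supported)
    ///     } else {
    ///         Ok(FilePushdownSupport::NotSupportedForFilter)
    ///     }
    /// }
    /// ```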
    fn supports_filters_pushdown(
        &self,
        _file_schema: &Schema,
        _table_schema: &Schema,
        _filters: &[&Expr],
    ) -> Result<FilePushdownSupport> {
        Ok(FilePushdownSupport::NoSupport)
    }

    /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
    fn file_source(&self) -> Arc<dyn FileSource>;
}

/// An enum to distinguish between different states when determining if certain filters can be
/// pushed down to file scanning
#[derive(Debug, PartialEq)]
pub enum FilePushdownSupport {
    /// The file format/system being asked does not support any sort of pushdown. This should be
    /// used even if the file format theoretically supports some sort of pushdown, but it's not
    /// enabled or implemented yet.
    NoSupport,
    /// The file format/system being asked *does* support pushdown, but it can't make it work for
    /// the provided filter/expression
    NotSupportedForFilter,
    /// The file format/system being asked *does* support pushdown and *can* make it work for the
    /// provided filter/expression
    Supported,
}

/// Factory for creating [`FileFormat`] instances based on session and command level options
///
/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
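///
/// # Example
///
/// A minimal sketch, assuming the hypothetical `EchoFormat` from the
/// [`FileFormat`] example above; session and command level options are
/// ignored for brevity:
///
/// ```ignore
/// #[derive(Debug, Default)]
/// struct EchoFormatFactory;
///
/// impl FileFormatFactory for EchoFormatFactory {
///     fn create(
///         &self,
///         _state: &dyn Session,
///         _format_options: &HashMap<String, String>,
///     ) -> Result<Arc<dyn FileFormat>> {
///         // A real factory would apply session/command options here
///         Ok(Arc::new(EchoFormat))
///     }
///
///     fn default(&self) -> Arc<dyn FileFormat> {
///         Arc::new(EchoFormat)
///     }
///
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
/// }
///
/// // `GetExt` is a supertrait of `FileFormatFactory`
/// impl GetExt for EchoFormatFactory {
///     fn get_ext(&self) -> String {
///         "echo".to_string()
///     }
/// }
/// ```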
pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
    /// Initialize a [FileFormat] and configure based on session and command level options
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>>;

    /// Initialize a [FileFormat] with all options set to default values
    fn default(&self) -> Arc<dyn FileFormat>;

    /// Returns this factory as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;
}

/// A container for a [FileFormatFactory] which also implements [FileType].
/// This enables converting a dyn FileFormatFactory to a dyn FileType.
/// The former trait is a superset of the latter: it additionally includes the
/// execution-time methods, while [FileType] is only used during logical planning
/// and only implements the subset of methods required there.
#[derive(Debug)]
pub struct DefaultFileType {
    file_format_factory: Arc<dyn FileFormatFactory>,
}

impl DefaultFileType {
    /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
        Self {
            file_format_factory,
        }
    }

    /// Get a reference to the inner [FileFormatFactory]
    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
        &self.file_format_factory
    }
}

impl FileType for DefaultFileType {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl fmt::Display for DefaultFileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self.file_format_factory)
    }
}

impl GetExt for DefaultFileType {
    fn get_ext(&self) -> String {
        self.file_format_factory.get_ext()
    }
}

/// Converts a [FileFormatFactory] to a [FileType]
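///
/// # Example
///
/// A small usage sketch, assuming the hypothetical `EchoFormatFactory`
/// from the [`FileFormatFactory`] example above:
///
/// ```ignore
/// let factory: Arc<dyn FileFormatFactory> = Arc::new(EchoFormatFactory);
/// let file_type: Arc<dyn FileType> = format_as_file_type(factory);
/// assert_eq!(file_type.get_ext(), "echo");
/// ```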
pub fn format_as_file_type(
    file_format_factory: Arc<dyn FileFormatFactory>,
) -> Arc<dyn FileType> {
    Arc::new(DefaultFileType {
        file_format_factory,
    })
}

/// Converts a [FileType] to a [FileFormatFactory].
/// Returns an error if the [FileType] cannot be
/// downcast to a [DefaultFileType].
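///
/// # Example
///
/// A round-trip sketch, assuming the hypothetical `EchoFormatFactory` from
/// the [`FileFormatFactory`] example above:
///
/// ```ignore
/// let file_type = format_as_file_type(Arc::new(EchoFormatFactory));
/// // Succeeds because `format_as_file_type` wraps the factory in a `DefaultFileType`
/// let factory = file_type_to_format(&file_type)?;
/// assert_eq!(factory.get_ext(), "echo");
/// ```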
pub fn file_type_to_format(
    file_type: &Arc<dyn FileType>,
) -> Result<Arc<dyn FileFormatFactory>> {
    match file_type
        .as_ref()
        .as_any()
        .downcast_ref::<DefaultFileType>()
    {
        Some(source) => Ok(Arc::clone(&source.file_format_factory)),
        _ => internal_err!("FileType was not DefaultFileType"),
    }
}