datafusion_datasource/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module containing helper methods for the various file formats
19//! See write.rs for write related helper methods
20
21use std::any::Any;
22use std::collections::HashMap;
23use std::fmt;
24use std::sync::Arc;
25
26use crate::file::FileSource;
27use crate::file_compression_type::FileCompressionType;
28use crate::file_scan_config::FileScanConfig;
29use crate::file_sink_config::FileSinkConfig;
30
31use arrow::datatypes::SchemaRef;
32use datafusion_common::file_options::file_type::FileType;
33use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
34use datafusion_physical_expr::LexRequirement;
35use datafusion_physical_plan::ExecutionPlan;
36use datafusion_session::Session;
37
38use async_trait::async_trait;
39use object_store::{ObjectMeta, ObjectStore};
40
/// Default upper bound on the number of records scanned when inferring a schema
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1_000;
43
/// This trait abstracts all the file format specific implementations
/// from the [`TableProvider`]. This helps code re-utilization across
/// providers that support the same file formats.
///
/// Implementors must be `Send + Sync` and implement [`fmt::Debug`]. Every
/// method except [`FileFormat::create_writer_physical_plan`] is required;
/// the writer hook has a default that reports "not implemented".
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
    /// Returns this file format as [`Any`](std::any::Any) so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
    fn get_ext(&self) -> String;

    /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv
    ///
    /// The result is fallible, so an implementation may reject a
    /// compression type it does not support.
    ///
    /// NOTE(review): the example above shows only the base extension being
    /// returned; confirm against the implementations whether the compression
    /// suffix is expected to be part of the returned string.
    fn get_ext_with_compression(
        &self,
        _file_compression_type: &FileCompressionType,
    ) -> Result<String>;

    /// Returns the compression type used by this instance, if applicable
    fn compression_type(&self) -> Option<FileCompressionType>;

    /// Infer the common schema of the provided objects. The objects will usually
    /// be analysed up to a given number of records or files (as specified in the
    /// format config) then give the estimated common schema. This might fail if
    /// the files have schemas that cannot be merged.
    ///
    /// `store` is the object store to read from and `objects` lists the
    /// metadata of the files to inspect.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef>;

    /// Infer the statistics for the provided object. The cost and accuracy of the
    /// estimated statistics might vary greatly between file formats.
    ///
    /// `table_schema` is the (combined) schema of the overall table
    /// and may be a superset of the schema contained in this file.
    ///
    /// TODO: should the file source return statistics for only columns referred to in the table schema?
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics>;

    /// Take a list of files and convert it to the appropriate executor
    /// according to this file format.
    ///
    /// The scan configuration `conf` is taken by value.
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Take a list of files and the configuration to convert it to the
    /// appropriate writer executor according to this file format.
    ///
    /// The default implementation ignores all of its arguments and returns a
    /// "not implemented" error, so formats that support writing must
    /// override this method.
    async fn create_writer_physical_plan(
        &self,
        _input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        _conf: FileSinkConfig,
        _order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Writer not implemented for this format")
    }

    /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
    fn file_source(&self) -> Arc<dyn FileSource>;
}
116
/// Factory for creating [`FileFormat`] instances based on session and command level options
///
/// Users can provide their own `FileFormatFactory` to support arbitrary file formats.
/// The factory also exposes a file extension through its [`GetExt`] supertrait.
pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
    /// Initialize a [FileFormat] and configure based on session and command level options
    ///
    /// `format_options` carries the options as string key/value pairs. The
    /// call is fallible, so an implementation may reject options it cannot
    /// apply.
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>>;

    /// Initialize a [FileFormat] with all options set to default values
    ///
    /// Unlike [`FileFormatFactory::create`], this takes no session or
    /// options and cannot fail.
    fn default(&self) -> Arc<dyn FileFormat>;

    /// Returns this factory as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;
}
135
/// A container of [FileFormatFactory] which also implements [FileType].
/// This enables using a dyn [FileFormatFactory] wherever a dyn [FileType] is expected.
/// The former trait is a superset of the latter trait, which includes execution time
/// relevant methods. [FileType] is only used in logical planning and only implements
/// the subset of methods required during logical planning.
#[derive(Debug)]
pub struct DefaultFileType {
    /// The wrapped factory, held behind an `Arc`.
    file_format_factory: Arc<dyn FileFormatFactory>,
}
145
146impl DefaultFileType {
147    /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
148    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
149        Self {
150            file_format_factory,
151        }
152    }
153
154    /// get a reference to the inner [FileFormatFactory] struct
155    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
156        &self.file_format_factory
157    }
158}
159
160impl FileType for DefaultFileType {
161    fn as_any(&self) -> &dyn Any {
162        self
163    }
164}
165
166impl fmt::Display for DefaultFileType {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        write!(f, "{:?}", self.file_format_factory)
169    }
170}
171
172impl GetExt for DefaultFileType {
173    fn get_ext(&self) -> String {
174        self.file_format_factory.get_ext()
175    }
176}
177
178/// Converts a [FileFormatFactory] to a [FileType]
179pub fn format_as_file_type(
180    file_format_factory: Arc<dyn FileFormatFactory>,
181) -> Arc<dyn FileType> {
182    Arc::new(DefaultFileType {
183        file_format_factory,
184    })
185}
186
187/// Converts a [FileType] to a [FileFormatFactory].
188/// Returns an error if the [FileType] cannot be
189/// downcasted to a [DefaultFileType].
190pub fn file_type_to_format(
191    file_type: &Arc<dyn FileType>,
192) -> Result<Arc<dyn FileFormatFactory>> {
193    match file_type
194        .as_ref()
195        .as_any()
196        .downcast_ref::<DefaultFileType>()
197    {
198        Some(source) => Ok(Arc::clone(&source.file_format_factory)),
199        _ => internal_err!("FileType was not DefaultFileType"),
200    }
201}