datafusion_datasource/
file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Common behaviors that every file format needs to implement
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use crate::file_groups::FileGroupPartitioner;
26use crate::file_scan_config::FileScanConfig;
27use crate::file_stream::FileOpener;
28use crate::schema_adapter::SchemaAdapterFactory;
29use arrow::datatypes::SchemaRef;
30use datafusion_common::config::ConfigOptions;
31use datafusion_common::{not_impl_err, Result, Statistics};
32use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
33use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
34use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
35use datafusion_physical_plan::DisplayFormatType;
36
37use object_store::ObjectStore;
38
39/// Helper function to convert any type implementing FileSource to Arc<dyn FileSource>
40pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
41    Arc::new(source)
42}
43
44/// file format specific behaviors for elements in [`DataSource`]
45///
46/// See more details on specific implementations:
47/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
48/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
49/// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html)
50/// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html)
51/// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
52///
53/// [`DataSource`]: crate::source::DataSource
54pub trait FileSource: Send + Sync {
55    /// Creates a `dyn FileOpener` based on given parameters
56    fn create_file_opener(
57        &self,
58        object_store: Arc<dyn ObjectStore>,
59        base_config: &FileScanConfig,
60        partition: usize,
61    ) -> Arc<dyn FileOpener>;
62    /// Any
63    fn as_any(&self) -> &dyn Any;
64    /// Initialize new type with batch size configuration
65    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
66    /// Initialize new instance with a new schema
67    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
68    /// Initialize new instance with projection information
69    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
70    /// Initialize new instance with projected statistics
71    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
72    /// Returns the filter expression that will be applied during the file scan.
73    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
74        None
75    }
76    /// Return execution plan metrics
77    fn metrics(&self) -> &ExecutionPlanMetricsSet;
78    /// Return projected statistics
79    fn statistics(&self) -> Result<Statistics>;
80    /// String representation of file source such as "csv", "json", "parquet"
81    fn file_type(&self) -> &str;
82    /// Format FileType specific information
83    fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
84        Ok(())
85    }
86
87    /// If supported by the [`FileSource`], redistribute files across partitions
88    /// according to their size. Allows custom file formats to implement their
89    /// own repartitioning logic.
90    ///
91    /// The default implementation uses [`FileGroupPartitioner`]. See that
92    /// struct for more details.
93    fn repartitioned(
94        &self,
95        target_partitions: usize,
96        repartition_file_min_size: usize,
97        output_ordering: Option<LexOrdering>,
98        config: &FileScanConfig,
99    ) -> Result<Option<FileScanConfig>> {
100        if config.file_compression_type.is_compressed() || config.new_lines_in_values {
101            return Ok(None);
102        }
103
104        let repartitioned_file_groups_option = FileGroupPartitioner::new()
105            .with_target_partitions(target_partitions)
106            .with_repartition_file_min_size(repartition_file_min_size)
107            .with_preserve_order_within_groups(output_ordering.is_some())
108            .repartition_file_groups(&config.file_groups);
109
110        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
111            let mut source = config.clone();
112            source.file_groups = repartitioned_file_groups;
113            return Ok(Some(source));
114        }
115        Ok(None)
116    }
117
118    /// Try to push down filters into this FileSource.
119    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
120    ///
121    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
122    fn try_pushdown_filters(
123        &self,
124        filters: Vec<Arc<dyn PhysicalExpr>>,
125        _config: &ConfigOptions,
126    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
127        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
128            vec![PushedDown::No; filters.len()],
129        ))
130    }
131
132    /// Set optional schema adapter factory.
133    ///
134    /// [`SchemaAdapterFactory`] allows user to specify how fields from the
135    /// file get mapped to that of the table schema.  If you implement this
136    /// method, you should also implement [`schema_adapter_factory`].
137    ///
138    /// The default implementation returns a not implemented error.
139    ///
140    /// [`schema_adapter_factory`]: Self::schema_adapter_factory
141    fn with_schema_adapter_factory(
142        &self,
143        _factory: Arc<dyn SchemaAdapterFactory>,
144    ) -> Result<Arc<dyn FileSource>> {
145        not_impl_err!(
146            "FileSource {} does not support schema adapter factory",
147            self.file_type()
148        )
149    }
150
151    /// Returns the current schema adapter factory if set
152    ///
153    /// Default implementation returns `None`.
154    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
155        None
156    }
157}