datafusion_datasource/
file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Common behaviors that every file format needs to implement
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use crate::file_groups::FileGroupPartitioner;
26use crate::file_scan_config::FileScanConfig;
27use crate::file_stream::FileOpener;
28#[expect(deprecated)]
29use crate::schema_adapter::SchemaAdapterFactory;
30use datafusion_common::config::ConfigOptions;
31use datafusion_common::{Result, not_impl_err};
32use datafusion_physical_expr::projection::ProjectionExprs;
33use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
34use datafusion_physical_plan::DisplayFormatType;
35use datafusion_physical_plan::SortOrderPushdownResult;
36use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
37use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
38
39use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
40use object_store::ObjectStore;
41
42/// Helper function to convert any type implementing FileSource to Arc<dyn FileSource>
43pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
44    Arc::new(source)
45}
46
47/// file format specific behaviors for elements in [`DataSource`]
48///
49/// See more details on specific implementations:
50/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
51/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
52/// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html)
53/// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html)
54/// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
55///
56/// [`DataSource`]: crate::source::DataSource
57pub trait FileSource: Send + Sync {
58    /// Creates a `dyn FileOpener` based on given parameters
59    fn create_file_opener(
60        &self,
61        object_store: Arc<dyn ObjectStore>,
62        base_config: &FileScanConfig,
63        partition: usize,
64    ) -> Result<Arc<dyn FileOpener>>;
65    /// Any
66    fn as_any(&self) -> &dyn Any;
67    /// Returns the table schema for this file source.
68    ///
69    /// This always returns the unprojected schema (the full schema of the data).
70    fn table_schema(&self) -> &crate::table_schema::TableSchema;
71    /// Initialize new type with batch size configuration
72    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
73    /// Returns the filter expression that will be applied during the file scan.
74    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
75        None
76    }
77    /// Return the projection that will be applied to the output stream on top of the table schema.
78    fn projection(&self) -> Option<&ProjectionExprs> {
79        None
80    }
81    /// Return execution plan metrics
82    fn metrics(&self) -> &ExecutionPlanMetricsSet;
83    /// String representation of file source such as "csv", "json", "parquet"
84    fn file_type(&self) -> &str;
85    /// Format FileType specific information
86    fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
87        Ok(())
88    }
89
90    /// Returns whether this file source supports repartitioning files by byte ranges.
91    ///
92    /// When this returns `true`, files can be split into multiple partitions
93    /// based on byte offsets for parallel reading.
94    ///
95    /// When this returns `false`, files cannot be repartitioned (e.g., CSV files
96    /// with `newlines_in_values` enabled cannot be split because record boundaries
97    /// cannot be determined by byte offset alone).
98    ///
99    /// The default implementation returns `true`. File sources that cannot support
100    /// repartitioning should override this method.
101    fn supports_repartitioning(&self) -> bool {
102        true
103    }
104
105    /// If supported by the [`FileSource`], redistribute files across partitions
106    /// according to their size. Allows custom file formats to implement their
107    /// own repartitioning logic.
108    ///
109    /// The default implementation uses [`FileGroupPartitioner`]. See that
110    /// struct for more details.
111    fn repartitioned(
112        &self,
113        target_partitions: usize,
114        repartition_file_min_size: usize,
115        output_ordering: Option<LexOrdering>,
116        config: &FileScanConfig,
117    ) -> Result<Option<FileScanConfig>> {
118        if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
119        {
120            return Ok(None);
121        }
122
123        let repartitioned_file_groups_option = FileGroupPartitioner::new()
124            .with_target_partitions(target_partitions)
125            .with_repartition_file_min_size(repartition_file_min_size)
126            .with_preserve_order_within_groups(output_ordering.is_some())
127            .repartition_file_groups(&config.file_groups);
128
129        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
130            let mut source = config.clone();
131            source.file_groups = repartitioned_file_groups;
132            return Ok(Some(source));
133        }
134        Ok(None)
135    }
136
137    /// Try to push down filters into this FileSource.
138    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
139    ///
140    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
141    fn try_pushdown_filters(
142        &self,
143        filters: Vec<Arc<dyn PhysicalExpr>>,
144        _config: &ConfigOptions,
145    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
146        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
147            vec![PushedDown::No; filters.len()],
148        ))
149    }
150
151    /// Try to create a new FileSource that can produce data in the specified sort order.
152    ///
153    /// This method attempts to optimize data retrieval to match the requested ordering.
154    /// It receives both the requested ordering and equivalence properties that describe
155    /// the output data from this file source.
156    ///
157    /// # Parameters
158    /// * `order` - The requested sort ordering from the query
159    /// * `eq_properties` - Equivalence properties of the data that will be produced by this
160    ///   file source. These properties describe the ordering, constant columns, and other
161    ///   relationships in the output data, allowing the implementation to determine if
162    ///   optimizations like reversed scanning can help satisfy the requested ordering.
163    ///   This includes information about:
164    ///   - The file's natural ordering (from output_ordering in FileScanConfig)
165    ///   - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
166    ///   - Monotonic functions (e.g., `extract_year_month(timestamp)`)
167    ///   - Other equivalence relationships
168    ///
169    /// # Examples
170    ///
171    /// ## Example 1: Simple reverse
172    /// ```text
173    /// File ordering: [a ASC, b DESC]
174    /// Requested:     [a DESC]
175    /// Reversed file: [a DESC, b ASC]
176    /// Result: Satisfies request (prefix match) → Inexact
177    /// ```
178    ///
179    /// ## Example 2: Monotonic function
180    /// ```text
181    /// File ordering: [extract_year_month(ts) ASC, ts ASC]
182    /// Requested:     [ts DESC]
183    /// Reversed file: [extract_year_month(ts) DESC, ts DESC]
184    /// Result: Through monotonicity, satisfies [ts DESC] → Inexact
185    /// ```
186    ///
187    /// # Returns
188    /// * `Exact` - Created a source that guarantees perfect ordering
189    /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
190    /// * `Unsupported` - Cannot optimize for this ordering
191    ///
192    /// Default implementation returns `Unsupported`.
193    fn try_reverse_output(
194        &self,
195        _order: &[PhysicalSortExpr],
196        _eq_properties: &EquivalenceProperties,
197    ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
198        Ok(SortOrderPushdownResult::Unsupported)
199    }
200
201    /// Try to push down a projection into a this FileSource.
202    ///
203    /// `FileSource` implementations that support projection pushdown should
204    /// override this method and return a new `FileSource` instance with the
205    /// projection incorporated.
206    ///
207    /// If a `FileSource` does accept a projection it is expected to handle
208    /// the projection in it's entirety, including partition columns.
209    /// For example, the `FileSource` may translate that projection into a
210    /// file format specific projection (e.g. Parquet can push down struct field access,
211    /// some other file formats like Vortex can push down computed expressions into un-decoded data)
212    /// and also need to handle partition column projection (generally done by replacing partition column
213    /// references with literal values derived from each files partition values).
214    ///
215    /// Not all FileSource's can handle complex expression pushdowns. For example,
216    /// a CSV file source may only support simple column selections. In such cases,
217    /// the `FileSource` can use [`SplitProjection`] and [`ProjectionOpener`]
218    /// to split the projection into a pushdownable part and a non-pushdownable part.
219    /// These helpers also handle partition column projection.
220    ///
221    /// [`SplitProjection`]: crate::projection::SplitProjection
222    /// [`ProjectionOpener`]: crate::projection::ProjectionOpener
223    fn try_pushdown_projection(
224        &self,
225        _projection: &ProjectionExprs,
226    ) -> Result<Option<Arc<dyn FileSource>>> {
227        Ok(None)
228    }
229
230    /// Deprecated: Set optional schema adapter factory.
231    ///
232    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
233    /// See `upgrading.md` for more details.
234    #[deprecated(
235        since = "52.0.0",
236        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
237    )]
238    #[expect(deprecated)]
239    fn with_schema_adapter_factory(
240        &self,
241        _factory: Arc<dyn SchemaAdapterFactory>,
242    ) -> Result<Arc<dyn FileSource>> {
243        not_impl_err!(
244            "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
245        )
246    }
247
248    /// Deprecated: Returns the current schema adapter factory if set.
249    ///
250    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
251    /// See `upgrading.md` for more details.
252    #[deprecated(
253        since = "52.0.0",
254        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
255    )]
256    #[expect(deprecated)]
257    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
258        None
259    }
260}