datafusion_datasource/file.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Common behaviors that every file format needs to implement
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use crate::file_groups::FileGroupPartitioner;
26use crate::file_scan_config::FileScanConfig;
27use crate::file_stream::FileOpener;
28#[expect(deprecated)]
29use crate::schema_adapter::SchemaAdapterFactory;
30use datafusion_common::config::ConfigOptions;
31use datafusion_common::{Result, not_impl_err};
32use datafusion_physical_expr::projection::ProjectionExprs;
33use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
34use datafusion_physical_plan::DisplayFormatType;
35use datafusion_physical_plan::SortOrderPushdownResult;
36use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
37use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
38
39use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
40use object_store::ObjectStore;
41
42/// Helper function to convert any type implementing [`FileSource`] to `Arc<dyn FileSource>`
43pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
44 Arc::new(source)
45}
46
47/// File format specific behaviors for [`DataSource`]
48///
49/// # Schema information
50/// There are two important schemas for a [`FileSource`]:
51/// 1. [`Self::table_schema`] -- the schema for the overall table
52/// (file data plus partition columns)
53/// 2. The logical output schema, comprised of [`Self::table_schema`] with
54/// [`Self::projection`] applied
55///
56/// See more details on specific implementations:
57/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
58/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
59/// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html)
60/// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html)
61/// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
62///
63/// [`DataSource`]: crate::source::DataSource
64pub trait FileSource: Send + Sync {
65 /// Creates a `dyn FileOpener` based on given parameters
66 fn create_file_opener(
67 &self,
68 object_store: Arc<dyn ObjectStore>,
69 base_config: &FileScanConfig,
70 partition: usize,
71 ) -> Result<Arc<dyn FileOpener>>;
72 /// Any
73 fn as_any(&self) -> &dyn Any;
74
75 /// Returns the table schema for the overall table (including partition columns, if any)
76 ///
77 /// This method returns the unprojected schema: the full schema of the data
78 /// without [`Self::projection`] applied.
79 ///
80 /// The output schema of this `FileSource` is this TableSchema
81 /// with [`Self::projection`] applied.
82 ///
83 /// Use [`ProjectionExprs::project_schema`] to get the projected schema
84 /// after applying the projection.
85 fn table_schema(&self) -> &crate::table_schema::TableSchema;
86
87 /// Initialize new type with batch size configuration
88 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
89
90 /// Returns the filter expression that will be applied *during* the file scan.
91 ///
92 /// These expressions are in terms of the unprojected [`Self::table_schema`].
93 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
94 None
95 }
96
97 /// Return the projection that will be applied to the output stream on top
98 /// of [`Self::table_schema`].
99 ///
100 /// Note you can use [`ProjectionExprs::project_schema`] on the table
101 /// schema to get the effective output schema of this source.
102 fn projection(&self) -> Option<&ProjectionExprs> {
103 None
104 }
105
106 /// Return execution plan metrics
107 fn metrics(&self) -> &ExecutionPlanMetricsSet;
108
109 /// String representation of file source such as "csv", "json", "parquet"
110 fn file_type(&self) -> &str;
111
112 /// Format FileType specific information
113 fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
114 Ok(())
115 }
116
117 /// Returns whether this file source supports repartitioning files by byte ranges.
118 ///
119 /// When this returns `true`, files can be split into multiple partitions
120 /// based on byte offsets for parallel reading.
121 ///
122 /// When this returns `false`, files cannot be repartitioned (e.g., CSV files
123 /// with `newlines_in_values` enabled cannot be split because record boundaries
124 /// cannot be determined by byte offset alone).
125 ///
126 /// The default implementation returns `true`. File sources that cannot support
127 /// repartitioning should override this method.
128 fn supports_repartitioning(&self) -> bool {
129 true
130 }
131
132 /// If supported by the [`FileSource`], redistribute files across partitions
133 /// according to their size. Allows custom file formats to implement their
134 /// own repartitioning logic.
135 ///
136 /// The default implementation uses [`FileGroupPartitioner`]. See that
137 /// struct for more details.
138 fn repartitioned(
139 &self,
140 target_partitions: usize,
141 repartition_file_min_size: usize,
142 output_ordering: Option<LexOrdering>,
143 config: &FileScanConfig,
144 ) -> Result<Option<FileScanConfig>> {
145 if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
146 {
147 return Ok(None);
148 }
149
150 let repartitioned_file_groups_option = FileGroupPartitioner::new()
151 .with_target_partitions(target_partitions)
152 .with_repartition_file_min_size(repartition_file_min_size)
153 .with_preserve_order_within_groups(output_ordering.is_some())
154 .repartition_file_groups(&config.file_groups);
155
156 if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
157 let mut source = config.clone();
158 source.file_groups = repartitioned_file_groups;
159 return Ok(Some(source));
160 }
161 Ok(None)
162 }
163
164 /// Try to push down filters into this FileSource.
165 ///
166 /// `filters` must be in terms of the unprojected table schema (file schema
167 /// plus partition columns), before any projection is applied.
168 ///
169 /// Any filters that this FileSource chooses to evaluate itself should be
170 /// returned as `PushedDown::Yes` in the result, along with a FileSource
171 /// instance that incorporates those filters. Such filters are logically
172 /// applied "during" the file scan, meaning they may refer to columns not
173 /// included in the final output projection.
174 ///
175 /// Filters that cannot be pushed down should be marked as `PushedDown::No`,
176 /// and will be evaluated by an execution plan after the file source.
177 ///
178 /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
179 ///
180 /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
181 fn try_pushdown_filters(
182 &self,
183 filters: Vec<Arc<dyn PhysicalExpr>>,
184 _config: &ConfigOptions,
185 ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
186 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
187 vec![PushedDown::No; filters.len()],
188 ))
189 }
190
191 /// Try to create a new FileSource that can produce data in the specified sort order.
192 ///
193 /// This method attempts to optimize data retrieval to match the requested ordering.
194 /// It receives both the requested ordering and equivalence properties that describe
195 /// the output data from this file source.
196 ///
197 /// # Parameters
198 /// * `order` - The requested sort ordering from the query
199 /// * `eq_properties` - Equivalence properties of the data that will be produced by this
200 /// file source. These properties describe the ordering, constant columns, and other
201 /// relationships in the output data, allowing the implementation to determine if
202 /// optimizations like reversed scanning can help satisfy the requested ordering.
203 /// This includes information about:
204 /// - The file's natural ordering (from output_ordering in FileScanConfig)
205 /// - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
206 /// - Monotonic functions (e.g., `extract_year_month(timestamp)`)
207 /// - Other equivalence relationships
208 ///
209 /// # Examples
210 ///
211 /// ## Example 1: Simple reverse
212 /// ```text
213 /// File ordering: [a ASC, b DESC]
214 /// Requested: [a DESC]
215 /// Reversed file: [a DESC, b ASC]
216 /// Result: Satisfies request (prefix match) → Inexact
217 /// ```
218 ///
219 /// ## Example 2: Monotonic function
220 /// ```text
221 /// File ordering: [extract_year_month(ts) ASC, ts ASC]
222 /// Requested: [ts DESC]
223 /// Reversed file: [extract_year_month(ts) DESC, ts DESC]
224 /// Result: Through monotonicity, satisfies [ts DESC] → Inexact
225 /// ```
226 ///
227 /// # Returns
228 /// * `Exact` - Created a source that guarantees perfect ordering
229 /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
230 /// * `Unsupported` - Cannot optimize for this ordering
231 ///
232 /// # Deprecation / migration notes
233 /// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `53.0.0`.
234 /// Per DataFusion's deprecation guidelines, it will be removed in `59.0.0` or later
235 /// (6 major versions or 6 months, whichever is longer).
236 /// - New implementations should override [`Self::try_pushdown_sort`] directly.
237 /// - For backwards compatibility, the default implementation of
238 /// [`Self::try_pushdown_sort`] delegates to the deprecated
239 /// [`Self::try_reverse_output`] until it is removed. After that point, the
240 /// default implementation will return [`SortOrderPushdownResult::Unsupported`].
241 fn try_pushdown_sort(
242 &self,
243 order: &[PhysicalSortExpr],
244 eq_properties: &EquivalenceProperties,
245 ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
246 #[expect(deprecated)]
247 self.try_reverse_output(order, eq_properties)
248 }
249
250 /// Deprecated: Renamed to [`Self::try_pushdown_sort`].
251 #[deprecated(
252 since = "53.0.0",
253 note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later."
254 )]
255 fn try_reverse_output(
256 &self,
257 _order: &[PhysicalSortExpr],
258 _eq_properties: &EquivalenceProperties,
259 ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
260 Ok(SortOrderPushdownResult::Unsupported)
261 }
262
263 /// Try to push down a projection into this FileSource.
264 ///
265 /// `FileSource` implementations that support projection pushdown should
266 /// override this method and return a new `FileSource` instance with the
267 /// projection incorporated.
268 ///
269 /// If a `FileSource` does accept a projection it is expected to handle
270 /// the projection in it's entirety, including partition columns.
271 /// For example, the `FileSource` may translate that projection into a
272 /// file format specific projection (e.g. Parquet can push down struct field access,
273 /// some other file formats like Vortex can push down computed expressions into un-decoded data)
274 /// and also need to handle partition column projection (generally done by replacing partition column
275 /// references with literal values derived from each files partition values).
276 ///
277 /// Not all FileSource's can handle complex expression pushdowns. For example,
278 /// a CSV file source may only support simple column selections. In such cases,
279 /// the `FileSource` can use [`SplitProjection`] and [`ProjectionOpener`]
280 /// to split the projection into a pushdownable part and a non-pushdownable part.
281 /// These helpers also handle partition column projection.
282 ///
283 /// [`SplitProjection`]: crate::projection::SplitProjection
284 /// [`ProjectionOpener`]: crate::projection::ProjectionOpener
285 fn try_pushdown_projection(
286 &self,
287 _projection: &ProjectionExprs,
288 ) -> Result<Option<Arc<dyn FileSource>>> {
289 Ok(None)
290 }
291
292 /// Deprecated: Set optional schema adapter factory.
293 ///
294 /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
295 /// See `upgrading.md` for more details.
296 #[deprecated(
297 since = "53.0.0",
298 note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
299 )]
300 #[expect(deprecated)]
301 fn with_schema_adapter_factory(
302 &self,
303 _factory: Arc<dyn SchemaAdapterFactory>,
304 ) -> Result<Arc<dyn FileSource>> {
305 not_impl_err!(
306 "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
307 )
308 }
309
310 /// Deprecated: Returns the current schema adapter factory if set.
311 ///
312 /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
313 /// See `upgrading.md` for more details.
314 #[deprecated(
315 since = "53.0.0",
316 note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
317 )]
318 #[expect(deprecated)]
319 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
320 None
321 }
322}