datafusion_datasource/file.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Common behaviors that every file format needs to implement
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use crate::file_groups::FileGroupPartitioner;
26use crate::file_scan_config::FileScanConfig;
27use crate::file_stream::FileOpener;
28use crate::morsel::{FileOpenerMorselizer, Morselizer};
29#[expect(deprecated)]
30use crate::schema_adapter::SchemaAdapterFactory;
31use datafusion_common::config::ConfigOptions;
32use datafusion_common::{Result, not_impl_err};
33use datafusion_physical_expr::projection::ProjectionExprs;
34use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
35use datafusion_physical_plan::DisplayFormatType;
36use datafusion_physical_plan::SortOrderPushdownResult;
37use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
38use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
39
40use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
41use object_store::ObjectStore;
42
43/// Helper function to convert any type implementing [`FileSource`] to `Arc<dyn FileSource>`
44pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
45 Arc::new(source)
46}
47
48/// File format specific behaviors for [`DataSource`]
49///
50/// # Schema information
51/// There are two important schemas for a [`FileSource`]:
52/// 1. [`Self::table_schema`] -- the schema for the overall table
53/// (file data plus partition columns)
54/// 2. The logical output schema, comprised of [`Self::table_schema`] with
55/// [`Self::projection`] applied
56///
57/// See more details on specific implementations:
58/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
59/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
60/// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html)
61/// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html)
62/// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
63///
64/// [`DataSource`]: crate::source::DataSource
65pub trait FileSource: Any + Send + Sync {
66 /// Creates a `dyn FileOpener` based on given parameters.
67 ///
68 /// Note: File sources with a native morsel implementation should return an
69 /// error from this method and implementing [`Self::create_morselizer`] instead.
70 fn create_file_opener(
71 &self,
72 object_store: Arc<dyn ObjectStore>,
73 base_config: &FileScanConfig,
74 partition: usize,
75 ) -> Result<Arc<dyn FileOpener>>;
76
77 /// Creates a `dyn Morselizer` based on given parameters.
78 ///
79 /// The default implementation preserves existing behavior by adapting the
80 /// legacy [`FileOpener`] API into a [`Morselizer`].
81 ///
82 /// It is preferred to implement the [`Morselizer`] API directly by
83 /// implementing this method.
84 fn create_morselizer(
85 &self,
86 object_store: Arc<dyn ObjectStore>,
87 base_config: &FileScanConfig,
88 partition: usize,
89 ) -> Result<Box<dyn Morselizer>> {
90 let opener = self.create_file_opener(object_store, base_config, partition)?;
91 Ok(Box::new(FileOpenerMorselizer::new(opener)))
92 }
93
94 /// Returns the table schema for the overall table (including partition columns, if any)
95 ///
96 /// This method returns the unprojected schema: the full schema of the data
97 /// without [`Self::projection`] applied.
98 ///
99 /// The output schema of this `FileSource` is this TableSchema
100 /// with [`Self::projection`] applied.
101 ///
102 /// Use [`ProjectionExprs::project_schema`] to get the projected schema
103 /// after applying the projection.
104 fn table_schema(&self) -> &crate::table_schema::TableSchema;
105
106 /// Initialize new type with batch size configuration
107 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
108
109 /// Returns the filter expression that will be applied *during* the file scan.
110 ///
111 /// These expressions are in terms of the unprojected [`Self::table_schema`].
112 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
113 None
114 }
115
116 /// Return the projection that will be applied to the output stream on top
117 /// of [`Self::table_schema`].
118 ///
119 /// Note you can use [`ProjectionExprs::project_schema`] on the table
120 /// schema to get the effective output schema of this source.
121 fn projection(&self) -> Option<&ProjectionExprs> {
122 None
123 }
124
125 /// Return execution plan metrics
126 fn metrics(&self) -> &ExecutionPlanMetricsSet;
127
128 /// String representation of file source such as "csv", "json", "parquet"
129 fn file_type(&self) -> &str;
130
131 /// Format FileType specific information
132 fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
133 Ok(())
134 }
135
136 /// Returns whether this file source supports repartitioning files by byte ranges.
137 ///
138 /// When this returns `true`, files can be split into multiple partitions
139 /// based on byte offsets for parallel reading.
140 ///
141 /// When this returns `false`, files cannot be repartitioned (e.g., CSV files
142 /// with `newlines_in_values` enabled cannot be split because record boundaries
143 /// cannot be determined by byte offset alone).
144 ///
145 /// The default implementation returns `true`. File sources that cannot support
146 /// repartitioning should override this method.
147 fn supports_repartitioning(&self) -> bool {
148 true
149 }
150
151 /// If supported by the [`FileSource`], redistribute files across partitions
152 /// according to their size. Allows custom file formats to implement their
153 /// own repartitioning logic.
154 ///
155 /// The default implementation uses [`FileGroupPartitioner`]. See that
156 /// struct for more details.
157 fn repartitioned(
158 &self,
159 target_partitions: usize,
160 repartition_file_min_size: usize,
161 output_ordering: Option<LexOrdering>,
162 config: &FileScanConfig,
163 ) -> Result<Option<FileScanConfig>> {
164 if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
165 {
166 return Ok(None);
167 }
168
169 let repartitioned_file_groups_option = FileGroupPartitioner::new()
170 .with_target_partitions(target_partitions)
171 .with_repartition_file_min_size(repartition_file_min_size)
172 .with_preserve_order_within_groups(output_ordering.is_some())
173 .repartition_file_groups(&config.file_groups);
174
175 if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
176 let mut source = config.clone();
177 source.file_groups = repartitioned_file_groups;
178 return Ok(Some(source));
179 }
180 Ok(None)
181 }
182
183 /// Try to push down filters into this FileSource.
184 ///
185 /// `filters` must be in terms of the unprojected table schema (file schema
186 /// plus partition columns), before any projection is applied.
187 ///
188 /// Any filters that this FileSource chooses to evaluate itself should be
189 /// returned as `PushedDown::Yes` in the result, along with a FileSource
190 /// instance that incorporates those filters. Such filters are logically
191 /// applied "during" the file scan, meaning they may refer to columns not
192 /// included in the final output projection.
193 ///
194 /// Filters that cannot be pushed down should be marked as `PushedDown::No`,
195 /// and will be evaluated by an execution plan after the file source.
196 ///
197 /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
198 ///
199 /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
200 fn try_pushdown_filters(
201 &self,
202 filters: Vec<Arc<dyn PhysicalExpr>>,
203 _config: &ConfigOptions,
204 ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
205 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
206 vec![PushedDown::No; filters.len()],
207 ))
208 }
209
210 /// Try to create a new FileSource that can produce data in the specified sort order.
211 ///
212 /// This method attempts to optimize data retrieval to match the requested ordering.
213 /// It receives both the requested ordering and equivalence properties that describe
214 /// the output data from this file source.
215 ///
216 /// # Parameters
217 /// * `order` - The requested sort ordering from the query
218 /// * `eq_properties` - Equivalence properties of the data that will be produced by this
219 /// file source. These properties describe the ordering, constant columns, and other
220 /// relationships in the output data, allowing the implementation to determine if
221 /// optimizations like reversed scanning can help satisfy the requested ordering.
222 /// This includes information about:
223 /// - The file's natural ordering (from output_ordering in FileScanConfig)
224 /// - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
225 /// - Monotonic functions (e.g., `extract_year_month(timestamp)`)
226 /// - Other equivalence relationships
227 ///
228 /// # Examples
229 ///
230 /// ## Example 1: Simple reverse
231 /// ```text
232 /// File ordering: [a ASC, b DESC]
233 /// Requested: [a DESC]
234 /// Reversed file: [a DESC, b ASC]
235 /// Result: Satisfies request (prefix match) → Inexact
236 /// ```
237 ///
238 /// ## Example 2: Monotonic function
239 /// ```text
240 /// File ordering: [extract_year_month(ts) ASC, ts ASC]
241 /// Requested: [ts DESC]
242 /// Reversed file: [extract_year_month(ts) DESC, ts DESC]
243 /// Result: Through monotonicity, satisfies [ts DESC] → Inexact
244 /// ```
245 ///
246 /// # Returns
247 /// * `Exact` - Created a source that guarantees perfect ordering
248 /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
249 /// * `Unsupported` - Cannot optimize for this ordering
250 ///
251 /// # Deprecation / migration notes
252 /// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `53.0.0`.
253 /// Per DataFusion's deprecation guidelines, it will be removed in `59.0.0` or later
254 /// (6 major versions or 6 months, whichever is longer).
255 /// - New implementations should override [`Self::try_pushdown_sort`] directly.
256 /// - For backwards compatibility, the default implementation of
257 /// [`Self::try_pushdown_sort`] delegates to the deprecated
258 /// [`Self::try_reverse_output`] until it is removed. After that point, the
259 /// default implementation will return [`SortOrderPushdownResult::Unsupported`].
260 fn try_pushdown_sort(
261 &self,
262 order: &[PhysicalSortExpr],
263 eq_properties: &EquivalenceProperties,
264 ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
265 #[expect(deprecated)]
266 self.try_reverse_output(order, eq_properties)
267 }
268
269 /// Deprecated: Renamed to [`Self::try_pushdown_sort`].
270 #[deprecated(
271 since = "53.0.0",
272 note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later."
273 )]
274 fn try_reverse_output(
275 &self,
276 _order: &[PhysicalSortExpr],
277 _eq_properties: &EquivalenceProperties,
278 ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
279 Ok(SortOrderPushdownResult::Unsupported)
280 }
281
282 /// Reorder files in the shared work queue to optimize query performance.
283 ///
284 /// For example, TopK queries benefit from reading files with the best
285 /// statistics first, so the dynamic filter threshold tightens quickly.
286 ///
287 /// The default implementation returns files unchanged (no reordering).
288 fn reorder_files(
289 &self,
290 files: Vec<crate::PartitionedFile>,
291 ) -> Vec<crate::PartitionedFile> {
292 files
293 }
294
295 /// Try to push down a projection into this FileSource.
296 ///
297 /// `FileSource` implementations that support projection pushdown should
298 /// override this method and return a new `FileSource` instance with the
299 /// projection incorporated.
300 ///
301 /// If a `FileSource` does accept a projection it is expected to handle
302 /// the projection in it's entirety, including partition columns.
303 /// For example, the `FileSource` may translate that projection into a
304 /// file format specific projection (e.g. Parquet can push down struct field access,
305 /// some other file formats like Vortex can push down computed expressions into un-decoded data)
306 /// and also need to handle partition column projection (generally done by replacing partition column
307 /// references with literal values derived from each files partition values).
308 ///
309 /// Not all FileSource's can handle complex expression pushdowns. For example,
310 /// a CSV file source may only support simple column selections. In such cases,
311 /// the `FileSource` can use [`SplitProjection`] and [`ProjectionOpener`]
312 /// to split the projection into a pushdownable part and a non-pushdownable part.
313 /// These helpers also handle partition column projection.
314 ///
315 /// [`SplitProjection`]: crate::projection::SplitProjection
316 /// [`ProjectionOpener`]: crate::projection::ProjectionOpener
317 fn try_pushdown_projection(
318 &self,
319 _projection: &ProjectionExprs,
320 ) -> Result<Option<Arc<dyn FileSource>>> {
321 Ok(None)
322 }
323
324 /// Deprecated: Set optional schema adapter factory.
325 ///
326 /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
327 /// See `upgrading.md` for more details.
328 #[deprecated(
329 since = "53.0.0",
330 note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
331 )]
332 #[expect(deprecated)]
333 fn with_schema_adapter_factory(
334 &self,
335 _factory: Arc<dyn SchemaAdapterFactory>,
336 ) -> Result<Arc<dyn FileSource>> {
337 not_impl_err!(
338 "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
339 )
340 }
341
342 /// Deprecated: Returns the current schema adapter factory if set.
343 ///
344 /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
345 /// See `upgrading.md` for more details.
346 #[deprecated(
347 since = "53.0.0",
348 note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
349 )]
350 #[expect(deprecated)]
351 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
352 None
353 }
354}
355
356impl dyn FileSource {
357 /// Returns `true` if this source is of type `T`.
358 pub fn is<T: FileSource>(&self) -> bool {
359 (self as &dyn Any).is::<T>()
360 }
361
362 /// Attempts to downcast this source to a concrete type `T`.
363 pub fn downcast_ref<T: FileSource>(&self) -> Option<&T> {
364 (self as &dyn Any).downcast_ref()
365 }
366}