Skip to main content

datafusion_datasource/
file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Common behaviors that every file format needs to implement
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use crate::file_groups::FileGroupPartitioner;
26use crate::file_scan_config::FileScanConfig;
27use crate::file_stream::FileOpener;
28use crate::morsel::{FileOpenerMorselizer, Morselizer};
29#[expect(deprecated)]
30use crate::schema_adapter::SchemaAdapterFactory;
31use datafusion_common::config::ConfigOptions;
32use datafusion_common::{Result, not_impl_err};
33use datafusion_physical_expr::projection::ProjectionExprs;
34use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
35use datafusion_physical_plan::DisplayFormatType;
36use datafusion_physical_plan::SortOrderPushdownResult;
37use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
38use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
39
40use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
41use object_store::ObjectStore;
42
43/// Helper function to convert any type implementing [`FileSource`] to `Arc<dyn FileSource>`
44pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
45    Arc::new(source)
46}
47
48/// File format specific behaviors for [`DataSource`]
49///
50/// # Schema information
51/// There are two important schemas for a [`FileSource`]:
52/// 1. [`Self::table_schema`] -- the schema for the overall table
53///    (file data plus partition columns)
54/// 2. The logical output schema, comprised of [`Self::table_schema`] with
55///    [`Self::projection`] applied
56///
57/// See more details on specific implementations:
58/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
59/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
60/// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html)
61/// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html)
62/// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
63///
64/// [`DataSource`]: crate::source::DataSource
65pub trait FileSource: Any + Send + Sync {
66    /// Creates a `dyn FileOpener` based on given parameters.
67    ///
68    /// Note: File sources with a native morsel implementation should return an
69    /// error from this method and implementing [`Self::create_morselizer`] instead.
70    fn create_file_opener(
71        &self,
72        object_store: Arc<dyn ObjectStore>,
73        base_config: &FileScanConfig,
74        partition: usize,
75    ) -> Result<Arc<dyn FileOpener>>;
76
77    /// Creates a `dyn Morselizer` based on given parameters.
78    ///
79    /// The default implementation preserves existing behavior by adapting the
80    /// legacy [`FileOpener`] API into a [`Morselizer`].
81    ///
82    /// It is preferred to implement the [`Morselizer`] API directly by
83    /// implementing this method.
84    fn create_morselizer(
85        &self,
86        object_store: Arc<dyn ObjectStore>,
87        base_config: &FileScanConfig,
88        partition: usize,
89    ) -> Result<Box<dyn Morselizer>> {
90        let opener = self.create_file_opener(object_store, base_config, partition)?;
91        Ok(Box::new(FileOpenerMorselizer::new(opener)))
92    }
93
94    /// Returns the table schema for the overall table (including partition columns, if any)
95    ///
96    /// This method returns the unprojected schema: the full schema of the data
97    /// without [`Self::projection`] applied.
98    ///
99    /// The output schema of this `FileSource` is this TableSchema
100    /// with [`Self::projection`] applied.
101    ///
102    /// Use [`ProjectionExprs::project_schema`] to get the projected schema
103    /// after applying the projection.
104    fn table_schema(&self) -> &crate::table_schema::TableSchema;
105
106    /// Initialize new type with batch size configuration
107    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
108
109    /// Returns the filter expression that will be applied *during* the file scan.
110    ///
111    /// These expressions are in terms of the unprojected [`Self::table_schema`].
112    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
113        None
114    }
115
116    /// Return the projection that will be applied to the output stream on top
117    /// of [`Self::table_schema`].
118    ///
119    /// Note you can use [`ProjectionExprs::project_schema`] on the table
120    /// schema to get the effective output schema of this source.
121    fn projection(&self) -> Option<&ProjectionExprs> {
122        None
123    }
124
125    /// Return execution plan metrics
126    fn metrics(&self) -> &ExecutionPlanMetricsSet;
127
128    /// String representation of file source such as "csv", "json", "parquet"
129    fn file_type(&self) -> &str;
130
131    /// Format FileType specific information
132    fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
133        Ok(())
134    }
135
136    /// Returns whether this file source supports repartitioning files by byte ranges.
137    ///
138    /// When this returns `true`, files can be split into multiple partitions
139    /// based on byte offsets for parallel reading.
140    ///
141    /// When this returns `false`, files cannot be repartitioned (e.g., CSV files
142    /// with `newlines_in_values` enabled cannot be split because record boundaries
143    /// cannot be determined by byte offset alone).
144    ///
145    /// The default implementation returns `true`. File sources that cannot support
146    /// repartitioning should override this method.
147    fn supports_repartitioning(&self) -> bool {
148        true
149    }
150
151    /// If supported by the [`FileSource`], redistribute files across partitions
152    /// according to their size. Allows custom file formats to implement their
153    /// own repartitioning logic.
154    ///
155    /// The default implementation uses [`FileGroupPartitioner`]. See that
156    /// struct for more details.
157    fn repartitioned(
158        &self,
159        target_partitions: usize,
160        repartition_file_min_size: usize,
161        output_ordering: Option<LexOrdering>,
162        config: &FileScanConfig,
163    ) -> Result<Option<FileScanConfig>> {
164        if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
165        {
166            return Ok(None);
167        }
168
169        let repartitioned_file_groups_option = FileGroupPartitioner::new()
170            .with_target_partitions(target_partitions)
171            .with_repartition_file_min_size(repartition_file_min_size)
172            .with_preserve_order_within_groups(output_ordering.is_some())
173            .repartition_file_groups(&config.file_groups);
174
175        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
176            let mut source = config.clone();
177            source.file_groups = repartitioned_file_groups;
178            return Ok(Some(source));
179        }
180        Ok(None)
181    }
182
183    /// Try to push down filters into this FileSource.
184    ///
185    /// `filters` must be in terms of the unprojected table schema (file schema
186    /// plus partition columns), before any projection is applied.
187    ///
188    /// Any filters that this FileSource chooses to evaluate itself should be
189    /// returned as `PushedDown::Yes` in the result, along with a FileSource
190    /// instance that incorporates those filters. Such filters are logically
191    /// applied "during" the file scan, meaning they may refer to columns not
192    /// included in the final output projection.
193    ///
194    /// Filters that cannot be pushed down should be marked as `PushedDown::No`,
195    /// and will be evaluated by an execution plan after the file source.
196    ///
197    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
198    ///
199    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
200    fn try_pushdown_filters(
201        &self,
202        filters: Vec<Arc<dyn PhysicalExpr>>,
203        _config: &ConfigOptions,
204    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
205        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
206            vec![PushedDown::No; filters.len()],
207        ))
208    }
209
210    /// Try to create a new FileSource that can produce data in the specified sort order.
211    ///
212    /// This method attempts to optimize data retrieval to match the requested ordering.
213    /// It receives both the requested ordering and equivalence properties that describe
214    /// the output data from this file source.
215    ///
216    /// # Parameters
217    /// * `order` - The requested sort ordering from the query
218    /// * `eq_properties` - Equivalence properties of the data that will be produced by this
219    ///   file source. These properties describe the ordering, constant columns, and other
220    ///   relationships in the output data, allowing the implementation to determine if
221    ///   optimizations like reversed scanning can help satisfy the requested ordering.
222    ///   This includes information about:
223    ///   - The file's natural ordering (from output_ordering in FileScanConfig)
224    ///   - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
225    ///   - Monotonic functions (e.g., `extract_year_month(timestamp)`)
226    ///   - Other equivalence relationships
227    ///
228    /// # Examples
229    ///
230    /// ## Example 1: Simple reverse
231    /// ```text
232    /// File ordering: [a ASC, b DESC]
233    /// Requested:     [a DESC]
234    /// Reversed file: [a DESC, b ASC]
235    /// Result: Satisfies request (prefix match) → Inexact
236    /// ```
237    ///
238    /// ## Example 2: Monotonic function
239    /// ```text
240    /// File ordering: [extract_year_month(ts) ASC, ts ASC]
241    /// Requested:     [ts DESC]
242    /// Reversed file: [extract_year_month(ts) DESC, ts DESC]
243    /// Result: Through monotonicity, satisfies [ts DESC] → Inexact
244    /// ```
245    ///
246    /// # Returns
247    /// * `Exact` - Created a source that guarantees perfect ordering
248    /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
249    /// * `Unsupported` - Cannot optimize for this ordering
250    ///
251    /// # Deprecation / migration notes
252    /// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `53.0.0`.
253    ///   Per DataFusion's deprecation guidelines, it will be removed in `59.0.0` or later
254    ///   (6 major versions or 6 months, whichever is longer).
255    /// - New implementations should override [`Self::try_pushdown_sort`] directly.
256    /// - For backwards compatibility, the default implementation of
257    ///   [`Self::try_pushdown_sort`] delegates to the deprecated
258    ///   [`Self::try_reverse_output`] until it is removed. After that point, the
259    ///   default implementation will return [`SortOrderPushdownResult::Unsupported`].
260    fn try_pushdown_sort(
261        &self,
262        order: &[PhysicalSortExpr],
263        eq_properties: &EquivalenceProperties,
264    ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
265        #[expect(deprecated)]
266        self.try_reverse_output(order, eq_properties)
267    }
268
269    /// Deprecated: Renamed to [`Self::try_pushdown_sort`].
270    #[deprecated(
271        since = "53.0.0",
272        note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later."
273    )]
274    fn try_reverse_output(
275        &self,
276        _order: &[PhysicalSortExpr],
277        _eq_properties: &EquivalenceProperties,
278    ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
279        Ok(SortOrderPushdownResult::Unsupported)
280    }
281
282    /// Reorder files in the shared work queue to optimize query performance.
283    ///
284    /// For example, TopK queries benefit from reading files with the best
285    /// statistics first, so the dynamic filter threshold tightens quickly.
286    ///
287    /// The default implementation returns files unchanged (no reordering).
288    fn reorder_files(
289        &self,
290        files: Vec<crate::PartitionedFile>,
291    ) -> Vec<crate::PartitionedFile> {
292        files
293    }
294
295    /// Try to push down a projection into this FileSource.
296    ///
297    /// `FileSource` implementations that support projection pushdown should
298    /// override this method and return a new `FileSource` instance with the
299    /// projection incorporated.
300    ///
301    /// If a `FileSource` does accept a projection it is expected to handle
302    /// the projection in it's entirety, including partition columns.
303    /// For example, the `FileSource` may translate that projection into a
304    /// file format specific projection (e.g. Parquet can push down struct field access,
305    /// some other file formats like Vortex can push down computed expressions into un-decoded data)
306    /// and also need to handle partition column projection (generally done by replacing partition column
307    /// references with literal values derived from each files partition values).
308    ///
309    /// Not all FileSource's can handle complex expression pushdowns. For example,
310    /// a CSV file source may only support simple column selections. In such cases,
311    /// the `FileSource` can use [`SplitProjection`] and [`ProjectionOpener`]
312    /// to split the projection into a pushdownable part and a non-pushdownable part.
313    /// These helpers also handle partition column projection.
314    ///
315    /// [`SplitProjection`]: crate::projection::SplitProjection
316    /// [`ProjectionOpener`]: crate::projection::ProjectionOpener
317    fn try_pushdown_projection(
318        &self,
319        _projection: &ProjectionExprs,
320    ) -> Result<Option<Arc<dyn FileSource>>> {
321        Ok(None)
322    }
323
324    /// Deprecated: Set optional schema adapter factory.
325    ///
326    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
327    /// See `upgrading.md` for more details.
328    #[deprecated(
329        since = "53.0.0",
330        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
331    )]
332    #[expect(deprecated)]
333    fn with_schema_adapter_factory(
334        &self,
335        _factory: Arc<dyn SchemaAdapterFactory>,
336    ) -> Result<Arc<dyn FileSource>> {
337        not_impl_err!(
338            "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
339        )
340    }
341
342    /// Deprecated: Returns the current schema adapter factory if set.
343    ///
344    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
345    /// See `upgrading.md` for more details.
346    #[deprecated(
347        since = "53.0.0",
348        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
349    )]
350    #[expect(deprecated)]
351    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
352        None
353    }
354}
355
356impl dyn FileSource {
357    /// Returns `true` if this source is of type `T`.
358    pub fn is<T: FileSource>(&self) -> bool {
359        (self as &dyn Any).is::<T>()
360    }
361
362    /// Attempts to downcast this source to a concrete type `T`.
363    pub fn downcast_ref<T: FileSource>(&self) -> Option<&T> {
364        (self as &dyn Any).downcast_ref()
365    }
366}