datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_expr::projection::ProjectionExprs;
26use datafusion_physical_plan::execution_plan::{
27    Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
31use datafusion_physical_plan::projection::ProjectionExec;
32use datafusion_physical_plan::stream::BatchSplitStream;
33use datafusion_physical_plan::{
34    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
35};
36use itertools::Itertools;
37
38use crate::file_scan_config::FileScanConfig;
39use datafusion_common::config::ConfigOptions;
40use datafusion_common::{Constraints, Result, Statistics};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
43use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
44use datafusion_physical_plan::SortOrderPushdownResult;
45use datafusion_physical_plan::filter_pushdown::{
46    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
47};
48
49/// A source of data, typically a list of files or memory
50///
51/// This trait provides common behaviors for abstract sources of data. It has
52/// two common implementations:
53///
54/// 1. [`FileScanConfig`]: lists of files
/// 2. [`MemorySourceConfig`]: in-memory list of `RecordBatch`es
56///
57/// File format specific behaviors are defined by [`FileSource`]
58///
59/// # See Also
60/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
61/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
62///
63/// # Notes
64///
65/// Requires `Debug` to assist debugging
66///
67/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
68/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
69/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
71/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
72///
73/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
74/// ```text
75///                       ┌─────────────────────┐                              -----► execute path
76///                       │                     │                              ┄┄┄┄┄► init path
77///                       │   DataSourceExec    │  
78///                       │                     │    
79///                       └───────▲─────────────┘
80///                               ┊  │
81///                               ┊  │
82///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
83///                       │                     │                            |                     |
84///                       │  DataSource(trait)  │                            | TableProvider(trait)|
85///                       │                     │                            |                     |
86///                       └───────▲─────────────┘                            └─────────────────────┘
87///                               ┊  │                                                  ┊
88///               ┌───────────────┿──┴────────────────┐                                 ┊
89///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
90///               |   ┊                               |                                 ┊
91///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
92///    │                     │             │                     │           ┌──────────▼──────────┐
93///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
94///    │                     │             │                     │           |  FileFormat(trait)  |
95///    └──────────────▲──────┘             └─────────────────────┘           |                     |
96///               │   ┊                                                      └─────────────────────┘
97///               │   ┊                                                                 ┊
98///               │   ┊                                                                 ┊
99///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
100///    │                     │                                               │     ArrowSource     │
101///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
102///    │                     │                                               │    ParquetSource    │
103///    └─────────────────────┘                                               └─────────────────────┘
104///               │
105///               │
106///               │
107///               │
108///    ┌──────────▼──────────┐
109///    │     ArrowSource     │
110///    │          ...        │
111///    │    ParquetSource    │
112///    └─────────────────────┘
113///               |
114/// FileOpener (called by FileStream)
115///               │
116///    ┌──────────▼──────────┐
117///    │                     │
118///    │     RecordBatch     │
119///    │                     │
120///    └─────────────────────┘
121/// ```
122pub trait DataSource: Send + Sync + Debug {
123    fn open(
124        &self,
125        partition: usize,
126        context: Arc<TaskContext>,
127    ) -> Result<SendableRecordBatchStream>;
128    fn as_any(&self) -> &dyn Any;
129    /// Format this source for display in explain plans
130    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
131
132    /// Return a copy of this DataSource with a new partitioning scheme.
133    ///
134    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
135    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
136    ///
137    /// Repartitioning should not change the output ordering, if this ordering exists.
138    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
139    /// and the FileSource's
140    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
141    /// for examples.
142    fn repartitioned(
143        &self,
144        _target_partitions: usize,
145        _repartition_file_min_size: usize,
146        _output_ordering: Option<LexOrdering>,
147    ) -> Result<Option<Arc<dyn DataSource>>> {
148        Ok(None)
149    }
150
151    fn output_partitioning(&self) -> Partitioning;
152    fn eq_properties(&self) -> EquivalenceProperties;
153    fn scheduling_type(&self) -> SchedulingType {
154        SchedulingType::NonCooperative
155    }
156
157    /// Returns statistics for a specific partition, or aggregate statistics
158    /// across all partitions if `partition` is `None`.
159    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
160
161    /// Returns aggregate statistics across all partitions.
162    ///
163    /// # Deprecated
164    /// Use [`Self::partition_statistics`] instead, which provides more fine-grained
165    /// control over statistics retrieval (per-partition or aggregate).
166    #[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
167    fn statistics(&self) -> Result<Statistics> {
168        self.partition_statistics(None)
169    }
170
171    /// Return a copy of this DataSource with a new fetch limit
172    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
173    fn fetch(&self) -> Option<usize>;
174    fn metrics(&self) -> ExecutionPlanMetricsSet {
175        ExecutionPlanMetricsSet::new()
176    }
177    fn try_swapping_with_projection(
178        &self,
179        _projection: &ProjectionExprs,
180    ) -> Result<Option<Arc<dyn DataSource>>>;
181    /// Try to push down filters into this DataSource.
182    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
183    ///
184    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
185    fn try_pushdown_filters(
186        &self,
187        filters: Vec<Arc<dyn PhysicalExpr>>,
188        _config: &ConfigOptions,
189    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
190        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
191            vec![PushedDown::No; filters.len()],
192        ))
193    }
194
195    /// Try to create a new DataSource that produces data in the specified sort order.
196    ///
197    /// # Arguments
198    /// * `order` - The desired output ordering
199    ///
200    /// # Returns
201    /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering
202    /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering
203    /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering
204    /// * `Err(e)` - Error occurred
205    ///
206    /// Default implementation returns `Unsupported`.
207    fn try_pushdown_sort(
208        &self,
209        _order: &[PhysicalSortExpr],
210    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
211        Ok(SortOrderPushdownResult::Unsupported)
212    }
213}
214
215/// [`ExecutionPlan`] that reads one or more files
216///
217/// `DataSourceExec` implements common functionality such as applying
218/// projections, and caching plan properties.
219///
220/// The [`DataSource`] describes where to find the data for this data source
221/// (for example in files or what in memory partitions).
222///
223/// For file based [`DataSource`]s, format specific behavior is implemented in
224/// the [`FileSource`] trait.
225///
226/// [`FileSource`]: crate::file::FileSource
227#[derive(Clone, Debug)]
228pub struct DataSourceExec {
229    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
230    data_source: Arc<dyn DataSource>,
231    /// Cached plan properties such as sort order
232    cache: PlanProperties,
233}
234
235impl DisplayAs for DataSourceExec {
236    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
237        match t {
238            DisplayFormatType::Default | DisplayFormatType::Verbose => {
239                write!(f, "DataSourceExec: ")?;
240            }
241            DisplayFormatType::TreeRender => {}
242        }
243        self.data_source.fmt_as(t, f)
244    }
245}
246
247impl ExecutionPlan for DataSourceExec {
248    fn name(&self) -> &'static str {
249        "DataSourceExec"
250    }
251
252    fn as_any(&self) -> &dyn Any {
253        self
254    }
255
256    fn properties(&self) -> &PlanProperties {
257        &self.cache
258    }
259
260    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
261        Vec::new()
262    }
263
264    fn with_new_children(
265        self: Arc<Self>,
266        _: Vec<Arc<dyn ExecutionPlan>>,
267    ) -> Result<Arc<dyn ExecutionPlan>> {
268        Ok(self)
269    }
270
271    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
272    ///
273    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
274    /// to [`ExecutionPlan::repartitioned`] for more details.
275    fn repartitioned(
276        &self,
277        target_partitions: usize,
278        config: &ConfigOptions,
279    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
280        let data_source = self.data_source.repartitioned(
281            target_partitions,
282            config.optimizer.repartition_file_min_size,
283            self.properties().eq_properties.output_ordering(),
284        )?;
285
286        Ok(data_source.map(|source| {
287            let output_partitioning = source.output_partitioning();
288            let plan = self
289                .clone()
290                .with_data_source(source)
291                // Changing source partitioning may invalidate output partitioning. Update it also
292                .with_partitioning(output_partitioning);
293            Arc::new(plan) as _
294        }))
295    }
296
297    fn execute(
298        &self,
299        partition: usize,
300        context: Arc<TaskContext>,
301    ) -> Result<SendableRecordBatchStream> {
302        let stream = self.data_source.open(partition, Arc::clone(&context))?;
303        let batch_size = context.session_config().batch_size();
304        log::debug!(
305            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
306        );
307        let metrics = self.data_source.metrics();
308        let split_metrics = SplitMetrics::new(&metrics, partition);
309        Ok(Box::pin(BatchSplitStream::new(
310            stream,
311            batch_size,
312            split_metrics,
313        )))
314    }
315
316    fn metrics(&self) -> Option<MetricsSet> {
317        Some(self.data_source.metrics().clone_inner())
318    }
319
320    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
321        self.data_source.partition_statistics(partition)
322    }
323
324    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
325        let data_source = self.data_source.with_fetch(limit)?;
326        let cache = self.cache.clone();
327
328        Some(Arc::new(Self { data_source, cache }))
329    }
330
331    fn fetch(&self) -> Option<usize> {
332        self.data_source.fetch()
333    }
334
335    fn try_swapping_with_projection(
336        &self,
337        projection: &ProjectionExec,
338    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
339        match self
340            .data_source
341            .try_swapping_with_projection(projection.projection_expr())?
342        {
343            Some(new_data_source) => {
344                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
345            }
346            None => Ok(None),
347        }
348    }
349
350    fn handle_child_pushdown_result(
351        &self,
352        _phase: FilterPushdownPhase,
353        child_pushdown_result: ChildPushdownResult,
354        config: &ConfigOptions,
355    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
356        // Push any remaining filters into our data source
357        let parent_filters = child_pushdown_result
358            .parent_filters
359            .into_iter()
360            .map(|f| f.filter)
361            .collect_vec();
362        let res = self
363            .data_source
364            .try_pushdown_filters(parent_filters, config)?;
365        match res.updated_node {
366            Some(data_source) => {
367                let mut new_node = self.clone();
368                new_node.data_source = data_source;
369                // Re-compute properties since we have new filters which will impact equivalence info
370                new_node.cache = Self::compute_properties(&new_node.data_source);
371
372                Ok(FilterPushdownPropagation {
373                    filters: res.filters,
374                    updated_node: Some(Arc::new(new_node)),
375                })
376            }
377            None => Ok(FilterPushdownPropagation {
378                filters: res.filters,
379                updated_node: None,
380            }),
381        }
382    }
383
384    fn try_pushdown_sort(
385        &self,
386        order: &[PhysicalSortExpr],
387    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
388        // Delegate to the data source and wrap result with DataSourceExec
389        self.data_source
390            .try_pushdown_sort(order)?
391            .try_map(|new_data_source| {
392                let new_exec = self.clone().with_data_source(new_data_source);
393                Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
394            })
395    }
396}
397
398impl DataSourceExec {
399    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
400        Arc::new(Self::new(Arc::new(data_source)))
401    }
402
403    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
404    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
405        let cache = Self::compute_properties(&data_source);
406        Self { data_source, cache }
407    }
408
409    /// Return the source object
410    pub fn data_source(&self) -> &Arc<dyn DataSource> {
411        &self.data_source
412    }
413
414    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
415        self.cache = Self::compute_properties(&data_source);
416        self.data_source = data_source;
417        self
418    }
419
420    /// Assign constraints
421    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
422        self.cache = self.cache.with_constraints(constraints);
423        self
424    }
425
426    /// Assign output partitioning
427    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
428        self.cache = self.cache.with_partitioning(partitioning);
429        self
430    }
431
432    fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
433        PlanProperties::new(
434            data_source.eq_properties(),
435            data_source.output_partitioning(),
436            EmissionType::Incremental,
437            Boundedness::Bounded,
438        )
439        .with_scheduling_type(data_source.scheduling_type())
440    }
441
442    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
443    ///
444    /// Returns `None` if
445    /// 1. the datasource is not scanning files (`FileScanConfig`)
446    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
447    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
448        self.data_source()
449            .as_any()
450            .downcast_ref::<FileScanConfig>()
451            .and_then(|file_scan_conf| {
452                file_scan_conf
453                    .file_source()
454                    .as_any()
455                    .downcast_ref::<T>()
456                    .map(|source| (file_scan_conf, source))
457            })
458    }
459}
460
461/// Create a new `DataSourceExec` from a `DataSource`
462impl<S> From<S> for DataSourceExec
463where
464    S: DataSource + 'static,
465{
466    fn from(source: S) -> Self {
467        Self::new(Arc::new(source))
468    }
469}