datafusion_datasource/source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataSource`] and [`DataSourceExec`]

use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use datafusion_physical_plan::execution_plan::{
    Boundedness, EmissionType, SchedulingType,
};
use datafusion_physical_plan::metrics::SplitMetrics;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::stream::BatchSplitStream;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use itertools::Itertools;

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::filter_pushdown::{
    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};

/// A source of data, typically a list of files or an in-memory source
///
/// This trait provides common behaviors for abstract sources of data. It has
/// two common implementations:
///
/// 1. [`FileScanConfig`]: lists of files
/// 2. [`MemorySourceConfig`]: in-memory list of `RecordBatch`es
///
/// File format specific behaviors are defined by [`FileSource`]
///
/// # See Also
/// * [`FileSource`] for file format specific implementations (Parquet, JSON, etc.)
/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
///
/// # Notes
///
/// Requires `Debug` to assist debugging
///
/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
///
/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related.
/// ```text
///                       ┌─────────────────────┐                              -----► execute path
///                       │                     │                              ┄┄┄┄┄► init path
///                       │   DataSourceExec    │
///                       │                     │
///                       └───────▲─────────────┘
///                               ┊  │
///                               ┊  │
///                       ┌──────────▼──────────┐                            ┌─────────────────────┐
///                       │                     │                            |                     |
///                       │  DataSource(trait)  │                            | TableProvider(trait)|
///                       │                     │                            |                     |
///                       └───────▲─────────────┘                            └─────────────────────┘
///                               ┊  │                                                  ┊
///               ┌───────────────┿──┴────────────────┐                                 ┊
///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
///               |   ┊                               |                                 ┊
///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
///    │                     │             │                     │           ┌──────────▼──────────┐
///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
///    │                     │             │                     │           |  FileFormat(trait)  |
///    └──────────────▲──────┘             └─────────────────────┘           |                     |
///               │   ┊                                                      └─────────────────────┘
///               │   ┊                                                                 ┊
///               │   ┊                                                                 ┊
///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
///    │                     │                                               │     ArrowSource     │
///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
///    │                     │                                               │    ParquetSource    │
///    └─────────────────────┘                                               └─────────────────────┘
///               │
///               │
///               │
///               │
///    ┌──────────▼──────────┐
///    │     ArrowSource     │
///    │          ...        │
///    │    ParquetSource    │
///    └─────────────────────┘
///               |
/// FileOpener (called by FileStream)
///               │
///    ┌──────────▼──────────┐
///    │                     │
///    │     RecordBatch     │
///    │                     │
///    └─────────────────────┘
/// ```
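///
/// # Example
///
/// A minimal sketch of a custom `DataSource` (not compiled here); `MyDataSource`,
/// its `schema` field, and the `todo!()` stream body are hypothetical placeholders,
/// not existing DataFusion types:
///
/// ```ignore
/// #[derive(Debug)]
/// struct MyDataSource {
///     schema: SchemaRef,
/// }
///
/// impl DataSource for MyDataSource {
///     fn open(
///         &self,
///         _partition: usize,
///         _context: Arc<TaskContext>,
///     ) -> Result<SendableRecordBatchStream> {
///         // Produce a stream of RecordBatches for this partition
///         todo!()
///     }
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
///         write!(f, "MyDataSource")
///     }
///     fn output_partitioning(&self) -> Partitioning {
///         Partitioning::UnknownPartitioning(1)
///     }
///     fn eq_properties(&self) -> EquivalenceProperties {
///         EquivalenceProperties::new(Arc::clone(&self.schema))
///     }
///     fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
///         Ok(Statistics::new_unknown(&self.schema))
///     }
///     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
///         None
///     }
///     fn fetch(&self) -> Option<usize> {
///         None
///     }
///     fn try_swapping_with_projection(
///         &self,
///         _projection: &[ProjectionExpr],
///     ) -> Result<Option<Arc<dyn DataSource>>> {
///         Ok(None)
///     }
/// }
/// ```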
pub trait DataSource: Send + Sync + Debug {
    /// Open a [`SendableRecordBatchStream`] that produces the data for the
    /// given partition
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Return this source as [`Any`] so it can be downcast to a concrete type
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Return a copy of this DataSource with a new partitioning scheme.
    ///
    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
    /// Refer to [`ExecutionPlan::repartitioned`] for details on when `None` should be returned.
    ///
    /// Repartitioning should not change the output ordering, if one exists.
    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
    /// and the FileSource's
    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
    /// for examples.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// Output partitioning of this source
    fn output_partitioning(&self) -> Partitioning;
    /// Equivalence properties (e.g. known orderings) of this source
    fn eq_properties(&self) -> EquivalenceProperties;
    /// How streams from this source should be scheduled; defaults to
    /// [`SchedulingType::NonCooperative`]
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::NonCooperative
    }

    /// Returns statistics for a specific partition, or aggregate statistics
    /// across all partitions if `partition` is `None`.
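    ///
    /// A hedged usage sketch (`source` stands in for any value implementing
    /// `DataSource`):
    ///
    /// ```ignore
    /// // Aggregate statistics across all partitions
    /// let all = source.partition_statistics(None)?;
    /// // Statistics for partition 0 only
    /// let first = source.partition_statistics(Some(0))?;
    /// ```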
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;

    /// Returns aggregate statistics across all partitions.
    ///
    /// # Deprecated
    /// Use [`Self::partition_statistics`] instead, which provides more fine-grained
    /// control over statistics retrieval (per-partition or aggregate).
    #[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Return a copy of this DataSource with a new fetch limit
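    ///
    /// A hedged usage sketch (`source` is any `DataSource`; `None` from
    /// `with_fetch` is assumed to mean the source cannot apply a limit):
    ///
    /// ```ignore
    /// // Push a `LIMIT 10` down into the source, if supported
    /// if let Some(limited) = source.with_fetch(Some(10)) {
    ///     assert_eq!(limited.fetch(), Some(10));
    /// }
    /// ```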
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// Return the current fetch limit, if any
    fn fetch(&self) -> Option<usize>;
    /// Return the metrics recorded by this source
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Try to push a projection down into this DataSource, returning `Ok(None)`
    /// if the projection cannot be absorbed
    fn try_swapping_with_projection(
        &self,
        _projection: &[ProjectionExpr],
    ) -> Result<Option<Arc<dyn DataSource>>>;
    /// Try to push down filters into this DataSource.
    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
    ///
    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }
}

/// [`ExecutionPlan`] that reads one or more files
///
/// `DataSourceExec` implements common functionality such as applying
/// projections and caching plan properties.
///
/// The [`DataSource`] describes where to find the data for this data source
/// (for example, in files or in-memory partitions).
///
/// For file based [`DataSource`]s, format specific behavior is implemented in
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
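///
/// # Example
///
/// A hedged sketch of wiring up and running a `DataSourceExec`; `scan_config`
/// (an already-built `FileScanConfig` or `MemorySourceConfig`) and `task_ctx`
/// are assumed to exist:
///
/// ```ignore
/// // Wrap the source in an ExecutionPlan and cache its plan properties
/// let exec = DataSourceExec::from_data_source(scan_config);
/// // Execute partition 0, yielding a SendableRecordBatchStream
/// let stream = exec.execute(0, task_ctx)?;
/// ```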
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties such as sort order
    cache: PlanProperties,
}

impl DisplayAs for DataSourceExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSourceExec: ")?;
            }
            DisplayFormatType::TreeRender => {}
        }
        self.data_source.fmt_as(t, f)
    }
}

impl ExecutionPlan for DataSourceExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
    ///
    /// If the data source does not support changing its partitioning (i.e. its
    /// [`DataSource::repartitioned`] returns `Ok(None)`, the default), the plan is
    /// returned unchanged. Refer to [`ExecutionPlan::repartitioned`] for more details.
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
            self.properties().eq_properties.output_ordering(),
        )?;

        if let Some(source) = data_source {
            let output_partitioning = source.output_partitioning();
            let plan = self
                .clone()
                .with_data_source(source)
                // Changing source partitioning may invalidate output partitioning. Update it also
                .with_partitioning(output_partitioning);
            Ok(Some(Arc::new(plan)))
        } else {
            Ok(Some(Arc::new(self.clone())))
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.data_source.open(partition, Arc::clone(&context))?;
        let batch_size = context.session_config().batch_size();
        log::debug!(
            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
        );
        let metrics = self.data_source.metrics();
        let split_metrics = SplitMetrics::new(&metrics, partition);
        Ok(Box::pin(BatchSplitStream::new(
            stream,
            batch_size,
            split_metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.data_source.metrics().clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.data_source.partition_statistics(partition)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let data_source = self.data_source.with_fetch(limit)?;
        let cache = self.cache.clone();

        Some(Arc::new(Self { data_source, cache }))
    }

    fn fetch(&self) -> Option<usize> {
        self.data_source.fetch()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        match self
            .data_source
            .try_swapping_with_projection(projection.expr())?
        {
            Some(new_data_source) => {
                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
            }
            None => Ok(None),
        }
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Push any remaining filters into our data source
        let parent_filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|f| f.filter)
            .collect_vec();
        let res = self
            .data_source
            .try_pushdown_filters(parent_filters.clone(), config)?;
        match res.updated_node {
            Some(data_source) => {
                let mut new_node = self.clone();
                new_node.data_source = data_source;
                // Re-compute properties since we have new filters which will impact equivalence info
                new_node.cache =
                    Self::compute_properties(Arc::clone(&new_node.data_source));

                Ok(FilterPushdownPropagation {
                    filters: res.filters,
                    updated_node: Some(Arc::new(new_node)),
                })
            }
            None => Ok(FilterPushdownPropagation {
                filters: res.filters,
                updated_node: None,
            }),
        }
    }
}

impl DataSourceExec {
    /// Create a new `DataSourceExec` wrapped in an [`Arc`] from any concrete [`DataSource`]
    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        Arc::new(Self::new(Arc::new(data_source)))
    }

    /// Default constructor for `DataSourceExec`, which computes and caches the
    /// [`PlanProperties`] (including the [`SchedulingType`]) of the given source.
    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
        let cache = Self::compute_properties(Arc::clone(&data_source));
        Self { data_source, cache }
    }

    /// Return the source object
    pub fn data_source(&self) -> &Arc<dyn DataSource> {
        &self.data_source
    }

    /// Replace the data source, re-computing the cached plan properties
    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
        self.cache = Self::compute_properties(Arc::clone(&data_source));
        self.data_source = data_source;
        self
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.cache = self.cache.with_constraints(constraints);
        self
    }

    /// Assign output partitioning
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.cache = self.cache.with_partitioning(partitioning);
        self
    }

    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
        PlanProperties::new(
            data_source.eq_properties(),
            data_source.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
        .with_scheduling_type(data_source.scheduling_type())
    }

    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
    ///
    /// Returns `None` if
    /// 1. the datasource is not scanning files (`FileScanConfig`)
    /// 2. the [`FileScanConfig::file_source`] is not of type `T`
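    ///
    /// A hedged usage sketch (`exec` is an existing `DataSourceExec`, and
    /// `ParquetSource` refers to the Parquet file source from the
    /// `datafusion-datasource-parquet` crate):
    ///
    /// ```ignore
    /// if let Some((file_scan_config, parquet_source)) =
    ///     exec.downcast_to_file_source::<ParquetSource>()
    /// {
    ///     // Inspect the scan configuration and the Parquet-specific source
    /// }
    /// ```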
    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
        self.data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .and_then(|file_scan_conf| {
                file_scan_conf
                    .file_source()
                    .as_any()
                    .downcast_ref::<T>()
                    .map(|source| (file_scan_conf, source))
            })
    }
}

/// Create a new `DataSourceExec` from a `DataSource`
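///
/// A hedged sketch (`my_source` stands in for any concrete `DataSource`, such
/// as a `MemorySourceConfig`):
///
/// ```ignore
/// let exec: DataSourceExec = my_source.into();
/// ```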
impl<S> From<S> for DataSourceExec
where
    S: DataSource + 'static,
{
    fn from(source: S) -> Self {
        Self::new(Arc::new(source))
    }
}