datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{
26    Boundedness, EmissionType, SchedulingType,
27};
28use datafusion_physical_plan::metrics::SplitMetrics;
29use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
30use datafusion_physical_plan::projection::ProjectionExec;
31use datafusion_physical_plan::stream::BatchSplitStream;
32use datafusion_physical_plan::{
33    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
34};
35use itertools::Itertools;
36
37use crate::file_scan_config::FileScanConfig;
38use datafusion_common::config::ConfigOptions;
39use datafusion_common::{Constraints, Result, Statistics};
40use datafusion_execution::{SendableRecordBatchStream, TaskContext};
41use datafusion_physical_expr::{
42    conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
43};
44use datafusion_physical_expr_common::sort_expr::LexOrdering;
45use datafusion_physical_plan::filter::collect_columns_from_predicate;
46use datafusion_physical_plan::filter_pushdown::{
47    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
48};
49
50/// A source of data, typically a list of files or memory
51///
52/// This trait provides common behaviors for abstract sources of data. It has
53/// two common implementations:
54///
55/// 1. [`FileScanConfig`]: lists of files
56/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
57///
58/// File format specific behaviors are defined by [`FileSource`]
59///
60/// # See Also
61/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
62/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
63///
64/// # Notes
65///
66/// Requires `Debug` to assist debugging
67///
68/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
69/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
70/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
72/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
73///
74/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
75/// ```text
76///                       ┌─────────────────────┐                              -----► execute path
77///                       │                     │                              ┄┄┄┄┄► init path
78///                       │   DataSourceExec    │  
79///                       │                     │    
80///                       └───────▲─────────────┘
81///                               ┊  │
82///                               ┊  │
83///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
84///                       │                     │                            |                     |
85///                       │  DataSource(trait)  │                            | TableProvider(trait)|
86///                       │                     │                            |                     |
87///                       └───────▲─────────────┘                            └─────────────────────┘
88///                               ┊  │                                                  ┊
89///               ┌───────────────┿──┴────────────────┐                                 ┊
90///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
91///               |   ┊                               |                                 ┊
92///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
93///    │                     │             │                     │           ┌──────────▼──────────┐
94///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
95///    │                     │             │                     │           |  FileFormat(trait)  |
96///    └──────────────▲──────┘             └─────────────────────┘           |                     |
97///               │   ┊                                                      └─────────────────────┘
98///               │   ┊                                                                 ┊
99///               │   ┊                                                                 ┊
100///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
101///    │                     │                                               │     ArrowSource     │
102///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
103///    │                     │                                               │    ParquetSource    │
104///    └─────────────────────┘                                               └─────────────────────┘
105///               │
106///               │
107///               │
108///               │
109///    ┌──────────▼──────────┐
110///    │     ArrowSource     │
111///    │          ...        │
112///    │    ParquetSource    │
113///    └─────────────────────┘
114///               |
115/// FileOpener (called by FileStream)
116///               │
117///    ┌──────────▼──────────┐
118///    │                     │
119///    │     RecordBatch     │
120///    │                     │
121///    └─────────────────────┘
122/// ```
123pub trait DataSource: Send + Sync + Debug {
124    fn open(
125        &self,
126        partition: usize,
127        context: Arc<TaskContext>,
128    ) -> Result<SendableRecordBatchStream>;
129    fn as_any(&self) -> &dyn Any;
130    /// Format this source for display in explain plans
131    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
132
133    /// Return a copy of this DataSource with a new partitioning scheme.
134    ///
135    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
136    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
137    ///
138    /// Repartitioning should not change the output ordering, if this ordering exists.
139    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
140    /// and the FileSource's
141    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
142    /// for examples.
143    fn repartitioned(
144        &self,
145        _target_partitions: usize,
146        _repartition_file_min_size: usize,
147        _output_ordering: Option<LexOrdering>,
148    ) -> Result<Option<Arc<dyn DataSource>>> {
149        Ok(None)
150    }
151
152    fn output_partitioning(&self) -> Partitioning;
153    fn eq_properties(&self) -> EquivalenceProperties;
154    fn scheduling_type(&self) -> SchedulingType {
155        SchedulingType::NonCooperative
156    }
157    fn statistics(&self) -> Result<Statistics>;
158    /// Return a copy of this DataSource with a new fetch limit
159    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
160    fn fetch(&self) -> Option<usize>;
161    fn metrics(&self) -> ExecutionPlanMetricsSet {
162        ExecutionPlanMetricsSet::new()
163    }
164    fn try_swapping_with_projection(
165        &self,
166        _projection: &ProjectionExec,
167    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
168    /// Try to push down filters into this DataSource.
169    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
170    ///
171    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
172    fn try_pushdown_filters(
173        &self,
174        filters: Vec<Arc<dyn PhysicalExpr>>,
175        _config: &ConfigOptions,
176    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
177        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
178            vec![PushedDown::No; filters.len()],
179        ))
180    }
181}
182
/// [`ExecutionPlan`] that reads from a [`DataSource`], for example a list of
/// files or in-memory partitions
///
/// `DataSourceExec` implements common functionality such as applying
/// projections, and caching plan properties.
///
/// The [`DataSource`] describes where to find the data for this data source
/// (for example in which files or which in-memory partitions).
///
/// For file based [`DataSource`]s, format specific behavior is implemented in
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties (equivalence properties / sort order,
    /// partitioning, emission type, boundedness and scheduling type),
    /// computed from `data_source`
    cache: PlanProperties,
}
202
203impl DisplayAs for DataSourceExec {
204    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
205        match t {
206            DisplayFormatType::Default | DisplayFormatType::Verbose => {
207                write!(f, "DataSourceExec: ")?;
208            }
209            DisplayFormatType::TreeRender => {}
210        }
211        self.data_source.fmt_as(t, f)
212    }
213}
214
215impl ExecutionPlan for DataSourceExec {
216    fn name(&self) -> &'static str {
217        "DataSourceExec"
218    }
219
220    fn as_any(&self) -> &dyn Any {
221        self
222    }
223
224    fn properties(&self) -> &PlanProperties {
225        &self.cache
226    }
227
228    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
229        Vec::new()
230    }
231
232    fn with_new_children(
233        self: Arc<Self>,
234        _: Vec<Arc<dyn ExecutionPlan>>,
235    ) -> Result<Arc<dyn ExecutionPlan>> {
236        Ok(self)
237    }
238
239    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
240    ///
241    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
242    /// to [`ExecutionPlan::repartitioned`] for more details.
243    fn repartitioned(
244        &self,
245        target_partitions: usize,
246        config: &ConfigOptions,
247    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
248        let data_source = self.data_source.repartitioned(
249            target_partitions,
250            config.optimizer.repartition_file_min_size,
251            self.properties().eq_properties.output_ordering(),
252        )?;
253
254        if let Some(source) = data_source {
255            let output_partitioning = source.output_partitioning();
256            let plan = self
257                .clone()
258                .with_data_source(source)
259                // Changing source partitioning may invalidate output partitioning. Update it also
260                .with_partitioning(output_partitioning);
261            Ok(Some(Arc::new(plan)))
262        } else {
263            Ok(Some(Arc::new(self.clone())))
264        }
265    }
266
267    fn execute(
268        &self,
269        partition: usize,
270        context: Arc<TaskContext>,
271    ) -> Result<SendableRecordBatchStream> {
272        let stream = self.data_source.open(partition, Arc::clone(&context))?;
273        let batch_size = context.session_config().batch_size();
274        log::debug!(
275            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
276        );
277        let metrics = self.data_source.metrics();
278        let split_metrics = SplitMetrics::new(&metrics, partition);
279        Ok(Box::pin(BatchSplitStream::new(
280            stream,
281            batch_size,
282            split_metrics,
283        )))
284    }
285
286    fn metrics(&self) -> Option<MetricsSet> {
287        Some(self.data_source.metrics().clone_inner())
288    }
289
290    fn statistics(&self) -> Result<Statistics> {
291        self.data_source.statistics()
292    }
293
294    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
295        if let Some(partition) = partition {
296            let mut statistics = Statistics::new_unknown(&self.schema());
297            if let Some(file_config) =
298                self.data_source.as_any().downcast_ref::<FileScanConfig>()
299            {
300                if let Some(file_group) = file_config.file_groups.get(partition) {
301                    if let Some(stat) = file_group.file_statistics(None) {
302                        statistics = stat.clone();
303                    }
304                }
305            }
306            Ok(statistics)
307        } else {
308            Ok(self.data_source.statistics()?)
309        }
310    }
311
312    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
313        let data_source = self.data_source.with_fetch(limit)?;
314        let cache = self.cache.clone();
315
316        Some(Arc::new(Self { data_source, cache }))
317    }
318
319    fn fetch(&self) -> Option<usize> {
320        self.data_source.fetch()
321    }
322
323    fn try_swapping_with_projection(
324        &self,
325        projection: &ProjectionExec,
326    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
327        self.data_source.try_swapping_with_projection(projection)
328    }
329
330    fn handle_child_pushdown_result(
331        &self,
332        _phase: FilterPushdownPhase,
333        child_pushdown_result: ChildPushdownResult,
334        config: &ConfigOptions,
335    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
336        // Push any remaining filters into our data source
337        let parent_filters = child_pushdown_result
338            .parent_filters
339            .into_iter()
340            .map(|f| f.filter)
341            .collect_vec();
342        let res = self
343            .data_source
344            .try_pushdown_filters(parent_filters.clone(), config)?;
345        match res.updated_node {
346            Some(data_source) => {
347                let mut new_node = self.clone();
348                new_node.data_source = data_source;
349                new_node.cache =
350                    Self::compute_properties(Arc::clone(&new_node.data_source));
351
352                // Recompute equivalence info using new filters
353                let filter = conjunction(
354                    res.filters
355                        .iter()
356                        .zip(parent_filters)
357                        .filter_map(|(s, f)| match s {
358                            PushedDown::Yes => Some(f),
359                            PushedDown::No => None,
360                        })
361                        .collect_vec(),
362                );
363                new_node = new_node.add_filter_equivalence_info(filter)?;
364                Ok(FilterPushdownPropagation {
365                    filters: res.filters,
366                    updated_node: Some(Arc::new(new_node)),
367                })
368            }
369            None => Ok(FilterPushdownPropagation {
370                filters: res.filters,
371                updated_node: None,
372            }),
373        }
374    }
375}
376
377impl DataSourceExec {
378    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
379        Arc::new(Self::new(Arc::new(data_source)))
380    }
381
382    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
383    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
384        let cache = Self::compute_properties(Arc::clone(&data_source));
385        Self { data_source, cache }
386    }
387
388    /// Return the source object
389    pub fn data_source(&self) -> &Arc<dyn DataSource> {
390        &self.data_source
391    }
392
393    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
394        self.cache = Self::compute_properties(Arc::clone(&data_source));
395        self.data_source = data_source;
396        self
397    }
398
399    /// Assign constraints
400    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
401        self.cache = self.cache.with_constraints(constraints);
402        self
403    }
404
405    /// Assign output partitioning
406    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
407        self.cache = self.cache.with_partitioning(partitioning);
408        self
409    }
410
411    /// Add filters' equivalence info
412    fn add_filter_equivalence_info(
413        mut self,
414        filter: Arc<dyn PhysicalExpr>,
415    ) -> Result<Self> {
416        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
417        for (lhs, rhs) in equal_pairs {
418            self.cache
419                .eq_properties
420                .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
421        }
422        Ok(self)
423    }
424
425    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
426        PlanProperties::new(
427            data_source.eq_properties(),
428            data_source.output_partitioning(),
429            EmissionType::Incremental,
430            Boundedness::Bounded,
431        )
432        .with_scheduling_type(data_source.scheduling_type())
433    }
434
435    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
436    ///
437    /// Returns `None` if
438    /// 1. the datasource is not scanning files (`FileScanConfig`)
439    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
440    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
441        self.data_source()
442            .as_any()
443            .downcast_ref::<FileScanConfig>()
444            .and_then(|file_scan_conf| {
445                file_scan_conf
446                    .file_source()
447                    .as_any()
448                    .downcast_ref::<T>()
449                    .map(|source| (file_scan_conf, source))
450            })
451    }
452}
453
454/// Create a new `DataSourceExec` from a `DataSource`
455impl<S> From<S> for DataSourceExec
456where
457    S: DataSource + 'static,
458{
459    fn from(source: S) -> Self {
460        Self::new(Arc::new(source))
461    }
462}