datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_expr::equivalence::ProjectionMapping;
26use datafusion_physical_plan::execution_plan::{
27    Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
31use datafusion_physical_plan::projection::ProjectionExec;
32use datafusion_physical_plan::stream::BatchSplitStream;
33use datafusion_physical_plan::{
34    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
35};
36use itertools::Itertools;
37
38use crate::file_scan_config::FileScanConfig;
39use datafusion_common::config::ConfigOptions;
40use datafusion_common::{Constraints, Result, Statistics};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::{
43    conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
44};
45use datafusion_physical_expr_common::sort_expr::LexOrdering;
46use datafusion_physical_plan::filter::collect_columns_from_predicate;
47use datafusion_physical_plan::filter_pushdown::{
48    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
49};
50
51/// A source of data, typically a list of files or memory
52///
53/// This trait provides common behaviors for abstract sources of data. It has
54/// two common implementations:
55///
56/// 1. [`FileScanConfig`]: lists of files
57/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
58///
59/// File format specific behaviors are defined by [`FileSource`]
60///
61/// # See Also
62/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
63/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
64///
65/// # Notes
66///
67/// Requires `Debug` to assist debugging
68///
69/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
70/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
71/// [`FileSource`]: crate::file::FileSource
72/// [`FileFormat``]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
73/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
74///
75/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
76/// ```text
77///                       ┌─────────────────────┐                              -----► execute path
78///                       │                     │                              ┄┄┄┄┄► init path
79///                       │   DataSourceExec    │  
80///                       │                     │    
81///                       └───────▲─────────────┘
82///                               ┊  │
83///                               ┊  │
84///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
85///                       │                     │                            |                     |
86///                       │  DataSource(trait)  │                            | TableProvider(trait)|
87///                       │                     │                            |                     |
88///                       └───────▲─────────────┘                            └─────────────────────┘
89///                               ┊  │                                                  ┊
90///               ┌───────────────┿──┴────────────────┐                                 ┊
91///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
92///               |   ┊                               |                                 ┊
93///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
94///    │                     │             │                     │           ┌──────────▼──────────┐
95///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
96///    │                     │             │                     │           |  FileFormat(trait)  |
97///    └──────────────▲──────┘             └─────────────────────┘           |                     |
98///               │   ┊                                                      └─────────────────────┘
99///               │   ┊                                                                 ┊
100///               │   ┊                                                                 ┊
101///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
102///    │                     │                                               │     ArrowSource     │
103///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
104///    │                     │                                               │    ParquetSource    │
105///    └─────────────────────┘                                               └─────────────────────┘
106///               │
107///               │
108///               │
109///               │
110///    ┌──────────▼──────────┐
111///    │     ArrowSource     │
112///    │          ...        │
113///    │    ParquetSource    │
114///    └─────────────────────┘
115///               |
116/// FileOpener (called by FileStream)
117///               │
118///    ┌──────────▼──────────┐
119///    │                     │
120///    │     RecordBatch     │
121///    │                     │
122///    └─────────────────────┘
123/// ```
124pub trait DataSource: Send + Sync + Debug {
125    fn open(
126        &self,
127        partition: usize,
128        context: Arc<TaskContext>,
129    ) -> Result<SendableRecordBatchStream>;
130    fn as_any(&self) -> &dyn Any;
131    /// Format this source for display in explain plans
132    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
133
134    /// Return a copy of this DataSource with a new partitioning scheme.
135    ///
136    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
137    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
138    ///
139    /// Repartitioning should not change the output ordering, if this ordering exists.
140    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
141    /// and the FileSource's
142    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
143    /// for examples.
144    fn repartitioned(
145        &self,
146        _target_partitions: usize,
147        _repartition_file_min_size: usize,
148        _output_ordering: Option<LexOrdering>,
149    ) -> Result<Option<Arc<dyn DataSource>>> {
150        Ok(None)
151    }
152
153    fn output_partitioning(&self) -> Partitioning;
154    fn eq_properties(&self) -> EquivalenceProperties;
155    fn scheduling_type(&self) -> SchedulingType {
156        SchedulingType::NonCooperative
157    }
158    fn statistics(&self) -> Result<Statistics>;
159    /// Return a copy of this DataSource with a new fetch limit
160    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
161    fn fetch(&self) -> Option<usize>;
162    fn metrics(&self) -> ExecutionPlanMetricsSet {
163        ExecutionPlanMetricsSet::new()
164    }
165    fn try_swapping_with_projection(
166        &self,
167        _projection: &ProjectionExec,
168    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
169    /// Try to push down filters into this DataSource.
170    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
171    ///
172    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
173    fn try_pushdown_filters(
174        &self,
175        filters: Vec<Arc<dyn PhysicalExpr>>,
176        _config: &ConfigOptions,
177    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
178        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
179            vec![PushedDown::No; filters.len()],
180        ))
181    }
182}
183
184/// [`ExecutionPlan`] that reads one or more files
185///
186/// `DataSourceExec` implements common functionality such as applying
187/// projections, and caching plan properties.
188///
189/// The [`DataSource`] describes where to find the data for this data source
190/// (for example in files or what in memory partitions).
191///
192/// For file based [`DataSource`]s, format specific behavior is implemented in
193/// the [`FileSource`] trait.
194///
195/// [`FileSource`]: crate::file::FileSource
196#[derive(Clone, Debug)]
197pub struct DataSourceExec {
198    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
199    data_source: Arc<dyn DataSource>,
200    /// Cached plan properties such as sort order
201    cache: PlanProperties,
202}
203
204impl DisplayAs for DataSourceExec {
205    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
206        match t {
207            DisplayFormatType::Default | DisplayFormatType::Verbose => {
208                write!(f, "DataSourceExec: ")?;
209            }
210            DisplayFormatType::TreeRender => {}
211        }
212        self.data_source.fmt_as(t, f)
213    }
214}
215
216impl ExecutionPlan for DataSourceExec {
217    fn name(&self) -> &'static str {
218        "DataSourceExec"
219    }
220
221    fn as_any(&self) -> &dyn Any {
222        self
223    }
224
225    fn properties(&self) -> &PlanProperties {
226        &self.cache
227    }
228
229    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
230        Vec::new()
231    }
232
233    fn with_new_children(
234        self: Arc<Self>,
235        _: Vec<Arc<dyn ExecutionPlan>>,
236    ) -> Result<Arc<dyn ExecutionPlan>> {
237        Ok(self)
238    }
239
240    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
241    ///
242    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
243    /// to [`ExecutionPlan::repartitioned`] for more details.
244    fn repartitioned(
245        &self,
246        target_partitions: usize,
247        config: &ConfigOptions,
248    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
249        let data_source = self.data_source.repartitioned(
250            target_partitions,
251            config.optimizer.repartition_file_min_size,
252            self.properties().eq_properties.output_ordering(),
253        )?;
254
255        if let Some(source) = data_source {
256            let output_partitioning = source.output_partitioning();
257            let plan = self
258                .clone()
259                .with_data_source(source)
260                // Changing source partitioning may invalidate output partitioning. Update it also
261                .with_partitioning(output_partitioning);
262            Ok(Some(Arc::new(plan)))
263        } else {
264            Ok(Some(Arc::new(self.clone())))
265        }
266    }
267
268    fn execute(
269        &self,
270        partition: usize,
271        context: Arc<TaskContext>,
272    ) -> Result<SendableRecordBatchStream> {
273        let stream = self.data_source.open(partition, Arc::clone(&context))?;
274        let batch_size = context.session_config().batch_size();
275        log::debug!(
276            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
277        );
278        let metrics = self.data_source.metrics();
279        let split_metrics = SplitMetrics::new(&metrics, partition);
280        Ok(Box::pin(BatchSplitStream::new(
281            stream,
282            batch_size,
283            split_metrics,
284        )))
285    }
286
287    fn metrics(&self) -> Option<MetricsSet> {
288        Some(self.data_source.metrics().clone_inner())
289    }
290
291    fn statistics(&self) -> Result<Statistics> {
292        self.data_source.statistics()
293    }
294
295    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
296        if let Some(partition) = partition {
297            let mut statistics = Statistics::new_unknown(&self.schema());
298            if let Some(file_config) =
299                self.data_source.as_any().downcast_ref::<FileScanConfig>()
300            {
301                if let Some(file_group) = file_config.file_groups.get(partition) {
302                    if let Some(stat) = file_group.file_statistics(None) {
303                        statistics = stat.clone();
304                    }
305                }
306            }
307            Ok(statistics)
308        } else {
309            Ok(self.data_source.statistics()?)
310        }
311    }
312
313    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
314        let data_source = self.data_source.with_fetch(limit)?;
315        let cache = self.cache.clone();
316
317        Some(Arc::new(Self { data_source, cache }))
318    }
319
320    fn fetch(&self) -> Option<usize> {
321        self.data_source.fetch()
322    }
323
324    fn try_swapping_with_projection(
325        &self,
326        projection: &ProjectionExec,
327    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
328        match self.data_source.try_swapping_with_projection(projection)? {
329            Some(new_plan) => {
330                if let Some(new_data_source_exec) =
331                    new_plan.as_any().downcast_ref::<DataSourceExec>()
332                {
333                    let projection_mapping = ProjectionMapping::try_new(
334                        projection.expr().iter().cloned(),
335                        &self.schema(),
336                    )?;
337
338                    // Project the equivalence properties to the new schema
339                    let projected_eq_properties = self
340                        .cache
341                        .eq_properties
342                        .project(&projection_mapping, new_data_source_exec.schema());
343
344                    let preserved_exec = DataSourceExec {
345                        data_source: Arc::clone(&new_data_source_exec.data_source),
346                        cache: PlanProperties::new(
347                            projected_eq_properties,
348                            new_data_source_exec.cache.partitioning.clone(),
349                            new_data_source_exec.cache.emission_type,
350                            new_data_source_exec.cache.boundedness,
351                        )
352                        .with_scheduling_type(new_data_source_exec.cache.scheduling_type),
353                    };
354                    Ok(Some(Arc::new(preserved_exec)))
355                } else {
356                    Ok(Some(new_plan))
357                }
358            }
359            None => Ok(None),
360        }
361    }
362
363    fn handle_child_pushdown_result(
364        &self,
365        _phase: FilterPushdownPhase,
366        child_pushdown_result: ChildPushdownResult,
367        config: &ConfigOptions,
368    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
369        // Push any remaining filters into our data source
370        let parent_filters = child_pushdown_result
371            .parent_filters
372            .into_iter()
373            .map(|f| f.filter)
374            .collect_vec();
375        let res = self
376            .data_source
377            .try_pushdown_filters(parent_filters.clone(), config)?;
378        match res.updated_node {
379            Some(data_source) => {
380                let mut new_node = self.clone();
381                new_node.data_source = data_source;
382                new_node.cache =
383                    Self::compute_properties(Arc::clone(&new_node.data_source));
384
385                // Recompute equivalence info using new filters
386                let filter = conjunction(
387                    res.filters
388                        .iter()
389                        .zip(parent_filters)
390                        .filter_map(|(s, f)| match s {
391                            PushedDown::Yes => Some(f),
392                            PushedDown::No => None,
393                        })
394                        .collect_vec(),
395                );
396                new_node = new_node.add_filter_equivalence_info(filter)?;
397                Ok(FilterPushdownPropagation {
398                    filters: res.filters,
399                    updated_node: Some(Arc::new(new_node)),
400                })
401            }
402            None => Ok(FilterPushdownPropagation {
403                filters: res.filters,
404                updated_node: None,
405            }),
406        }
407    }
408}
409
410impl DataSourceExec {
411    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
412        Arc::new(Self::new(Arc::new(data_source)))
413    }
414
415    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
416    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
417        let cache = Self::compute_properties(Arc::clone(&data_source));
418        Self { data_source, cache }
419    }
420
421    /// Return the source object
422    pub fn data_source(&self) -> &Arc<dyn DataSource> {
423        &self.data_source
424    }
425
426    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
427        self.cache = Self::compute_properties(Arc::clone(&data_source));
428        self.data_source = data_source;
429        self
430    }
431
432    /// Assign constraints
433    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
434        self.cache = self.cache.with_constraints(constraints);
435        self
436    }
437
438    /// Assign output partitioning
439    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
440        self.cache = self.cache.with_partitioning(partitioning);
441        self
442    }
443
444    /// Add filters' equivalence info
445    fn add_filter_equivalence_info(
446        mut self,
447        filter: Arc<dyn PhysicalExpr>,
448    ) -> Result<Self> {
449        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
450        for (lhs, rhs) in equal_pairs {
451            self.cache
452                .eq_properties
453                .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
454        }
455        Ok(self)
456    }
457
458    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
459        PlanProperties::new(
460            data_source.eq_properties(),
461            data_source.output_partitioning(),
462            EmissionType::Incremental,
463            Boundedness::Bounded,
464        )
465        .with_scheduling_type(data_source.scheduling_type())
466    }
467
468    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
469    ///
470    /// Returns `None` if
471    /// 1. the datasource is not scanning files (`FileScanConfig`)
472    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
473    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
474        self.data_source()
475            .as_any()
476            .downcast_ref::<FileScanConfig>()
477            .and_then(|file_scan_conf| {
478                file_scan_conf
479                    .file_source()
480                    .as_any()
481                    .downcast_ref::<T>()
482                    .map(|source| (file_scan_conf, source))
483            })
484    }
485}
486
487/// Create a new `DataSourceExec` from a `DataSource`
488impl<S> From<S> for DataSourceExec
489where
490    S: DataSource + 'static,
491{
492    fn from(source: S) -> Self {
493        Self::new(Arc::new(source))
494    }
495}