datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{
26    Boundedness, EmissionType, SchedulingType,
27};
28use datafusion_physical_plan::metrics::SplitMetrics;
29use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
30use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
31use datafusion_physical_plan::stream::BatchSplitStream;
32use datafusion_physical_plan::{
33    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
34};
35use itertools::Itertools;
36
37use crate::file_scan_config::FileScanConfig;
38use datafusion_common::config::ConfigOptions;
39use datafusion_common::{Constraints, Result, Statistics};
40use datafusion_execution::{SendableRecordBatchStream, TaskContext};
41use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
42use datafusion_physical_expr_common::sort_expr::LexOrdering;
43use datafusion_physical_plan::filter_pushdown::{
44    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
45};
46
47/// A source of data, typically a list of files or memory
48///
49/// This trait provides common behaviors for abstract sources of data. It has
50/// two common implementations:
51///
52/// 1. [`FileScanConfig`]: lists of files
53/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
54///
55/// File format specific behaviors are defined by [`FileSource`]
56///
57/// # See Also
58/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
59/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
60///
61/// # Notes
62///
63/// Requires `Debug` to assist debugging
64///
65/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
66/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
67/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
69/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
70///
71/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
72/// ```text
73///                       ┌─────────────────────┐                              -----► execute path
74///                       │                     │                              ┄┄┄┄┄► init path
75///                       │   DataSourceExec    │  
76///                       │                     │    
77///                       └───────▲─────────────┘
78///                               ┊  │
79///                               ┊  │
80///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
81///                       │                     │                            |                     |
82///                       │  DataSource(trait)  │                            | TableProvider(trait)|
83///                       │                     │                            |                     |
84///                       └───────▲─────────────┘                            └─────────────────────┘
85///                               ┊  │                                                  ┊
86///               ┌───────────────┿──┴────────────────┐                                 ┊
87///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
88///               |   ┊                               |                                 ┊
89///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
90///    │                     │             │                     │           ┌──────────▼──────────┐
91///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
92///    │                     │             │                     │           |  FileFormat(trait)  |
93///    └──────────────▲──────┘             └─────────────────────┘           |                     |
94///               │   ┊                                                      └─────────────────────┘
95///               │   ┊                                                                 ┊
96///               │   ┊                                                                 ┊
97///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
98///    │                     │                                               │     ArrowSource     │
99///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
100///    │                     │                                               │    ParquetSource    │
101///    └─────────────────────┘                                               └─────────────────────┘
102///               │
103///               │
104///               │
105///               │
106///    ┌──────────▼──────────┐
107///    │     ArrowSource     │
108///    │          ...        │
109///    │    ParquetSource    │
110///    └─────────────────────┘
111///               |
112/// FileOpener (called by FileStream)
113///               │
114///    ┌──────────▼──────────┐
115///    │                     │
116///    │     RecordBatch     │
117///    │                     │
118///    └─────────────────────┘
119/// ```
120pub trait DataSource: Send + Sync + Debug {
121    fn open(
122        &self,
123        partition: usize,
124        context: Arc<TaskContext>,
125    ) -> Result<SendableRecordBatchStream>;
126    fn as_any(&self) -> &dyn Any;
127    /// Format this source for display in explain plans
128    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
129
130    /// Return a copy of this DataSource with a new partitioning scheme.
131    ///
132    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
133    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
134    ///
135    /// Repartitioning should not change the output ordering, if this ordering exists.
136    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
137    /// and the FileSource's
138    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
139    /// for examples.
140    fn repartitioned(
141        &self,
142        _target_partitions: usize,
143        _repartition_file_min_size: usize,
144        _output_ordering: Option<LexOrdering>,
145    ) -> Result<Option<Arc<dyn DataSource>>> {
146        Ok(None)
147    }
148
149    fn output_partitioning(&self) -> Partitioning;
150    fn eq_properties(&self) -> EquivalenceProperties;
151    fn scheduling_type(&self) -> SchedulingType {
152        SchedulingType::NonCooperative
153    }
154    fn statistics(&self) -> Result<Statistics>;
155    /// Return a copy of this DataSource with a new fetch limit
156    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
157    fn fetch(&self) -> Option<usize>;
158    fn metrics(&self) -> ExecutionPlanMetricsSet {
159        ExecutionPlanMetricsSet::new()
160    }
161    fn try_swapping_with_projection(
162        &self,
163        _projection: &[ProjectionExpr],
164    ) -> Result<Option<Arc<dyn DataSource>>>;
165    /// Try to push down filters into this DataSource.
166    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
167    ///
168    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
169    fn try_pushdown_filters(
170        &self,
171        filters: Vec<Arc<dyn PhysicalExpr>>,
172        _config: &ConfigOptions,
173    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
174        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
175            vec![PushedDown::No; filters.len()],
176        ))
177    }
178}
179
/// [`ExecutionPlan`] that reads from a [`DataSource`]
///
/// `DataSourceExec` implements common functionality such as applying
/// projections, and caching plan properties.
///
/// The [`DataSource`] describes where to find the data for this data source
/// (for example, in which files or in which in-memory partitions).
///
/// For file based [`DataSource`]s, format specific behavior is implemented in
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties such as sort order.
    ///
    /// Derived from `data_source` (equivalence properties, output
    /// partitioning and scheduling type) when the plan is created.
    cache: PlanProperties,
}
199
200impl DisplayAs for DataSourceExec {
201    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
202        match t {
203            DisplayFormatType::Default | DisplayFormatType::Verbose => {
204                write!(f, "DataSourceExec: ")?;
205            }
206            DisplayFormatType::TreeRender => {}
207        }
208        self.data_source.fmt_as(t, f)
209    }
210}
211
212impl ExecutionPlan for DataSourceExec {
213    fn name(&self) -> &'static str {
214        "DataSourceExec"
215    }
216
217    fn as_any(&self) -> &dyn Any {
218        self
219    }
220
221    fn properties(&self) -> &PlanProperties {
222        &self.cache
223    }
224
225    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
226        Vec::new()
227    }
228
229    fn with_new_children(
230        self: Arc<Self>,
231        _: Vec<Arc<dyn ExecutionPlan>>,
232    ) -> Result<Arc<dyn ExecutionPlan>> {
233        Ok(self)
234    }
235
236    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
237    ///
238    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
239    /// to [`ExecutionPlan::repartitioned`] for more details.
240    fn repartitioned(
241        &self,
242        target_partitions: usize,
243        config: &ConfigOptions,
244    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
245        let data_source = self.data_source.repartitioned(
246            target_partitions,
247            config.optimizer.repartition_file_min_size,
248            self.properties().eq_properties.output_ordering(),
249        )?;
250
251        if let Some(source) = data_source {
252            let output_partitioning = source.output_partitioning();
253            let plan = self
254                .clone()
255                .with_data_source(source)
256                // Changing source partitioning may invalidate output partitioning. Update it also
257                .with_partitioning(output_partitioning);
258            Ok(Some(Arc::new(plan)))
259        } else {
260            Ok(Some(Arc::new(self.clone())))
261        }
262    }
263
264    fn execute(
265        &self,
266        partition: usize,
267        context: Arc<TaskContext>,
268    ) -> Result<SendableRecordBatchStream> {
269        let stream = self.data_source.open(partition, Arc::clone(&context))?;
270        let batch_size = context.session_config().batch_size();
271        log::debug!(
272            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
273        );
274        let metrics = self.data_source.metrics();
275        let split_metrics = SplitMetrics::new(&metrics, partition);
276        Ok(Box::pin(BatchSplitStream::new(
277            stream,
278            batch_size,
279            split_metrics,
280        )))
281    }
282
283    fn metrics(&self) -> Option<MetricsSet> {
284        Some(self.data_source.metrics().clone_inner())
285    }
286
287    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
288        if let Some(partition) = partition {
289            let mut statistics = Statistics::new_unknown(&self.schema());
290            if let Some(file_config) =
291                self.data_source.as_any().downcast_ref::<FileScanConfig>()
292            {
293                if let Some(file_group) = file_config.file_groups.get(partition) {
294                    if let Some(stat) = file_group.file_statistics(None) {
295                        statistics = stat.clone();
296                    }
297                }
298            }
299            Ok(statistics)
300        } else {
301            Ok(self.data_source.statistics()?)
302        }
303    }
304
305    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
306        let data_source = self.data_source.with_fetch(limit)?;
307        let cache = self.cache.clone();
308
309        Some(Arc::new(Self { data_source, cache }))
310    }
311
312    fn fetch(&self) -> Option<usize> {
313        self.data_source.fetch()
314    }
315
316    fn try_swapping_with_projection(
317        &self,
318        projection: &ProjectionExec,
319    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
320        match self
321            .data_source
322            .try_swapping_with_projection(projection.expr())?
323        {
324            Some(new_data_source) => {
325                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
326            }
327            None => Ok(None),
328        }
329    }
330
331    fn handle_child_pushdown_result(
332        &self,
333        _phase: FilterPushdownPhase,
334        child_pushdown_result: ChildPushdownResult,
335        config: &ConfigOptions,
336    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
337        // Push any remaining filters into our data source
338        let parent_filters = child_pushdown_result
339            .parent_filters
340            .into_iter()
341            .map(|f| f.filter)
342            .collect_vec();
343        let res = self
344            .data_source
345            .try_pushdown_filters(parent_filters.clone(), config)?;
346        match res.updated_node {
347            Some(data_source) => {
348                let mut new_node = self.clone();
349                new_node.data_source = data_source;
350                // Re-compute properties since we have new filters which will impact equivalence info
351                new_node.cache =
352                    Self::compute_properties(Arc::clone(&new_node.data_source));
353
354                Ok(FilterPushdownPropagation {
355                    filters: res.filters,
356                    updated_node: Some(Arc::new(new_node)),
357                })
358            }
359            None => Ok(FilterPushdownPropagation {
360                filters: res.filters,
361                updated_node: None,
362            }),
363        }
364    }
365}
366
367impl DataSourceExec {
368    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
369        Arc::new(Self::new(Arc::new(data_source)))
370    }
371
372    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
373    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
374        let cache = Self::compute_properties(Arc::clone(&data_source));
375        Self { data_source, cache }
376    }
377
378    /// Return the source object
379    pub fn data_source(&self) -> &Arc<dyn DataSource> {
380        &self.data_source
381    }
382
383    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
384        self.cache = Self::compute_properties(Arc::clone(&data_source));
385        self.data_source = data_source;
386        self
387    }
388
389    /// Assign constraints
390    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
391        self.cache = self.cache.with_constraints(constraints);
392        self
393    }
394
395    /// Assign output partitioning
396    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
397        self.cache = self.cache.with_partitioning(partitioning);
398        self
399    }
400
401    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
402        PlanProperties::new(
403            data_source.eq_properties(),
404            data_source.output_partitioning(),
405            EmissionType::Incremental,
406            Boundedness::Bounded,
407        )
408        .with_scheduling_type(data_source.scheduling_type())
409    }
410
411    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
412    ///
413    /// Returns `None` if
414    /// 1. the datasource is not scanning files (`FileScanConfig`)
415    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
416    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
417        self.data_source()
418            .as_any()
419            .downcast_ref::<FileScanConfig>()
420            .and_then(|file_scan_conf| {
421                file_scan_conf
422                    .file_source()
423                    .as_any()
424                    .downcast_ref::<T>()
425                    .map(|source| (file_scan_conf, source))
426            })
427    }
428}
429
430/// Create a new `DataSourceExec` from a `DataSource`
431impl<S> From<S> for DataSourceExec
432where
433    S: DataSource + 'static,
434{
435    fn from(source: S) -> Self {
436        Self::new(Arc::new(source))
437    }
438}