// datafusion_datasource/source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_expr::projection::ProjectionExprs;
26use datafusion_physical_plan::execution_plan::{
27    Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
31use datafusion_physical_plan::projection::ProjectionExec;
32use datafusion_physical_plan::stream::BatchSplitStream;
33use datafusion_physical_plan::{
34    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
35};
36use itertools::Itertools;
37
38use crate::file_scan_config::FileScanConfig;
39use datafusion_common::config::ConfigOptions;
40use datafusion_common::{Constraints, Result, Statistics};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
43use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
44use datafusion_physical_plan::SortOrderPushdownResult;
45use datafusion_physical_plan::filter_pushdown::{
46    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
47};
48
49/// A source of data, typically a list of files or memory
50///
51/// This trait provides common behaviors for abstract sources of data. It has
52/// two common implementations:
53///
54/// 1. [`FileScanConfig`]: lists of files
55/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
56///
57/// File format specific behaviors are defined by [`FileSource`]
58///
59/// # See Also
60/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
61/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
62///
63/// # Notes
64///
65/// Requires `Debug` to assist debugging
66///
67/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
68/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
69/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
71/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
72///
73/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
74/// ```text
75///                       ┌─────────────────────┐                              -----► execute path
76///                       │                     │                              ┄┄┄┄┄► init path
77///                       │   DataSourceExec    │
78///                       │                     │
79///                       └───────▲─────────────┘
80///                               ┊  │
81///                               ┊  │
82///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
83///                       │                     │                            |                     |
84///                       │  DataSource(trait)  │                            | TableProvider(trait)|
85///                       │                     │                            |                     |
86///                       └───────▲─────────────┘                            └─────────────────────┘
87///                               ┊  │                                                  ┊
88///               ┌───────────────┿──┴────────────────┐                                 ┊
89///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
90///               |   ┊                               |                                 ┊
91///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
92///    │                     │             │                     │           ┌──────────▼──────────┐
93///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
94///    │                     │             │                     │           |  FileFormat(trait)  |
95///    └──────────────▲──────┘             └─────────────────────┘           |                     |
96///               │   ┊                                                      └─────────────────────┘
97///               │   ┊                                                                 ┊
98///               │   ┊                                                                 ┊
99///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
100///    │                     │                                               │     ArrowSource     │
101///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
102///    │                     │                                               │    ParquetSource    │
103///    └─────────────────────┘                                               └─────────────────────┘
104///               │
105///               │
106///               │
107///               │
108///    ┌──────────▼──────────┐
109///    │     ArrowSource     │
110///    │          ...        │
111///    │    ParquetSource    │
112///    └─────────────────────┘
113///               |
114/// FileOpener (called by FileStream)
115///               │
116///    ┌──────────▼──────────┐
117///    │                     │
118///    │     RecordBatch     │
119///    │                     │
120///    └─────────────────────┘
121/// ```
pub trait DataSource: Send + Sync + Debug {
    /// Begin reading the given `partition`, returning a stream of record
    /// batches.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Return a reference to `self` as [`Any`] so callers can downcast to the
    /// concrete implementation.
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Return a copy of this DataSource with a new partitioning scheme.
    ///
    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
    ///
    /// Repartitioning should not change the output ordering, if this ordering exists.
    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
    /// and the FileSource's
    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
    /// for examples.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// Return the output partitioning of this source.
    fn output_partitioning(&self) -> Partitioning;
    /// Return the equivalence properties (e.g. orderings) of this source's
    /// output.
    fn eq_properties(&self) -> EquivalenceProperties;
    /// How this source interacts with the scheduler; defaults to
    /// [`SchedulingType::NonCooperative`].
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::NonCooperative
    }

    /// Returns statistics for a specific partition, or aggregate statistics
    /// across all partitions if `partition` is `None`.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;

    /// Return a copy of this DataSource with a new fetch limit
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// Return the current fetch limit, if any.
    fn fetch(&self) -> Option<usize>;
    /// Return the metrics recorded by this source; the default is a fresh,
    /// empty [`ExecutionPlanMetricsSet`].
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Try to push `_projection` down into this DataSource.
    ///
    /// Returns `Ok(Some(..))` with a new source that incorporates the
    /// projection, or `Ok(None)` if the projection cannot be absorbed.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn DataSource>>>;

    /// Try to push down filters into this DataSource.
    ///
    /// These filters are in terms of the output schema of this DataSource (e.g.
    /// [`Self::eq_properties`] and output of any projections pushed into the
    /// source), not the original table schema.
    ///
    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
    ///
    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        // Default: no filters are absorbed; report `No` for each parent filter.
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }

    /// Try to create a new DataSource that produces data in the specified sort order.
    ///
    /// # Arguments
    /// * `order` - The desired output ordering
    ///
    /// # Returns
    /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering
    /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering
    /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering
    /// * `Err(e)` - Error occurred
    ///
    /// Default implementation returns `Unsupported`.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }

    /// Returns a variant of this `DataSource` that is aware of order-sensitivity.
    ///
    /// Returns `None` (the default) when the source has no order-aware variant.
    fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
        None
    }
}
215
/// [`ExecutionPlan`] that reads one or more files
///
/// `DataSourceExec` implements common functionality such as applying
/// projections, and caching plan properties.
///
/// The [`DataSource`] describes where to find the data for this data source
/// (for example, in files or in in-memory partitions).
///
/// For file based [`DataSource`]s, format specific behavior is implemented in
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties such as sort order
    /// (recomputed whenever `data_source` is replaced)
    cache: Arc<PlanProperties>,
}
235
236impl DisplayAs for DataSourceExec {
237    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
238        match t {
239            DisplayFormatType::Default | DisplayFormatType::Verbose => {
240                write!(f, "DataSourceExec: ")?;
241            }
242            DisplayFormatType::TreeRender => {}
243        }
244        self.data_source.fmt_as(t, f)
245    }
246}
247
248impl ExecutionPlan for DataSourceExec {
249    fn name(&self) -> &'static str {
250        "DataSourceExec"
251    }
252
253    fn as_any(&self) -> &dyn Any {
254        self
255    }
256
257    fn properties(&self) -> &Arc<PlanProperties> {
258        &self.cache
259    }
260
261    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
262        Vec::new()
263    }
264
265    fn with_new_children(
266        self: Arc<Self>,
267        _: Vec<Arc<dyn ExecutionPlan>>,
268    ) -> Result<Arc<dyn ExecutionPlan>> {
269        Ok(self)
270    }
271
272    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
273    ///
274    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
275    /// to [`ExecutionPlan::repartitioned`] for more details.
276    fn repartitioned(
277        &self,
278        target_partitions: usize,
279        config: &ConfigOptions,
280    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
281        let data_source = self.data_source.repartitioned(
282            target_partitions,
283            config.optimizer.repartition_file_min_size,
284            self.properties().eq_properties.output_ordering(),
285        )?;
286
287        Ok(data_source.map(|source| {
288            let output_partitioning = source.output_partitioning();
289            let plan = self
290                .clone()
291                .with_data_source(source)
292                // Changing source partitioning may invalidate output partitioning. Update it also
293                .with_partitioning(output_partitioning);
294            Arc::new(plan) as _
295        }))
296    }
297
298    fn execute(
299        &self,
300        partition: usize,
301        context: Arc<TaskContext>,
302    ) -> Result<SendableRecordBatchStream> {
303        let stream = self.data_source.open(partition, Arc::clone(&context))?;
304        let batch_size = context.session_config().batch_size();
305        log::debug!(
306            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
307        );
308        let metrics = self.data_source.metrics();
309        let split_metrics = SplitMetrics::new(&metrics, partition);
310        Ok(Box::pin(BatchSplitStream::new(
311            stream,
312            batch_size,
313            split_metrics,
314        )))
315    }
316
317    fn metrics(&self) -> Option<MetricsSet> {
318        Some(self.data_source.metrics().clone_inner())
319    }
320
321    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
322        self.data_source.partition_statistics(partition)
323    }
324
325    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
326        let data_source = self.data_source.with_fetch(limit)?;
327        let cache = Arc::clone(&self.cache);
328
329        Some(Arc::new(Self { data_source, cache }))
330    }
331
332    fn fetch(&self) -> Option<usize> {
333        self.data_source.fetch()
334    }
335
336    fn try_swapping_with_projection(
337        &self,
338        projection: &ProjectionExec,
339    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
340        match self
341            .data_source
342            .try_swapping_with_projection(projection.projection_expr())?
343        {
344            Some(new_data_source) => {
345                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
346            }
347            None => Ok(None),
348        }
349    }
350
351    fn handle_child_pushdown_result(
352        &self,
353        _phase: FilterPushdownPhase,
354        child_pushdown_result: ChildPushdownResult,
355        config: &ConfigOptions,
356    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
357        // Push any remaining filters into our data source
358        let parent_filters = child_pushdown_result
359            .parent_filters
360            .into_iter()
361            .map(|f| f.filter)
362            .collect_vec();
363        let res = self
364            .data_source
365            .try_pushdown_filters(parent_filters, config)?;
366        match res.updated_node {
367            Some(data_source) => {
368                let mut new_node = self.clone();
369                new_node.data_source = data_source;
370                // Re-compute properties since we have new filters which will impact equivalence info
371                new_node.cache =
372                    Arc::new(Self::compute_properties(&new_node.data_source));
373
374                Ok(FilterPushdownPropagation {
375                    filters: res.filters,
376                    updated_node: Some(Arc::new(new_node)),
377                })
378            }
379            None => Ok(FilterPushdownPropagation {
380                filters: res.filters,
381                updated_node: None,
382            }),
383        }
384    }
385
386    fn try_pushdown_sort(
387        &self,
388        order: &[PhysicalSortExpr],
389    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
390        // Delegate to the data source and wrap result with DataSourceExec
391        self.data_source
392            .try_pushdown_sort(order)?
393            .try_map(|new_data_source| {
394                let new_exec = self.clone().with_data_source(new_data_source);
395                Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
396            })
397    }
398
399    fn with_preserve_order(
400        &self,
401        preserve_order: bool,
402    ) -> Option<Arc<dyn ExecutionPlan>> {
403        self.data_source
404            .with_preserve_order(preserve_order)
405            .map(|new_data_source| {
406                Arc::new(self.clone().with_data_source(new_data_source))
407                    as Arc<dyn ExecutionPlan>
408            })
409    }
410}
411
412impl DataSourceExec {
413    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
414        Arc::new(Self::new(Arc::new(data_source)))
415    }
416
417    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
418    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
419        let cache = Self::compute_properties(&data_source);
420        Self {
421            data_source,
422            cache: Arc::new(cache),
423        }
424    }
425
426    /// Return the source object
427    pub fn data_source(&self) -> &Arc<dyn DataSource> {
428        &self.data_source
429    }
430
431    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
432        self.cache = Arc::new(Self::compute_properties(&data_source));
433        self.data_source = data_source;
434        self
435    }
436
437    /// Assign constraints
438    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
439        Arc::make_mut(&mut self.cache).set_constraints(constraints);
440        self
441    }
442
443    /// Assign output partitioning
444    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
445        Arc::make_mut(&mut self.cache).partitioning = partitioning;
446        self
447    }
448
449    fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
450        PlanProperties::new(
451            data_source.eq_properties(),
452            data_source.output_partitioning(),
453            EmissionType::Incremental,
454            Boundedness::Bounded,
455        )
456        .with_scheduling_type(data_source.scheduling_type())
457    }
458
459    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
460    ///
461    /// Returns `None` if
462    /// 1. the datasource is not scanning files (`FileScanConfig`)
463    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
464    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
465        self.data_source()
466            .as_any()
467            .downcast_ref::<FileScanConfig>()
468            .and_then(|file_scan_conf| {
469                file_scan_conf
470                    .file_source()
471                    .as_any()
472                    .downcast_ref::<T>()
473                    .map(|source| (file_scan_conf, source))
474            })
475    }
476}
477
478/// Create a new `DataSourceExec` from a `DataSource`
479impl<S> From<S> for DataSourceExec
480where
481    S: DataSource + 'static,
482{
483    fn from(source: S) -> Self {
484        Self::new(Arc::new(source))
485    }
486}