Skip to main content

datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::{Arc, OnceLock};
24
25use datafusion_physical_expr::projection::ProjectionExprs;
26use datafusion_physical_plan::execution_plan::{
27    Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{
31    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
32};
33use datafusion_physical_plan::projection::ProjectionExec;
34use datafusion_physical_plan::stream::BatchSplitStream;
35use datafusion_physical_plan::{
36    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
37};
38use itertools::Itertools;
39
40use crate::file::FileSource;
41use crate::file_scan_config::FileScanConfig;
42use datafusion_common::config::ConfigOptions;
43use datafusion_common::{Constraints, Result, Statistics};
44use datafusion_execution::{SendableRecordBatchStream, TaskContext};
45use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
46use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
47use datafusion_physical_plan::SortOrderPushdownResult;
48use datafusion_physical_plan::filter_pushdown::{
49    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
50};
51
52/// A source of data, typically a list of files or memory
53///
54/// This trait provides common behaviors for abstract sources of data. It has
55/// two common implementations:
56///
57/// 1. [`FileScanConfig`]: lists of files
58/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
59///
60/// File format specific behaviors are defined by [`FileSource`]
61///
62/// # See Also
63/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
64/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
65///
66/// # Notes
67///
68/// Requires `Debug` to assist debugging
69///
70/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
71/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
72/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
74/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
75///
76/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
77/// ```text
78///                       ┌─────────────────────┐                              -----► execute path
79///                       │                     │                              ┄┄┄┄┄► init path
80///                       │   DataSourceExec    │
81///                       │                     │
82///                       └───────▲─────────────┘
83///                               ┊  │
84///                               ┊  │
85///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
86///                       │                     │                            |                     |
87///                       │  DataSource(trait)  │                            | TableProvider(trait)|
88///                       │                     │                            |                     |
89///                       └───────▲─────────────┘                            └─────────────────────┘
90///                               ┊  │                                                  ┊
91///               ┌───────────────┿──┴────────────────┐                                 ┊
92///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
93///               |   ┊                               |                                 ┊
94///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
95///    │                     │             │                     │           ┌──────────▼──────────┐
96///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
97///    │                     │             │                     │           |  FileFormat(trait)  |
98///    └──────────────▲──────┘             └─────────────────────┘           |                     |
99///               │   ┊                                                      └─────────────────────┘
100///               │   ┊                                                                 ┊
101///               │   ┊                                                                 ┊
102///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
103///    │                     │                                               │     ArrowSource     │
104///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
105///    │                     │                                               │    ParquetSource    │
106///    └─────────────────────┘                                               └─────────────────────┘
107///               │
108///               │
109///               │
110///               │
111///    ┌──────────▼──────────┐
112///    │     ArrowSource     │
113///    │          ...        │
114///    │    ParquetSource    │
115///    └─────────────────────┘
116///               |
117/// FileOpener (called by FileStream)
118///               │
119///    ┌──────────▼──────────┐
120///    │                     │
121///    │     RecordBatch     │
122///    │                     │
123///    └─────────────────────┘
124/// ```
125pub trait DataSource: Any + Send + Sync + Debug {
126    /// Open the specified output partition and return its stream of
127    /// [`RecordBatch`]es.
128    ///
129    /// This should be used by data sources that do not need any sibling
130    /// coordination. Data sources that want to use per-execution shared state
131    /// (for example, to reorder work across partitions at runtime) should
132    /// implement [`Self::open_with_args`] instead.
133    ///
134    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
135    fn open(
136        &self,
137        partition: usize,
138        context: Arc<TaskContext>,
139    ) -> Result<SendableRecordBatchStream>;
140
141    /// Format this source for display in explain plans
142    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
143
144    /// Return a copy of this DataSource with a new partitioning scheme.
145    ///
146    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
147    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
148    ///
149    /// Repartitioning should not change the output ordering, if this ordering exists.
150    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
151    /// and the FileSource's
152    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
153    /// for examples.
154    fn repartitioned(
155        &self,
156        _target_partitions: usize,
157        _repartition_file_min_size: usize,
158        _output_ordering: Option<LexOrdering>,
159    ) -> Result<Option<Arc<dyn DataSource>>> {
160        Ok(None)
161    }
162
163    fn output_partitioning(&self) -> Partitioning;
164    fn eq_properties(&self) -> EquivalenceProperties;
165    fn scheduling_type(&self) -> SchedulingType {
166        SchedulingType::NonCooperative
167    }
168
169    /// Returns statistics for a specific partition, or aggregate statistics
170    /// across all partitions if `partition` is `None`.
171    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>>;
172
173    /// Return a copy of this DataSource with a new fetch limit
174    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
175    fn fetch(&self) -> Option<usize>;
176    fn metrics(&self) -> ExecutionPlanMetricsSet {
177        ExecutionPlanMetricsSet::new()
178    }
179    fn try_swapping_with_projection(
180        &self,
181        _projection: &ProjectionExprs,
182    ) -> Result<Option<Arc<dyn DataSource>>>;
183
184    /// Try to push down filters into this DataSource.
185    ///
186    /// These filters are in terms of the output schema of this DataSource (e.g.
187    /// [`Self::eq_properties`] and output of any projections pushed into the
188    /// source), not the original table schema.
189    ///
190    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
191    ///
192    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
193    fn try_pushdown_filters(
194        &self,
195        filters: Vec<Arc<dyn PhysicalExpr>>,
196        _config: &ConfigOptions,
197    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
198        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
199            vec![PushedDown::No; filters.len()],
200        ))
201    }
202
203    /// Try to create a new DataSource that produces data in the specified sort order.
204    ///
205    /// # Arguments
206    /// * `order` - The desired output ordering
207    ///
208    /// # Returns
209    /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering
210    /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering
211    /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering
212    /// * `Err(e)` - Error occurred
213    ///
214    /// Default implementation returns `Unsupported`.
215    fn try_pushdown_sort(
216        &self,
217        _order: &[PhysicalSortExpr],
218    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
219        Ok(SortOrderPushdownResult::Unsupported)
220    }
221
222    /// Returns a variant of this `DataSource` that is aware of order-sensitivity.
223    fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
224        None
225    }
226
227    /// Injects arbitrary run-time state into this DataSource, returning a new instance
228    /// that incorporates that state *if* it is relevant to the concrete DataSource implementation.
229    ///
230    /// This is a generic entry point: the `state` can be any type wrapped in
231    /// `Arc<dyn Any + Send + Sync>`.  A data source that cares about the state should
232    /// down-cast it to the concrete type it expects and, if successful, return a
233    /// modified copy of itself that captures the provided value.  If the state is
234    /// not applicable, the default behaviour is to return `None` so that parent
235    /// nodes can continue propagating the attempt further down the plan tree.
236    fn with_new_state(
237        &self,
238        _state: Arc<dyn Any + Send + Sync>,
239    ) -> Option<Arc<dyn DataSource>> {
240        None
241    }
242
243    /// Create per execution state to share across sibling instances of this
244    /// data source during one execution.
245    ///
246    /// Returns `None` (the default) if this data source has
247    /// no sibling-shared execution state.
248    fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
249        None
250    }
251
252    /// Open a partition using optional sibling-shared execution state.
253    ///
254    /// The default implementation ignores the additional state and delegates to
255    /// [`Self::open`].
256    fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
257        self.open(args.partition, args.context)
258    }
259}
260
261/// Arguments for [`DataSource::open_with_args`]
262#[derive(Debug, Clone)]
263pub struct OpenArgs {
264    /// Which partition to open
265    pub partition: usize,
266    /// The task context for execution
267    pub context: Arc<TaskContext>,
268    /// Optional sibling-shared execution state, see
269    /// [`DataSource::create_sibling_state`] for details.
270    pub sibling_state: Option<Arc<dyn Any + Send + Sync>>,
271}
272
273impl OpenArgs {
274    /// Create a new OpenArgs with required arguments
275    pub fn new(partition: usize, context: Arc<TaskContext>) -> Self {
276        Self {
277            partition,
278            context,
279            sibling_state: None,
280        }
281    }
282
283    /// Set sibling shared state
284    pub fn with_shared_state(
285        mut self,
286        sibling_state: Option<Arc<dyn Any + Send + Sync>>,
287    ) -> Self {
288        self.sibling_state = sibling_state;
289        self
290    }
291}
292
293impl dyn DataSource {
294    pub fn is<T: DataSource>(&self) -> bool {
295        (self as &dyn Any).is::<T>()
296    }
297
298    pub fn downcast_ref<T: DataSource>(&self) -> Option<&T> {
299        (self as &dyn Any).downcast_ref()
300    }
301}
302
303/// [`ExecutionPlan`] that reads one or more files
304///
305/// `DataSourceExec` implements common functionality such as applying
306/// projections, and caching plan properties.
307///
308/// The [`DataSource`] describes where to find the data for this data source
309/// (for example in files or what in memory partitions).
310///
311/// For file based [`DataSource`]s, format specific behavior is implemented in
312/// the [`FileSource`] trait.
313///
314/// [`FileSource`]: crate::file::FileSource
315#[derive(Clone, Debug)]
316pub struct DataSourceExec {
317    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
318    data_source: Arc<dyn DataSource>,
319    /// Cached plan properties such as sort order
320    cache: Arc<PlanProperties>,
321    /// Per execution state shared across partitions of this plan.
322    ///
323    /// Created by [`DataSource::create_sibling_state`]
324    /// and then passed to
325    /// [`DataSource::open_with_args`].
326    execution_state: Arc<OnceLock<Option<Arc<dyn Any + Send + Sync>>>>,
327}
328
329impl DisplayAs for DataSourceExec {
330    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
331        match t {
332            DisplayFormatType::Default | DisplayFormatType::Verbose => {
333                write!(f, "DataSourceExec: ")?;
334            }
335            DisplayFormatType::TreeRender => {}
336        }
337        self.data_source.fmt_as(t, f)
338    }
339}
340
341impl ExecutionPlan for DataSourceExec {
342    fn name(&self) -> &'static str {
343        "DataSourceExec"
344    }
345
346    fn properties(&self) -> &Arc<PlanProperties> {
347        &self.cache
348    }
349
350    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
351        Vec::new()
352    }
353
354    fn with_new_children(
355        self: Arc<Self>,
356        _: Vec<Arc<dyn ExecutionPlan>>,
357    ) -> Result<Arc<dyn ExecutionPlan>> {
358        Ok(self)
359    }
360
361    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
362    ///
363    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
364    /// to [`ExecutionPlan::repartitioned`] for more details.
365    fn repartitioned(
366        &self,
367        target_partitions: usize,
368        config: &ConfigOptions,
369    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
370        let data_source = self.data_source.repartitioned(
371            target_partitions,
372            config.optimizer.repartition_file_min_size,
373            self.properties().eq_properties.output_ordering(),
374        )?;
375
376        Ok(data_source.map(|source| {
377            let output_partitioning = source.output_partitioning();
378            let plan = self
379                .clone()
380                .with_data_source(source)
381                // Changing source partitioning may invalidate output partitioning. Update it also
382                .with_partitioning(output_partitioning);
383            Arc::new(plan) as _
384        }))
385    }
386
387    fn execute(
388        &self,
389        partition: usize,
390        context: Arc<TaskContext>,
391    ) -> Result<SendableRecordBatchStream> {
392        let shared_state = self
393            .execution_state
394            .get_or_init(|| self.data_source.create_sibling_state())
395            .clone();
396        let args = OpenArgs::new(partition, Arc::clone(&context))
397            .with_shared_state(shared_state);
398        let stream = self.data_source.open_with_args(args)?;
399        let batch_size = context.session_config().batch_size();
400
401        log::debug!(
402            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
403        );
404        let metrics = self.data_source.metrics();
405        let split_metrics = SplitMetrics::new(&metrics, partition);
406        Ok(Box::pin(BatchSplitStream::new(
407            stream,
408            batch_size,
409            split_metrics,
410        )))
411    }
412
413    fn metrics(&self) -> Option<MetricsSet> {
414        let mut metrics = self.data_source.metrics().clone_inner();
415
416        // Add `output_rows_skew` metric to the metrics set.
417        // Done here because it's a derived metric from output_rows metric.
418        if let Some(file_scan_config) = self.data_source.downcast_ref::<FileScanConfig>()
419            && file_scan_config.file_source().file_type() == "parquet"
420            && let Some(output_rows_skew) =
421                BaselineMetrics::output_rows_skew_metric(&metrics)
422        {
423            metrics.push(output_rows_skew);
424        }
425
426        Some(metrics)
427    }
428
429    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
430        self.data_source.partition_statistics(partition)
431    }
432
433    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
434        let data_source = self.data_source.with_fetch(limit)?;
435        let cache = Arc::clone(&self.cache);
436        let execution_state = Arc::new(OnceLock::new());
437
438        Some(Arc::new(Self {
439            data_source,
440            cache,
441            execution_state,
442        }))
443    }
444
445    fn fetch(&self) -> Option<usize> {
446        self.data_source.fetch()
447    }
448
449    fn try_swapping_with_projection(
450        &self,
451        projection: &ProjectionExec,
452    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
453        match self
454            .data_source
455            .try_swapping_with_projection(projection.projection_expr())?
456        {
457            Some(new_data_source) => {
458                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
459            }
460            None => Ok(None),
461        }
462    }
463
464    fn handle_child_pushdown_result(
465        &self,
466        _phase: FilterPushdownPhase,
467        child_pushdown_result: ChildPushdownResult,
468        config: &ConfigOptions,
469    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
470        // Push any remaining filters into our data source
471        let parent_filters = child_pushdown_result
472            .parent_filters
473            .into_iter()
474            .map(|f| f.filter)
475            .collect_vec();
476        let res = self
477            .data_source
478            .try_pushdown_filters(parent_filters, config)?;
479        match res.updated_node {
480            Some(data_source) => {
481                let mut new_node = self.clone();
482                new_node.data_source = data_source;
483                // Re-compute properties since we have new filters which will impact equivalence info
484                new_node.cache =
485                    Arc::new(Self::compute_properties(&new_node.data_source));
486
487                Ok(FilterPushdownPropagation {
488                    filters: res.filters,
489                    updated_node: Some(Arc::new(new_node)),
490                })
491            }
492            None => Ok(FilterPushdownPropagation {
493                filters: res.filters,
494                updated_node: None,
495            }),
496        }
497    }
498
499    fn try_pushdown_sort(
500        &self,
501        order: &[PhysicalSortExpr],
502    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
503        // Delegate to the data source and wrap result with DataSourceExec
504        self.data_source
505            .try_pushdown_sort(order)?
506            .try_map(|new_data_source| {
507                let new_exec = self.clone().with_data_source(new_data_source);
508                Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
509            })
510    }
511
512    fn with_preserve_order(
513        &self,
514        preserve_order: bool,
515    ) -> Option<Arc<dyn ExecutionPlan>> {
516        self.data_source
517            .with_preserve_order(preserve_order)
518            .map(|new_data_source| {
519                Arc::new(self.clone().with_data_source(new_data_source))
520                    as Arc<dyn ExecutionPlan>
521            })
522    }
523
524    fn with_new_state(
525        &self,
526        state: Arc<dyn Any + Send + Sync>,
527    ) -> Option<Arc<dyn ExecutionPlan>> {
528        self.data_source
529            .with_new_state(state)
530            .map(|new_data_source| {
531                Arc::new(self.clone().with_data_source(new_data_source))
532                    as Arc<dyn ExecutionPlan>
533            })
534    }
535
536    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
537        let mut new_exec = Arc::unwrap_or_clone(self);
538        new_exec.execution_state = Arc::new(OnceLock::new());
539        Ok(Arc::new(new_exec))
540    }
541}
542
543impl DataSourceExec {
544    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
545        Arc::new(Self::new(Arc::new(data_source)))
546    }
547
548    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
549    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
550        let cache = Self::compute_properties(&data_source);
551        Self {
552            data_source,
553            cache: Arc::new(cache),
554            execution_state: Arc::new(OnceLock::new()),
555        }
556    }
557
558    /// Return the source object
559    pub fn data_source(&self) -> &Arc<dyn DataSource> {
560        &self.data_source
561    }
562
563    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
564        self.cache = Arc::new(Self::compute_properties(&data_source));
565        self.data_source = data_source;
566        self.execution_state = Arc::new(OnceLock::new());
567        self
568    }
569
570    /// Assign constraints
571    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
572        Arc::make_mut(&mut self.cache).set_constraints(constraints);
573        self
574    }
575
576    /// Assign output partitioning
577    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
578        Arc::make_mut(&mut self.cache).partitioning = partitioning;
579        self
580    }
581
582    fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
583        PlanProperties::new(
584            data_source.eq_properties(),
585            data_source.output_partitioning(),
586            EmissionType::Incremental,
587            Boundedness::Bounded,
588        )
589        .with_scheduling_type(data_source.scheduling_type())
590    }
591
592    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
593    ///
594    /// Returns `None` if
595    /// 1. the datasource is not scanning files (`FileScanConfig`)
596    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
597    pub fn downcast_to_file_source<T: FileSource>(
598        &self,
599    ) -> Option<(&FileScanConfig, &T)> {
600        self.data_source()
601            .downcast_ref::<FileScanConfig>()
602            .and_then(|file_scan_conf| {
603                file_scan_conf
604                    .file_source()
605                    .downcast_ref::<T>()
606                    .map(|source| (file_scan_conf, source))
607            })
608    }
609}
610
611/// Create a new `DataSourceExec` from a `DataSource`
612impl<S> From<S> for DataSourceExec
613where
614    S: DataSource + 'static,
615{
616    fn from(source: S) -> Self {
617        Self::new(Arc::new(source))
618    }
619}