datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use datafusion_physical_plan::projection::ProjectionExec;
28use datafusion_physical_plan::{
29    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
30};
31
32use crate::file_scan_config::FileScanConfig;
33use datafusion_common::config::ConfigOptions;
34use datafusion_common::{Constraints, Result, Statistics};
35use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
37use datafusion_physical_expr_common::sort_expr::LexOrdering;
38use datafusion_physical_plan::filter_pushdown::{
39    ChildPushdownResult, FilterPushdownPropagation,
40};
41
42/// A source of data, typically a list of files or memory
43///
44/// This trait provides common behaviors for abstract sources of data. It has
45/// two common implementations:
46///
47/// 1. [`FileScanConfig`]: lists of files
48/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
49///
50/// File format specific behaviors are defined by [`FileSource`]
51///
52/// # See Also
53/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
54/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
55///
56/// # Notes
57///
58/// Requires `Debug` to assist debugging
59///
60/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
61/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
62/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
64/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
65///
66/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
67/// ```text
68///                       ┌─────────────────────┐                              -----► execute path
69///                       │                     │                              ┄┄┄┄┄► init path
70///                       │   DataSourceExec    │  
71///                       │                     │    
72///                       └───────▲─────────────┘
73///                               ┊  │
74///                               ┊  │
75///                       ┌──────────▼──────────┐                            ┌──────────-──────────┐
76///                       │                     │                            |                     |
77///                       │  DataSource(trait)  │                            | TableProvider(trait)|
78///                       │                     │                            |                     |
79///                       └───────▲─────────────┘                            └─────────────────────┘
80///                               ┊  │                                                  ┊
81///               ┌───────────────┿──┴────────────────┐                                 ┊
82///               |   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   |                                 ┊
83///               |   ┊                               |                                 ┊
84///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
85///    │                     │             │                     │           ┌──────────▼──────────┐
86///    │   FileScanConfig    │             │ MemorySourceConfig  │           |                     |
87///    │                     │             │                     │           |  FileFormat(trait)  |
88///    └──────────────▲──────┘             └─────────────────────┘           |                     |
89///               │   ┊                                                      └─────────────────────┘
90///               │   ┊                                                                 ┊
91///               │   ┊                                                                 ┊
92///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
93///    │                     │                                               │     ArrowSource     │
94///    │ FileSource(trait)   ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│          ...        │
95///    │                     │                                               │    ParquetSource    │
96///    └─────────────────────┘                                               └─────────────────────┘
97///               │
98///               │
99///               │
100///               │
101///    ┌──────────▼──────────┐
102///    │     ArrowSource     │
103///    │          ...        │
104///    │    ParquetSource    │
105///    └─────────────────────┘
106///               |
107/// FileOpener (called by FileStream)
108///               │
109///    ┌──────────▼──────────┐
110///    │                     │
111///    │     RecordBatch     │
112///    │                     │
113///    └─────────────────────┘
114/// ```
115pub trait DataSource: Send + Sync + Debug {
116    fn open(
117        &self,
118        partition: usize,
119        context: Arc<TaskContext>,
120    ) -> Result<SendableRecordBatchStream>;
121    fn as_any(&self) -> &dyn Any;
122    /// Format this source for display in explain plans
123    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
124
125    /// Return a copy of this DataSource with a new partitioning scheme.
126    ///
127    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
128    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
129    ///
130    /// Repartitioning should not change the output ordering, if this ordering exists.
131    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
132    /// and the FileSource's
133    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
134    /// for examples.
135    fn repartitioned(
136        &self,
137        _target_partitions: usize,
138        _repartition_file_min_size: usize,
139        _output_ordering: Option<LexOrdering>,
140    ) -> Result<Option<Arc<dyn DataSource>>> {
141        Ok(None)
142    }
143
144    fn output_partitioning(&self) -> Partitioning;
145    fn eq_properties(&self) -> EquivalenceProperties;
146    fn statistics(&self) -> Result<Statistics>;
147    /// Return a copy of this DataSource with a new fetch limit
148    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
149    fn fetch(&self) -> Option<usize>;
150    fn metrics(&self) -> ExecutionPlanMetricsSet {
151        ExecutionPlanMetricsSet::new()
152    }
153    fn try_swapping_with_projection(
154        &self,
155        _projection: &ProjectionExec,
156    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
157    /// Try to push down filters into this DataSource.
158    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
159    ///
160    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
161    fn try_pushdown_filters(
162        &self,
163        filters: Vec<Arc<dyn PhysicalExpr>>,
164        _config: &ConfigOptions,
165    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
166        Ok(FilterPushdownPropagation::unsupported(filters))
167    }
168}
169
170/// [`ExecutionPlan`] that reads one or more files
171///
172/// `DataSourceExec` implements common functionality such as applying
173/// projections, and caching plan properties.
174///
175/// The [`DataSource`] describes where to find the data for this data source
176/// (for example in files or what in memory partitions).
177///
178/// For file based [`DataSource`]s, format specific behavior is implemented in
179/// the [`FileSource`] trait.
180///
181/// [`FileSource`]: crate::file::FileSource
182#[derive(Clone, Debug)]
183pub struct DataSourceExec {
184    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
185    data_source: Arc<dyn DataSource>,
186    /// Cached plan properties such as sort order
187    cache: PlanProperties,
188}
189
190impl DisplayAs for DataSourceExec {
191    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
192        match t {
193            DisplayFormatType::Default | DisplayFormatType::Verbose => {
194                write!(f, "DataSourceExec: ")?;
195            }
196            DisplayFormatType::TreeRender => {}
197        }
198        self.data_source.fmt_as(t, f)
199    }
200}
201
202impl ExecutionPlan for DataSourceExec {
203    fn name(&self) -> &'static str {
204        "DataSourceExec"
205    }
206
207    fn as_any(&self) -> &dyn Any {
208        self
209    }
210
211    fn properties(&self) -> &PlanProperties {
212        &self.cache
213    }
214
215    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
216        Vec::new()
217    }
218
219    fn with_new_children(
220        self: Arc<Self>,
221        _: Vec<Arc<dyn ExecutionPlan>>,
222    ) -> Result<Arc<dyn ExecutionPlan>> {
223        Ok(self)
224    }
225
226    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
227    ///
228    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
229    /// to [`ExecutionPlan::repartitioned`] for more details.
230    fn repartitioned(
231        &self,
232        target_partitions: usize,
233        config: &ConfigOptions,
234    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
235        let data_source = self.data_source.repartitioned(
236            target_partitions,
237            config.optimizer.repartition_file_min_size,
238            self.properties().eq_properties.output_ordering(),
239        )?;
240
241        if let Some(source) = data_source {
242            let output_partitioning = source.output_partitioning();
243            let plan = self
244                .clone()
245                .with_data_source(source)
246                // Changing source partitioning may invalidate output partitioning. Update it also
247                .with_partitioning(output_partitioning);
248            Ok(Some(Arc::new(plan)))
249        } else {
250            Ok(Some(Arc::new(self.clone())))
251        }
252    }
253
254    fn execute(
255        &self,
256        partition: usize,
257        context: Arc<TaskContext>,
258    ) -> Result<SendableRecordBatchStream> {
259        self.data_source.open(partition, context)
260    }
261
262    fn metrics(&self) -> Option<MetricsSet> {
263        Some(self.data_source.metrics().clone_inner())
264    }
265
266    fn statistics(&self) -> Result<Statistics> {
267        self.data_source.statistics()
268    }
269
270    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
271        if let Some(partition) = partition {
272            let mut statistics = Statistics::new_unknown(&self.schema());
273            if let Some(file_config) =
274                self.data_source.as_any().downcast_ref::<FileScanConfig>()
275            {
276                if let Some(file_group) = file_config.file_groups.get(partition) {
277                    if let Some(stat) = file_group.file_statistics(None) {
278                        statistics = stat.clone();
279                    }
280                }
281            }
282            Ok(statistics)
283        } else {
284            Ok(self.data_source.statistics()?)
285        }
286    }
287
288    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
289        let data_source = self.data_source.with_fetch(limit)?;
290        let cache = self.cache.clone();
291
292        Some(Arc::new(Self { data_source, cache }))
293    }
294
295    fn fetch(&self) -> Option<usize> {
296        self.data_source.fetch()
297    }
298
299    fn try_swapping_with_projection(
300        &self,
301        projection: &ProjectionExec,
302    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
303        self.data_source.try_swapping_with_projection(projection)
304    }
305
306    fn handle_child_pushdown_result(
307        &self,
308        child_pushdown_result: ChildPushdownResult,
309        config: &ConfigOptions,
310    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
311        // Push any remaining filters into our data source
312        let res = self.data_source.try_pushdown_filters(
313            child_pushdown_result.parent_filters.collect_all(),
314            config,
315        )?;
316        match res.updated_node {
317            Some(data_source) => {
318                let mut new_node = self.clone();
319                new_node.data_source = data_source;
320                new_node.cache =
321                    Self::compute_properties(Arc::clone(&new_node.data_source));
322                Ok(FilterPushdownPropagation {
323                    filters: res.filters,
324                    updated_node: Some(Arc::new(new_node)),
325                })
326            }
327            None => Ok(FilterPushdownPropagation {
328                filters: res.filters,
329                updated_node: None,
330            }),
331        }
332    }
333}
334
335impl DataSourceExec {
336    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
337        Arc::new(Self::new(Arc::new(data_source)))
338    }
339
340    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
341        let cache = Self::compute_properties(Arc::clone(&data_source));
342        Self { data_source, cache }
343    }
344
345    /// Return the source object
346    pub fn data_source(&self) -> &Arc<dyn DataSource> {
347        &self.data_source
348    }
349
350    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
351        self.cache = Self::compute_properties(Arc::clone(&data_source));
352        self.data_source = data_source;
353        self
354    }
355
356    /// Assign constraints
357    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
358        self.cache = self.cache.with_constraints(constraints);
359        self
360    }
361
362    /// Assign output partitioning
363    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
364        self.cache = self.cache.with_partitioning(partitioning);
365        self
366    }
367
368    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
369        PlanProperties::new(
370            data_source.eq_properties(),
371            data_source.output_partitioning(),
372            EmissionType::Incremental,
373            Boundedness::Bounded,
374        )
375    }
376
377    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
378    ///
379    /// Returns `None` if
380    /// 1. the datasource is not scanning files (`FileScanConfig`)
381    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
382    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
383        self.data_source()
384            .as_any()
385            .downcast_ref::<FileScanConfig>()
386            .and_then(|file_scan_conf| {
387                file_scan_conf
388                    .file_source()
389                    .as_any()
390                    .downcast_ref::<T>()
391                    .map(|source| (file_scan_conf, source))
392            })
393    }
394}
395
396/// Create a new `DataSourceExec` from a `DataSource`
397impl<S> From<S> for DataSourceExec
398where
399    S: DataSource + 'static,
400{
401    fn from(source: S) -> Self {
402        Self::new(Arc::new(source))
403    }
404}