datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use datafusion_physical_plan::projection::ProjectionExec;
28use datafusion_physical_plan::{
29    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
30};
31
32use crate::file_scan_config::FileScanConfig;
33use datafusion_common::config::ConfigOptions;
34use datafusion_common::{Constraints, Statistics};
35use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
37use datafusion_physical_expr_common::sort_expr::LexOrdering;
38
/// Common behaviors of data sources, whether the data comes from files or from memory.
///
/// # See Also
/// * [`DataSourceExec`] for physical plan implementation
/// * [`FileSource`] for file format implementations (Parquet, Json, etc)
///
/// # Notes
/// Requires `Debug` to assist debugging
///
/// [`FileSource`]: crate::file::FileSource
pub trait DataSource: Send + Sync + Debug {
    /// Open the given `partition` of this source and return its stream of
    /// record batches.
    ///
    /// `DataSourceExec::execute` forwards directly to this method.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream>;
    /// Return this source as [`Any`] so that callers can downcast it to a
    /// concrete type (see `DataSourceExec::downcast_to_file_source`).
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Return a copy of this DataSource with a new partitioning scheme
    ///
    /// The default implementation returns `Ok(None)`. When
    /// `DataSourceExec::repartitioned` receives `None` it keeps using the
    /// existing data source unchanged.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// Output partitioning of this source; used by [`DataSourceExec`] when it
    /// computes its cached plan properties.
    fn output_partitioning(&self) -> Partitioning;
    /// Equivalence properties of this source; used by [`DataSourceExec`] when
    /// it computes its cached plan properties.
    fn eq_properties(&self) -> EquivalenceProperties;
    /// Statistics for this source, reported as-is by `DataSourceExec::statistics`.
    fn statistics(&self) -> datafusion_common::Result<Statistics>;
    /// Return a copy of this DataSource with a new fetch limit
    ///
    /// If this returns `None`, `DataSourceExec::with_fetch` returns `None` too.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// Return the fetch limit currently set on this source, if any.
    fn fetch(&self) -> Option<usize>;
    /// Return the metrics set of this source.
    ///
    /// The default implementation returns a newly created
    /// [`ExecutionPlanMetricsSet`] on every call.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Attempt to swap this source with the given projection, returning the
    /// replacement plan on success.
    ///
    /// `DataSourceExec::try_swapping_with_projection` forwards directly to
    /// this method.
    // NOTE(review): presumably `Ok(None)` means the projection could not be
    // pushed into the source -- confirm against the implementations.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
}
83
/// [`ExecutionPlan`] that reads from a [`DataSource`], handling different file
/// formats like JSON, CSV, AVRO, ARROW, PARQUET
///
/// `DataSourceExec` implements common functionality such as applying projections,
/// and caching plan properties.
///
/// The [`DataSource`] trait describes where to find the data for this data
/// source (for example what files or what in memory partitions). Format
/// specifics are implemented with the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties such as sort order.
    ///
    /// Computed from `data_source` whenever the source is set through `new`
    /// or `with_data_source`.
    cache: PlanProperties,
}
101
102impl DisplayAs for DataSourceExec {
103    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
104        match t {
105            DisplayFormatType::Default | DisplayFormatType::Verbose => {
106                write!(f, "DataSourceExec: ")?;
107            }
108            DisplayFormatType::TreeRender => {}
109        }
110        self.data_source.fmt_as(t, f)
111    }
112}
113
114impl ExecutionPlan for DataSourceExec {
115    fn name(&self) -> &'static str {
116        "DataSourceExec"
117    }
118
119    fn as_any(&self) -> &dyn Any {
120        self
121    }
122
123    fn properties(&self) -> &PlanProperties {
124        &self.cache
125    }
126
127    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
128        Vec::new()
129    }
130
131    fn with_new_children(
132        self: Arc<Self>,
133        _: Vec<Arc<dyn ExecutionPlan>>,
134    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
135        Ok(self)
136    }
137
138    fn repartitioned(
139        &self,
140        target_partitions: usize,
141        config: &ConfigOptions,
142    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
143        let data_source = self.data_source.repartitioned(
144            target_partitions,
145            config.optimizer.repartition_file_min_size,
146            self.properties().eq_properties.output_ordering(),
147        )?;
148
149        if let Some(source) = data_source {
150            let output_partitioning = source.output_partitioning();
151            let plan = self
152                .clone()
153                .with_data_source(source)
154                // Changing source partitioning may invalidate output partitioning. Update it also
155                .with_partitioning(output_partitioning);
156            Ok(Some(Arc::new(plan)))
157        } else {
158            Ok(Some(Arc::new(self.clone())))
159        }
160    }
161
162    fn execute(
163        &self,
164        partition: usize,
165        context: Arc<TaskContext>,
166    ) -> datafusion_common::Result<SendableRecordBatchStream> {
167        self.data_source.open(partition, context)
168    }
169
170    fn metrics(&self) -> Option<MetricsSet> {
171        Some(self.data_source.metrics().clone_inner())
172    }
173
174    fn statistics(&self) -> datafusion_common::Result<Statistics> {
175        self.data_source.statistics()
176    }
177
178    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
179        let data_source = self.data_source.with_fetch(limit)?;
180        let cache = self.cache.clone();
181
182        Some(Arc::new(Self { data_source, cache }))
183    }
184
185    fn fetch(&self) -> Option<usize> {
186        self.data_source.fetch()
187    }
188
189    fn try_swapping_with_projection(
190        &self,
191        projection: &ProjectionExec,
192    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
193        self.data_source.try_swapping_with_projection(projection)
194    }
195}
196
197impl DataSourceExec {
198    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
199        Arc::new(Self::new(Arc::new(data_source)))
200    }
201
202    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
203        let cache = Self::compute_properties(Arc::clone(&data_source));
204        Self { data_source, cache }
205    }
206
207    /// Return the source object
208    pub fn data_source(&self) -> &Arc<dyn DataSource> {
209        &self.data_source
210    }
211
212    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
213        self.cache = Self::compute_properties(Arc::clone(&data_source));
214        self.data_source = data_source;
215        self
216    }
217
218    /// Assign constraints
219    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
220        self.cache = self.cache.with_constraints(constraints);
221        self
222    }
223
224    /// Assign output partitioning
225    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
226        self.cache = self.cache.with_partitioning(partitioning);
227        self
228    }
229
230    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
231        PlanProperties::new(
232            data_source.eq_properties(),
233            data_source.output_partitioning(),
234            EmissionType::Incremental,
235            Boundedness::Bounded,
236        )
237    }
238
239    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
240    ///
241    /// Returns `None` if
242    /// 1. the datasource is not scanning files (`FileScanConfig`)
243    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
244    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
245        self.data_source()
246            .as_any()
247            .downcast_ref::<FileScanConfig>()
248            .and_then(|file_scan_conf| {
249                file_scan_conf
250                    .file_source()
251                    .as_any()
252                    .downcast_ref::<T>()
253                    .map(|source| (file_scan_conf, source))
254            })
255    }
256}