datafusion_datasource/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use datafusion_physical_plan::projection::ProjectionExec;
28use datafusion_physical_plan::{
29    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
30};
31
32use datafusion_common::config::ConfigOptions;
33use datafusion_common::{Constraints, Statistics};
34use datafusion_execution::{SendableRecordBatchStream, TaskContext};
35use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
36use datafusion_physical_expr_common::sort_expr::LexOrdering;
37
38/// Common behaviors in Data Sources for both from Files and Memory.
39///
40/// # See Also
41/// * [`DataSourceExec`] for physical plan implementation
42/// * [`FileSource`] for file format implementations (Parquet, Json, etc)
43///
44/// # Notes
45/// Requires `Debug` to assist debugging
46///
47/// [`FileSource`]: crate::file::FileSource
48pub trait DataSource: Send + Sync + Debug {
49    fn open(
50        &self,
51        partition: usize,
52        context: Arc<TaskContext>,
53    ) -> datafusion_common::Result<SendableRecordBatchStream>;
54    fn as_any(&self) -> &dyn Any;
55    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
56
57    /// Return a copy of this DataSource with a new partitioning scheme
58    fn repartitioned(
59        &self,
60        _target_partitions: usize,
61        _repartition_file_min_size: usize,
62        _output_ordering: Option<LexOrdering>,
63    ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
64        Ok(None)
65    }
66
67    fn output_partitioning(&self) -> Partitioning;
68    fn eq_properties(&self) -> EquivalenceProperties;
69    fn statistics(&self) -> datafusion_common::Result<Statistics>;
70    /// Return a copy of this DataSource with a new fetch limit
71    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
72    fn fetch(&self) -> Option<usize>;
73    fn metrics(&self) -> ExecutionPlanMetricsSet {
74        ExecutionPlanMetricsSet::new()
75    }
76    fn try_swapping_with_projection(
77        &self,
78        _projection: &ProjectionExec,
79    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
80}
81
82/// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET
83///
84/// `DataSourceExec` implements common functionality such as applying projections,
85/// and caching plan properties.
86///
87/// The [`DataSource`] trait describes where to find the data for this data
88/// source (for example what files or what in memory partitions). Format
89/// specifics are implemented with the [`FileSource`] trait.
90///
91/// [`FileSource`]: crate::file::FileSource
92#[derive(Clone, Debug)]
93pub struct DataSourceExec {
94    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
95    data_source: Arc<dyn DataSource>,
96    /// Cached plan properties such as sort order
97    cache: PlanProperties,
98}
99
100impl DisplayAs for DataSourceExec {
101    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
102        write!(f, "DataSourceExec: ")?;
103        self.data_source.fmt_as(t, f)
104    }
105}
106
107impl ExecutionPlan for DataSourceExec {
108    fn name(&self) -> &'static str {
109        "DataSourceExec"
110    }
111
112    fn as_any(&self) -> &dyn Any {
113        self
114    }
115
116    fn properties(&self) -> &PlanProperties {
117        &self.cache
118    }
119
120    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
121        Vec::new()
122    }
123
124    fn with_new_children(
125        self: Arc<Self>,
126        _: Vec<Arc<dyn ExecutionPlan>>,
127    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
128        Ok(self)
129    }
130
131    fn repartitioned(
132        &self,
133        target_partitions: usize,
134        config: &ConfigOptions,
135    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
136        let data_source = self.data_source.repartitioned(
137            target_partitions,
138            config.optimizer.repartition_file_min_size,
139            self.properties().eq_properties.output_ordering(),
140        )?;
141
142        if let Some(source) = data_source {
143            let output_partitioning = source.output_partitioning();
144            let plan = self
145                .clone()
146                .with_data_source(source)
147                // Changing source partitioning may invalidate output partitioning. Update it also
148                .with_partitioning(output_partitioning);
149            Ok(Some(Arc::new(plan)))
150        } else {
151            Ok(Some(Arc::new(self.clone())))
152        }
153    }
154
155    fn execute(
156        &self,
157        partition: usize,
158        context: Arc<TaskContext>,
159    ) -> datafusion_common::Result<SendableRecordBatchStream> {
160        self.data_source.open(partition, context)
161    }
162
163    fn metrics(&self) -> Option<MetricsSet> {
164        Some(self.data_source.metrics().clone_inner())
165    }
166
167    fn statistics(&self) -> datafusion_common::Result<Statistics> {
168        self.data_source.statistics()
169    }
170
171    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
172        let data_source = self.data_source.with_fetch(limit)?;
173        let cache = self.cache.clone();
174
175        Some(Arc::new(Self { data_source, cache }))
176    }
177
178    fn fetch(&self) -> Option<usize> {
179        self.data_source.fetch()
180    }
181
182    fn try_swapping_with_projection(
183        &self,
184        projection: &ProjectionExec,
185    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
186        self.data_source.try_swapping_with_projection(projection)
187    }
188}
189
190impl DataSourceExec {
191    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
192        let cache = Self::compute_properties(Arc::clone(&data_source));
193        Self { data_source, cache }
194    }
195
196    /// Return the source object
197    pub fn data_source(&self) -> &Arc<dyn DataSource> {
198        &self.data_source
199    }
200
201    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
202        self.cache = Self::compute_properties(Arc::clone(&data_source));
203        self.data_source = data_source;
204        self
205    }
206
207    /// Assign constraints
208    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
209        self.cache = self.cache.with_constraints(constraints);
210        self
211    }
212
213    /// Assign output partitioning
214    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
215        self.cache = self.cache.with_partitioning(partitioning);
216        self
217    }
218
219    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
220        PlanProperties::new(
221            data_source.eq_properties(),
222            data_source.output_partitioning(),
223            EmissionType::Incremental,
224            Boundedness::Bounded,
225        )
226    }
227}