datafusion_datasource/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use datafusion_physical_plan::projection::ProjectionExec;
28use datafusion_physical_plan::{
29 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
30};
31
32use crate::file_scan_config::FileScanConfig;
33use datafusion_common::config::ConfigOptions;
34use datafusion_common::{Constraints, Result, Statistics};
35use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
37use datafusion_physical_expr_common::sort_expr::LexOrdering;
38use datafusion_physical_plan::filter_pushdown::{
39 ChildPushdownResult, FilterPushdownPropagation,
40};
41
42/// A source of data, typically a list of files or memory
43///
44/// This trait provides common behaviors for abstract sources of data. It has
45/// two common implementations:
46///
47/// 1. [`FileScanConfig`]: lists of files
48/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
49///
50/// File format specific behaviors are defined by [`FileSource`]
51///
52/// # See Also
53/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
54/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
55///
56/// # Notes
57///
58/// Requires `Debug` to assist debugging
59///
60/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
61/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
62/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
64/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
65///
66/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
67/// ```text
68/// ┌─────────────────────┐ -----► execute path
69/// │ │ ┄┄┄┄┄► init path
70/// │ DataSourceExec │
71/// │ │
72/// └───────▲─────────────┘
73/// ┊ │
74/// ┊ │
75/// ┌──────────▼──────────┐ ┌──────────-──────────┐
76/// │ │ | |
77/// │ DataSource(trait) │ | TableProvider(trait)|
78/// │ │ | |
79/// └───────▲─────────────┘ └─────────────────────┘
80/// ┊ │ ┊
81/// ┌───────────────┿──┴────────────────┐ ┊
82/// | ┌┄┄┄┄┄┄┄┄┄┄┄┘ | ┊
83/// | ┊ | ┊
84/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ ┊
85/// │ │ │ │ ┌──────────▼──────────┐
86/// │ FileScanConfig │ │ MemorySourceConfig │ | |
87/// │ │ │ │ | FileFormat(trait) |
88/// └──────────────▲──────┘ └─────────────────────┘ | |
89/// │ ┊ └─────────────────────┘
90/// │ ┊ ┊
91/// │ ┊ ┊
92/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
93/// │ │ │ ArrowSource │
94/// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │
95/// │ │ │ ParquetSource │
96/// └─────────────────────┘ └─────────────────────┘
97/// │
98/// │
99/// │
100/// │
101/// ┌──────────▼──────────┐
102/// │ ArrowSource │
103/// │ ... │
104/// │ ParquetSource │
105/// └─────────────────────┘
106/// |
107/// FileOpener (called by FileStream)
108/// │
109/// ┌──────────▼──────────┐
110/// │ │
111/// │ RecordBatch │
112/// │ │
113/// └─────────────────────┘
114/// ```
115pub trait DataSource: Send + Sync + Debug {
116 fn open(
117 &self,
118 partition: usize,
119 context: Arc<TaskContext>,
120 ) -> Result<SendableRecordBatchStream>;
121 fn as_any(&self) -> &dyn Any;
122 /// Format this source for display in explain plans
123 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
124
125 /// Return a copy of this DataSource with a new partitioning scheme.
126 ///
127 /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
128 /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
129 ///
130 /// Repartitioning should not change the output ordering, if this ordering exists.
131 /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
132 /// and the FileSource's
133 /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
134 /// for examples.
135 fn repartitioned(
136 &self,
137 _target_partitions: usize,
138 _repartition_file_min_size: usize,
139 _output_ordering: Option<LexOrdering>,
140 ) -> Result<Option<Arc<dyn DataSource>>> {
141 Ok(None)
142 }
143
144 fn output_partitioning(&self) -> Partitioning;
145 fn eq_properties(&self) -> EquivalenceProperties;
146 fn statistics(&self) -> Result<Statistics>;
147 /// Return a copy of this DataSource with a new fetch limit
148 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
149 fn fetch(&self) -> Option<usize>;
150 fn metrics(&self) -> ExecutionPlanMetricsSet {
151 ExecutionPlanMetricsSet::new()
152 }
153 fn try_swapping_with_projection(
154 &self,
155 _projection: &ProjectionExec,
156 ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
157 /// Try to push down filters into this DataSource.
158 /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
159 ///
160 /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
161 fn try_pushdown_filters(
162 &self,
163 filters: Vec<Arc<dyn PhysicalExpr>>,
164 _config: &ConfigOptions,
165 ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
166 Ok(FilterPushdownPropagation::unsupported(filters))
167 }
168}
169
170/// [`ExecutionPlan`] that reads one or more files
171///
172/// `DataSourceExec` implements common functionality such as applying
173/// projections, and caching plan properties.
174///
175/// The [`DataSource`] describes where to find the data for this data source
176/// (for example in files or what in memory partitions).
177///
178/// For file based [`DataSource`]s, format specific behavior is implemented in
179/// the [`FileSource`] trait.
180///
181/// [`FileSource`]: crate::file::FileSource
182#[derive(Clone, Debug)]
183pub struct DataSourceExec {
184 /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
185 data_source: Arc<dyn DataSource>,
186 /// Cached plan properties such as sort order
187 cache: PlanProperties,
188}
189
190impl DisplayAs for DataSourceExec {
191 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
192 match t {
193 DisplayFormatType::Default | DisplayFormatType::Verbose => {
194 write!(f, "DataSourceExec: ")?;
195 }
196 DisplayFormatType::TreeRender => {}
197 }
198 self.data_source.fmt_as(t, f)
199 }
200}
201
202impl ExecutionPlan for DataSourceExec {
203 fn name(&self) -> &'static str {
204 "DataSourceExec"
205 }
206
207 fn as_any(&self) -> &dyn Any {
208 self
209 }
210
211 fn properties(&self) -> &PlanProperties {
212 &self.cache
213 }
214
215 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
216 Vec::new()
217 }
218
219 fn with_new_children(
220 self: Arc<Self>,
221 _: Vec<Arc<dyn ExecutionPlan>>,
222 ) -> Result<Arc<dyn ExecutionPlan>> {
223 Ok(self)
224 }
225
226 /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
227 ///
228 /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
229 /// to [`ExecutionPlan::repartitioned`] for more details.
230 fn repartitioned(
231 &self,
232 target_partitions: usize,
233 config: &ConfigOptions,
234 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
235 let data_source = self.data_source.repartitioned(
236 target_partitions,
237 config.optimizer.repartition_file_min_size,
238 self.properties().eq_properties.output_ordering(),
239 )?;
240
241 if let Some(source) = data_source {
242 let output_partitioning = source.output_partitioning();
243 let plan = self
244 .clone()
245 .with_data_source(source)
246 // Changing source partitioning may invalidate output partitioning. Update it also
247 .with_partitioning(output_partitioning);
248 Ok(Some(Arc::new(plan)))
249 } else {
250 Ok(Some(Arc::new(self.clone())))
251 }
252 }
253
254 fn execute(
255 &self,
256 partition: usize,
257 context: Arc<TaskContext>,
258 ) -> Result<SendableRecordBatchStream> {
259 self.data_source.open(partition, context)
260 }
261
262 fn metrics(&self) -> Option<MetricsSet> {
263 Some(self.data_source.metrics().clone_inner())
264 }
265
266 fn statistics(&self) -> Result<Statistics> {
267 self.data_source.statistics()
268 }
269
270 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
271 if let Some(partition) = partition {
272 let mut statistics = Statistics::new_unknown(&self.schema());
273 if let Some(file_config) =
274 self.data_source.as_any().downcast_ref::<FileScanConfig>()
275 {
276 if let Some(file_group) = file_config.file_groups.get(partition) {
277 if let Some(stat) = file_group.file_statistics(None) {
278 statistics = stat.clone();
279 }
280 }
281 }
282 Ok(statistics)
283 } else {
284 Ok(self.data_source.statistics()?)
285 }
286 }
287
288 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
289 let data_source = self.data_source.with_fetch(limit)?;
290 let cache = self.cache.clone();
291
292 Some(Arc::new(Self { data_source, cache }))
293 }
294
295 fn fetch(&self) -> Option<usize> {
296 self.data_source.fetch()
297 }
298
299 fn try_swapping_with_projection(
300 &self,
301 projection: &ProjectionExec,
302 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
303 self.data_source.try_swapping_with_projection(projection)
304 }
305
306 fn handle_child_pushdown_result(
307 &self,
308 child_pushdown_result: ChildPushdownResult,
309 config: &ConfigOptions,
310 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
311 // Push any remaining filters into our data source
312 let res = self.data_source.try_pushdown_filters(
313 child_pushdown_result.parent_filters.collect_all(),
314 config,
315 )?;
316 match res.updated_node {
317 Some(data_source) => {
318 let mut new_node = self.clone();
319 new_node.data_source = data_source;
320 new_node.cache =
321 Self::compute_properties(Arc::clone(&new_node.data_source));
322 Ok(FilterPushdownPropagation {
323 filters: res.filters,
324 updated_node: Some(Arc::new(new_node)),
325 })
326 }
327 None => Ok(FilterPushdownPropagation {
328 filters: res.filters,
329 updated_node: None,
330 }),
331 }
332 }
333}
334
335impl DataSourceExec {
336 pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
337 Arc::new(Self::new(Arc::new(data_source)))
338 }
339
340 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
341 let cache = Self::compute_properties(Arc::clone(&data_source));
342 Self { data_source, cache }
343 }
344
345 /// Return the source object
346 pub fn data_source(&self) -> &Arc<dyn DataSource> {
347 &self.data_source
348 }
349
350 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
351 self.cache = Self::compute_properties(Arc::clone(&data_source));
352 self.data_source = data_source;
353 self
354 }
355
356 /// Assign constraints
357 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
358 self.cache = self.cache.with_constraints(constraints);
359 self
360 }
361
362 /// Assign output partitioning
363 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
364 self.cache = self.cache.with_partitioning(partitioning);
365 self
366 }
367
368 fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
369 PlanProperties::new(
370 data_source.eq_properties(),
371 data_source.output_partitioning(),
372 EmissionType::Incremental,
373 Boundedness::Bounded,
374 )
375 }
376
377 /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
378 ///
379 /// Returns `None` if
380 /// 1. the datasource is not scanning files (`FileScanConfig`)
381 /// 2. The [`FileScanConfig::file_source`] is not of type `T`
382 pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
383 self.data_source()
384 .as_any()
385 .downcast_ref::<FileScanConfig>()
386 .and_then(|file_scan_conf| {
387 file_scan_conf
388 .file_source()
389 .as_any()
390 .downcast_ref::<T>()
391 .map(|source| (file_scan_conf, source))
392 })
393 }
394}
395
396/// Create a new `DataSourceExec` from a `DataSource`
397impl<S> From<S> for DataSourceExec
398where
399 S: DataSource + 'static,
400{
401 fn from(source: S) -> Self {
402 Self::new(Arc::new(source))
403 }
404}