datafusion_datasource/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_expr::equivalence::ProjectionMapping;
26use datafusion_physical_plan::execution_plan::{
27 Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
31use datafusion_physical_plan::projection::ProjectionExec;
32use datafusion_physical_plan::stream::BatchSplitStream;
33use datafusion_physical_plan::{
34 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
35};
36use itertools::Itertools;
37
38use crate::file_scan_config::FileScanConfig;
39use datafusion_common::config::ConfigOptions;
40use datafusion_common::{Constraints, Result, Statistics};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::{
43 conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
44};
45use datafusion_physical_expr_common::sort_expr::LexOrdering;
46use datafusion_physical_plan::filter::collect_columns_from_predicate;
47use datafusion_physical_plan::filter_pushdown::{
48 ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
49};
50
51/// A source of data, typically a list of files or memory
52///
53/// This trait provides common behaviors for abstract sources of data. It has
54/// two common implementations:
55///
56/// 1. [`FileScanConfig`]: lists of files
57/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
58///
59/// File format specific behaviors are defined by [`FileSource`]
60///
61/// # See Also
62/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
63/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
64///
65/// # Notes
66///
67/// Requires `Debug` to assist debugging
68///
69/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
70/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
71/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
73/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
74///
75/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
/// ```text
///                       ┌─────────────────────┐                               -----► execute path
///                       │                     │                               ┄┄┄┄┄► init path
///                       │   DataSourceExec    │
///                       │                     │
///                       └───────▲─────────────┘
///                               ┊  │
///                               ┊  │
///                       ┌──────────▼──────────┐                            ┌─────────────────────┐
///                       │                     │                            │                     │
///                       │  DataSource(trait)  │                            │ TableProvider(trait)│
///                       │                     │                            │                     │
///                       └───────▲─────────────┘                            └─────────────────────┘
///                               ┊  │                                                  ┊
///               ┌───────────────┿──┴────────────────┐                                 ┊
///               │   ┌┄┄┄┄┄┄┄┄┄┄┄┘                   │                                 ┊
///               │   ┊                               │                                 ┊
///    ┌──────────▼──────────┐             ┌──────────▼──────────┐                      ┊
///    │                     │             │                     │           ┌──────────▼──────────┐
///    │   FileScanConfig    │             │ MemorySourceConfig  │           │                     │
///    │                     │             │                     │           │  FileFormat(trait)  │
///    └──────────────▲──────┘             └─────────────────────┘           │                     │
///               │   ┊                                                      └─────────────────────┘
///               │   ┊                                                                 ┊
///               │   ┊                                                                 ┊
///    ┌──────────▼──────────┐                                               ┌──────────▼──────────┐
///    │                     │                                               │     ArrowSource     │
///    │  FileSource(trait)  ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│         ...         │
///    │                     │                                               │    ParquetSource    │
///    └─────────────────────┘                                               └─────────────────────┘
///               │
///               │
///               │
///               │
///    ┌──────────▼──────────┐
///    │     ArrowSource     │
///    │         ...         │
///    │    ParquetSource    │
///    └─────────────────────┘
///               │
/// FileOpener (called by FileStream)
///               │
///    ┌──────────▼──────────┐
///    │                     │
///    │     RecordBatch     │
///    │                     │
///    └─────────────────────┘
/// ```
124pub trait DataSource: Send + Sync + Debug {
125 fn open(
126 &self,
127 partition: usize,
128 context: Arc<TaskContext>,
129 ) -> Result<SendableRecordBatchStream>;
130 fn as_any(&self) -> &dyn Any;
131 /// Format this source for display in explain plans
132 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
133
134 /// Return a copy of this DataSource with a new partitioning scheme.
135 ///
136 /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
137 /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
138 ///
139 /// Repartitioning should not change the output ordering, if this ordering exists.
140 /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
141 /// and the FileSource's
142 /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
143 /// for examples.
144 fn repartitioned(
145 &self,
146 _target_partitions: usize,
147 _repartition_file_min_size: usize,
148 _output_ordering: Option<LexOrdering>,
149 ) -> Result<Option<Arc<dyn DataSource>>> {
150 Ok(None)
151 }
152
153 fn output_partitioning(&self) -> Partitioning;
154 fn eq_properties(&self) -> EquivalenceProperties;
155 fn scheduling_type(&self) -> SchedulingType {
156 SchedulingType::NonCooperative
157 }
158 fn statistics(&self) -> Result<Statistics>;
159 /// Return a copy of this DataSource with a new fetch limit
160 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
161 fn fetch(&self) -> Option<usize>;
162 fn metrics(&self) -> ExecutionPlanMetricsSet {
163 ExecutionPlanMetricsSet::new()
164 }
165 fn try_swapping_with_projection(
166 &self,
167 _projection: &ProjectionExec,
168 ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
169 /// Try to push down filters into this DataSource.
170 /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
171 ///
172 /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
173 fn try_pushdown_filters(
174 &self,
175 filters: Vec<Arc<dyn PhysicalExpr>>,
176 _config: &ConfigOptions,
177 ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
178 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
179 vec![PushedDown::No; filters.len()],
180 ))
181 }
182}
183
/// [`ExecutionPlan`] that reads data from a [`DataSource`], such as files or in-memory batches
185///
186/// `DataSourceExec` implements common functionality such as applying
187/// projections, and caching plan properties.
188///
/// The [`DataSource`] describes where to find the data for this data source
/// (for example, in files or in in-memory partitions).
191///
192/// For file based [`DataSource`]s, format specific behavior is implemented in
193/// the [`FileSource`] trait.
194///
195/// [`FileSource`]: crate::file::FileSource
196#[derive(Clone, Debug)]
197pub struct DataSourceExec {
198 /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
199 data_source: Arc<dyn DataSource>,
200 /// Cached plan properties such as sort order
201 cache: PlanProperties,
202}
203
204impl DisplayAs for DataSourceExec {
205 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
206 match t {
207 DisplayFormatType::Default | DisplayFormatType::Verbose => {
208 write!(f, "DataSourceExec: ")?;
209 }
210 DisplayFormatType::TreeRender => {}
211 }
212 self.data_source.fmt_as(t, f)
213 }
214}
215
216impl ExecutionPlan for DataSourceExec {
217 fn name(&self) -> &'static str {
218 "DataSourceExec"
219 }
220
221 fn as_any(&self) -> &dyn Any {
222 self
223 }
224
225 fn properties(&self) -> &PlanProperties {
226 &self.cache
227 }
228
229 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
230 Vec::new()
231 }
232
233 fn with_new_children(
234 self: Arc<Self>,
235 _: Vec<Arc<dyn ExecutionPlan>>,
236 ) -> Result<Arc<dyn ExecutionPlan>> {
237 Ok(self)
238 }
239
240 /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
241 ///
242 /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
243 /// to [`ExecutionPlan::repartitioned`] for more details.
244 fn repartitioned(
245 &self,
246 target_partitions: usize,
247 config: &ConfigOptions,
248 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
249 let data_source = self.data_source.repartitioned(
250 target_partitions,
251 config.optimizer.repartition_file_min_size,
252 self.properties().eq_properties.output_ordering(),
253 )?;
254
255 if let Some(source) = data_source {
256 let output_partitioning = source.output_partitioning();
257 let plan = self
258 .clone()
259 .with_data_source(source)
260 // Changing source partitioning may invalidate output partitioning. Update it also
261 .with_partitioning(output_partitioning);
262 Ok(Some(Arc::new(plan)))
263 } else {
264 Ok(Some(Arc::new(self.clone())))
265 }
266 }
267
268 fn execute(
269 &self,
270 partition: usize,
271 context: Arc<TaskContext>,
272 ) -> Result<SendableRecordBatchStream> {
273 let stream = self.data_source.open(partition, Arc::clone(&context))?;
274 let batch_size = context.session_config().batch_size();
275 log::debug!(
276 "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
277 );
278 let metrics = self.data_source.metrics();
279 let split_metrics = SplitMetrics::new(&metrics, partition);
280 Ok(Box::pin(BatchSplitStream::new(
281 stream,
282 batch_size,
283 split_metrics,
284 )))
285 }
286
287 fn metrics(&self) -> Option<MetricsSet> {
288 Some(self.data_source.metrics().clone_inner())
289 }
290
291 fn statistics(&self) -> Result<Statistics> {
292 self.data_source.statistics()
293 }
294
295 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
296 if let Some(partition) = partition {
297 let mut statistics = Statistics::new_unknown(&self.schema());
298 if let Some(file_config) =
299 self.data_source.as_any().downcast_ref::<FileScanConfig>()
300 {
301 if let Some(file_group) = file_config.file_groups.get(partition) {
302 if let Some(stat) = file_group.file_statistics(None) {
303 statistics = stat.clone();
304 }
305 }
306 }
307 Ok(statistics)
308 } else {
309 Ok(self.data_source.statistics()?)
310 }
311 }
312
313 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
314 let data_source = self.data_source.with_fetch(limit)?;
315 let cache = self.cache.clone();
316
317 Some(Arc::new(Self { data_source, cache }))
318 }
319
320 fn fetch(&self) -> Option<usize> {
321 self.data_source.fetch()
322 }
323
324 fn try_swapping_with_projection(
325 &self,
326 projection: &ProjectionExec,
327 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
328 match self.data_source.try_swapping_with_projection(projection)? {
329 Some(new_plan) => {
330 if let Some(new_data_source_exec) =
331 new_plan.as_any().downcast_ref::<DataSourceExec>()
332 {
333 let projection_mapping = ProjectionMapping::try_new(
334 projection.expr().iter().cloned(),
335 &self.schema(),
336 )?;
337
338 // Project the equivalence properties to the new schema
339 let projected_eq_properties = self
340 .cache
341 .eq_properties
342 .project(&projection_mapping, new_data_source_exec.schema());
343
344 let preserved_exec = DataSourceExec {
345 data_source: Arc::clone(&new_data_source_exec.data_source),
346 cache: PlanProperties::new(
347 projected_eq_properties,
348 new_data_source_exec.cache.partitioning.clone(),
349 new_data_source_exec.cache.emission_type,
350 new_data_source_exec.cache.boundedness,
351 )
352 .with_scheduling_type(new_data_source_exec.cache.scheduling_type),
353 };
354 Ok(Some(Arc::new(preserved_exec)))
355 } else {
356 Ok(Some(new_plan))
357 }
358 }
359 None => Ok(None),
360 }
361 }
362
363 fn handle_child_pushdown_result(
364 &self,
365 _phase: FilterPushdownPhase,
366 child_pushdown_result: ChildPushdownResult,
367 config: &ConfigOptions,
368 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
369 // Push any remaining filters into our data source
370 let parent_filters = child_pushdown_result
371 .parent_filters
372 .into_iter()
373 .map(|f| f.filter)
374 .collect_vec();
375 let res = self
376 .data_source
377 .try_pushdown_filters(parent_filters.clone(), config)?;
378 match res.updated_node {
379 Some(data_source) => {
380 let mut new_node = self.clone();
381 new_node.data_source = data_source;
382 new_node.cache =
383 Self::compute_properties(Arc::clone(&new_node.data_source));
384
385 // Recompute equivalence info using new filters
386 let filter = conjunction(
387 res.filters
388 .iter()
389 .zip(parent_filters)
390 .filter_map(|(s, f)| match s {
391 PushedDown::Yes => Some(f),
392 PushedDown::No => None,
393 })
394 .collect_vec(),
395 );
396 new_node = new_node.add_filter_equivalence_info(filter)?;
397 Ok(FilterPushdownPropagation {
398 filters: res.filters,
399 updated_node: Some(Arc::new(new_node)),
400 })
401 }
402 None => Ok(FilterPushdownPropagation {
403 filters: res.filters,
404 updated_node: None,
405 }),
406 }
407 }
408}
409
410impl DataSourceExec {
411 pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
412 Arc::new(Self::new(Arc::new(data_source)))
413 }
414
415 // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
416 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
417 let cache = Self::compute_properties(Arc::clone(&data_source));
418 Self { data_source, cache }
419 }
420
421 /// Return the source object
422 pub fn data_source(&self) -> &Arc<dyn DataSource> {
423 &self.data_source
424 }
425
426 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
427 self.cache = Self::compute_properties(Arc::clone(&data_source));
428 self.data_source = data_source;
429 self
430 }
431
432 /// Assign constraints
433 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
434 self.cache = self.cache.with_constraints(constraints);
435 self
436 }
437
438 /// Assign output partitioning
439 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
440 self.cache = self.cache.with_partitioning(partitioning);
441 self
442 }
443
444 /// Add filters' equivalence info
445 fn add_filter_equivalence_info(
446 mut self,
447 filter: Arc<dyn PhysicalExpr>,
448 ) -> Result<Self> {
449 let (equal_pairs, _) = collect_columns_from_predicate(&filter);
450 for (lhs, rhs) in equal_pairs {
451 self.cache
452 .eq_properties
453 .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
454 }
455 Ok(self)
456 }
457
458 fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
459 PlanProperties::new(
460 data_source.eq_properties(),
461 data_source.output_partitioning(),
462 EmissionType::Incremental,
463 Boundedness::Bounded,
464 )
465 .with_scheduling_type(data_source.scheduling_type())
466 }
467
468 /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
469 ///
470 /// Returns `None` if
471 /// 1. the datasource is not scanning files (`FileScanConfig`)
472 /// 2. The [`FileScanConfig::file_source`] is not of type `T`
473 pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
474 self.data_source()
475 .as_any()
476 .downcast_ref::<FileScanConfig>()
477 .and_then(|file_scan_conf| {
478 file_scan_conf
479 .file_source()
480 .as_any()
481 .downcast_ref::<T>()
482 .map(|source| (file_scan_conf, source))
483 })
484 }
485}
486
487/// Create a new `DataSourceExec` from a `DataSource`
488impl<S> From<S> for DataSourceExec
489where
490 S: DataSource + 'static,
491{
492 fn from(source: S) -> Self {
493 Self::new(Arc::new(source))
494 }
495}