datafusion_datasource/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{
26 Boundedness, EmissionType, SchedulingType,
27};
28use datafusion_physical_plan::metrics::SplitMetrics;
29use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
30use datafusion_physical_plan::projection::ProjectionExec;
31use datafusion_physical_plan::stream::BatchSplitStream;
32use datafusion_physical_plan::{
33 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
34};
35use itertools::Itertools;
36
37use crate::file_scan_config::FileScanConfig;
38use datafusion_common::config::ConfigOptions;
39use datafusion_common::{Constraints, Result, Statistics};
40use datafusion_execution::{SendableRecordBatchStream, TaskContext};
41use datafusion_physical_expr::{
42 conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
43};
44use datafusion_physical_expr_common::sort_expr::LexOrdering;
45use datafusion_physical_plan::filter::collect_columns_from_predicate;
46use datafusion_physical_plan::filter_pushdown::{
47 ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
48};
49
50/// A source of data, typically a list of files or memory
51///
52/// This trait provides common behaviors for abstract sources of data. It has
53/// two common implementations:
54///
55/// 1. [`FileScanConfig`]: lists of files
56/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
57///
58/// File format specific behaviors are defined by [`FileSource`]
59///
60/// # See Also
61/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
62/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
63///
64/// # Notes
65///
66/// Requires `Debug` to assist debugging
67///
68/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
69/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
70/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
72/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
73///
74/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
75/// ```text
76/// ┌─────────────────────┐ -----► execute path
77/// │ │ ┄┄┄┄┄► init path
78/// │ DataSourceExec │
79/// │ │
80/// └───────▲─────────────┘
81/// ┊ │
82/// ┊ │
83/// ┌──────────▼──────────┐ ┌──────────-──────────┐
84/// │ │ | |
85/// │ DataSource(trait) │ | TableProvider(trait)|
86/// │ │ | |
87/// └───────▲─────────────┘ └─────────────────────┘
88/// ┊ │ ┊
89/// ┌───────────────┿──┴────────────────┐ ┊
90/// | ┌┄┄┄┄┄┄┄┄┄┄┄┘ | ┊
91/// | ┊ | ┊
92/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ ┊
93/// │ │ │ │ ┌──────────▼──────────┐
94/// │ FileScanConfig │ │ MemorySourceConfig │ | |
95/// │ │ │ │ | FileFormat(trait) |
96/// └──────────────▲──────┘ └─────────────────────┘ | |
97/// │ ┊ └─────────────────────┘
98/// │ ┊ ┊
99/// │ ┊ ┊
100/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
101/// │ │ │ ArrowSource │
102/// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │
103/// │ │ │ ParquetSource │
104/// └─────────────────────┘ └─────────────────────┘
105/// │
106/// │
107/// │
108/// │
109/// ┌──────────▼──────────┐
110/// │ ArrowSource │
111/// │ ... │
112/// │ ParquetSource │
113/// └─────────────────────┘
114/// |
115/// FileOpener (called by FileStream)
116/// │
117/// ┌──────────▼──────────┐
118/// │ │
119/// │ RecordBatch │
120/// │ │
121/// └─────────────────────┘
122/// ```
123pub trait DataSource: Send + Sync + Debug {
124 fn open(
125 &self,
126 partition: usize,
127 context: Arc<TaskContext>,
128 ) -> Result<SendableRecordBatchStream>;
129 fn as_any(&self) -> &dyn Any;
130 /// Format this source for display in explain plans
131 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
132
133 /// Return a copy of this DataSource with a new partitioning scheme.
134 ///
135 /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
136 /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
137 ///
138 /// Repartitioning should not change the output ordering, if this ordering exists.
139 /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
140 /// and the FileSource's
141 /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
142 /// for examples.
143 fn repartitioned(
144 &self,
145 _target_partitions: usize,
146 _repartition_file_min_size: usize,
147 _output_ordering: Option<LexOrdering>,
148 ) -> Result<Option<Arc<dyn DataSource>>> {
149 Ok(None)
150 }
151
152 fn output_partitioning(&self) -> Partitioning;
153 fn eq_properties(&self) -> EquivalenceProperties;
154 fn scheduling_type(&self) -> SchedulingType {
155 SchedulingType::NonCooperative
156 }
157 fn statistics(&self) -> Result<Statistics>;
158 /// Return a copy of this DataSource with a new fetch limit
159 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
160 fn fetch(&self) -> Option<usize>;
161 fn metrics(&self) -> ExecutionPlanMetricsSet {
162 ExecutionPlanMetricsSet::new()
163 }
164 fn try_swapping_with_projection(
165 &self,
166 _projection: &ProjectionExec,
167 ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
168 /// Try to push down filters into this DataSource.
169 /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
170 ///
171 /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
172 fn try_pushdown_filters(
173 &self,
174 filters: Vec<Arc<dyn PhysicalExpr>>,
175 _config: &ConfigOptions,
176 ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
177 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
178 vec![PushedDown::No; filters.len()],
179 ))
180 }
181}
182
183/// [`ExecutionPlan`] that reads one or more files
184///
185/// `DataSourceExec` implements common functionality such as applying
186/// projections, and caching plan properties.
187///
188/// The [`DataSource`] describes where to find the data for this data source
189/// (for example in files or what in memory partitions).
190///
191/// For file based [`DataSource`]s, format specific behavior is implemented in
192/// the [`FileSource`] trait.
193///
194/// [`FileSource`]: crate::file::FileSource
195#[derive(Clone, Debug)]
196pub struct DataSourceExec {
197 /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
198 data_source: Arc<dyn DataSource>,
199 /// Cached plan properties such as sort order
200 cache: PlanProperties,
201}
202
203impl DisplayAs for DataSourceExec {
204 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
205 match t {
206 DisplayFormatType::Default | DisplayFormatType::Verbose => {
207 write!(f, "DataSourceExec: ")?;
208 }
209 DisplayFormatType::TreeRender => {}
210 }
211 self.data_source.fmt_as(t, f)
212 }
213}
214
215impl ExecutionPlan for DataSourceExec {
216 fn name(&self) -> &'static str {
217 "DataSourceExec"
218 }
219
220 fn as_any(&self) -> &dyn Any {
221 self
222 }
223
224 fn properties(&self) -> &PlanProperties {
225 &self.cache
226 }
227
228 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
229 Vec::new()
230 }
231
232 fn with_new_children(
233 self: Arc<Self>,
234 _: Vec<Arc<dyn ExecutionPlan>>,
235 ) -> Result<Arc<dyn ExecutionPlan>> {
236 Ok(self)
237 }
238
239 /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
240 ///
241 /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
242 /// to [`ExecutionPlan::repartitioned`] for more details.
243 fn repartitioned(
244 &self,
245 target_partitions: usize,
246 config: &ConfigOptions,
247 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
248 let data_source = self.data_source.repartitioned(
249 target_partitions,
250 config.optimizer.repartition_file_min_size,
251 self.properties().eq_properties.output_ordering(),
252 )?;
253
254 if let Some(source) = data_source {
255 let output_partitioning = source.output_partitioning();
256 let plan = self
257 .clone()
258 .with_data_source(source)
259 // Changing source partitioning may invalidate output partitioning. Update it also
260 .with_partitioning(output_partitioning);
261 Ok(Some(Arc::new(plan)))
262 } else {
263 Ok(Some(Arc::new(self.clone())))
264 }
265 }
266
267 fn execute(
268 &self,
269 partition: usize,
270 context: Arc<TaskContext>,
271 ) -> Result<SendableRecordBatchStream> {
272 let stream = self.data_source.open(partition, Arc::clone(&context))?;
273 let batch_size = context.session_config().batch_size();
274 log::debug!(
275 "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
276 );
277 let metrics = self.data_source.metrics();
278 let split_metrics = SplitMetrics::new(&metrics, partition);
279 Ok(Box::pin(BatchSplitStream::new(
280 stream,
281 batch_size,
282 split_metrics,
283 )))
284 }
285
286 fn metrics(&self) -> Option<MetricsSet> {
287 Some(self.data_source.metrics().clone_inner())
288 }
289
290 fn statistics(&self) -> Result<Statistics> {
291 self.data_source.statistics()
292 }
293
294 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
295 if let Some(partition) = partition {
296 let mut statistics = Statistics::new_unknown(&self.schema());
297 if let Some(file_config) =
298 self.data_source.as_any().downcast_ref::<FileScanConfig>()
299 {
300 if let Some(file_group) = file_config.file_groups.get(partition) {
301 if let Some(stat) = file_group.file_statistics(None) {
302 statistics = stat.clone();
303 }
304 }
305 }
306 Ok(statistics)
307 } else {
308 Ok(self.data_source.statistics()?)
309 }
310 }
311
312 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
313 let data_source = self.data_source.with_fetch(limit)?;
314 let cache = self.cache.clone();
315
316 Some(Arc::new(Self { data_source, cache }))
317 }
318
319 fn fetch(&self) -> Option<usize> {
320 self.data_source.fetch()
321 }
322
323 fn try_swapping_with_projection(
324 &self,
325 projection: &ProjectionExec,
326 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
327 self.data_source.try_swapping_with_projection(projection)
328 }
329
330 fn handle_child_pushdown_result(
331 &self,
332 _phase: FilterPushdownPhase,
333 child_pushdown_result: ChildPushdownResult,
334 config: &ConfigOptions,
335 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
336 // Push any remaining filters into our data source
337 let parent_filters = child_pushdown_result
338 .parent_filters
339 .into_iter()
340 .map(|f| f.filter)
341 .collect_vec();
342 let res = self
343 .data_source
344 .try_pushdown_filters(parent_filters.clone(), config)?;
345 match res.updated_node {
346 Some(data_source) => {
347 let mut new_node = self.clone();
348 new_node.data_source = data_source;
349 new_node.cache =
350 Self::compute_properties(Arc::clone(&new_node.data_source));
351
352 // Recompute equivalence info using new filters
353 let filter = conjunction(
354 res.filters
355 .iter()
356 .zip(parent_filters)
357 .filter_map(|(s, f)| match s {
358 PushedDown::Yes => Some(f),
359 PushedDown::No => None,
360 })
361 .collect_vec(),
362 );
363 new_node = new_node.add_filter_equivalence_info(filter)?;
364 Ok(FilterPushdownPropagation {
365 filters: res.filters,
366 updated_node: Some(Arc::new(new_node)),
367 })
368 }
369 None => Ok(FilterPushdownPropagation {
370 filters: res.filters,
371 updated_node: None,
372 }),
373 }
374 }
375}
376
377impl DataSourceExec {
378 pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
379 Arc::new(Self::new(Arc::new(data_source)))
380 }
381
382 // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
383 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
384 let cache = Self::compute_properties(Arc::clone(&data_source));
385 Self { data_source, cache }
386 }
387
388 /// Return the source object
389 pub fn data_source(&self) -> &Arc<dyn DataSource> {
390 &self.data_source
391 }
392
393 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
394 self.cache = Self::compute_properties(Arc::clone(&data_source));
395 self.data_source = data_source;
396 self
397 }
398
399 /// Assign constraints
400 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
401 self.cache = self.cache.with_constraints(constraints);
402 self
403 }
404
405 /// Assign output partitioning
406 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
407 self.cache = self.cache.with_partitioning(partitioning);
408 self
409 }
410
411 /// Add filters' equivalence info
412 fn add_filter_equivalence_info(
413 mut self,
414 filter: Arc<dyn PhysicalExpr>,
415 ) -> Result<Self> {
416 let (equal_pairs, _) = collect_columns_from_predicate(&filter);
417 for (lhs, rhs) in equal_pairs {
418 self.cache
419 .eq_properties
420 .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
421 }
422 Ok(self)
423 }
424
425 fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
426 PlanProperties::new(
427 data_source.eq_properties(),
428 data_source.output_partitioning(),
429 EmissionType::Incremental,
430 Boundedness::Bounded,
431 )
432 .with_scheduling_type(data_source.scheduling_type())
433 }
434
435 /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
436 ///
437 /// Returns `None` if
438 /// 1. the datasource is not scanning files (`FileScanConfig`)
439 /// 2. The [`FileScanConfig::file_source`] is not of type `T`
440 pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
441 self.data_source()
442 .as_any()
443 .downcast_ref::<FileScanConfig>()
444 .and_then(|file_scan_conf| {
445 file_scan_conf
446 .file_source()
447 .as_any()
448 .downcast_ref::<T>()
449 .map(|source| (file_scan_conf, source))
450 })
451 }
452}
453
454/// Create a new `DataSourceExec` from a `DataSource`
455impl<S> From<S> for DataSourceExec
456where
457 S: DataSource + 'static,
458{
459 fn from(source: S) -> Self {
460 Self::new(Arc::new(source))
461 }
462}