// datafusion_datasource/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_expr::projection::ProjectionExprs;
26use datafusion_physical_plan::execution_plan::{
27 Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
31use datafusion_physical_plan::projection::ProjectionExec;
32use datafusion_physical_plan::stream::BatchSplitStream;
33use datafusion_physical_plan::{
34 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
35};
36use itertools::Itertools;
37
38use crate::file_scan_config::FileScanConfig;
39use datafusion_common::config::ConfigOptions;
40use datafusion_common::{Constraints, Result, Statistics};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
43use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
44use datafusion_physical_plan::SortOrderPushdownResult;
45use datafusion_physical_plan::filter_pushdown::{
46 ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
47};
48
49/// A source of data, typically a list of files or memory
50///
51/// This trait provides common behaviors for abstract sources of data. It has
52/// two common implementations:
53///
54/// 1. [`FileScanConfig`]: lists of files
55/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
56///
57/// File format specific behaviors are defined by [`FileSource`]
58///
59/// # See Also
60/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
61/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
62///
63/// # Notes
64///
65/// Requires `Debug` to assist debugging
66///
67/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
68/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
69/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
71/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
72///
73/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
74/// ```text
75/// ┌─────────────────────┐ -----► execute path
76/// │ │ ┄┄┄┄┄► init path
77/// │ DataSourceExec │
78/// │ │
79/// └───────▲─────────────┘
80/// ┊ │
81/// ┊ │
82/// ┌──────────▼──────────┐ ┌──────────-──────────┐
83/// │ │ | |
84/// │ DataSource(trait) │ | TableProvider(trait)|
85/// │ │ | |
86/// └───────▲─────────────┘ └─────────────────────┘
87/// ┊ │ ┊
88/// ┌───────────────┿──┴────────────────┐ ┊
89/// | ┌┄┄┄┄┄┄┄┄┄┄┄┘ | ┊
90/// | ┊ | ┊
91/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ ┊
92/// │ │ │ │ ┌──────────▼──────────┐
93/// │ FileScanConfig │ │ MemorySourceConfig │ | |
94/// │ │ │ │ | FileFormat(trait) |
95/// └──────────────▲──────┘ └─────────────────────┘ | |
96/// │ ┊ └─────────────────────┘
97/// │ ┊ ┊
98/// │ ┊ ┊
99/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
100/// │ │ │ ArrowSource │
101/// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │
102/// │ │ │ ParquetSource │
103/// └─────────────────────┘ └─────────────────────┘
104/// │
105/// │
106/// │
107/// │
108/// ┌──────────▼──────────┐
109/// │ ArrowSource │
110/// │ ... │
111/// │ ParquetSource │
112/// └─────────────────────┘
113/// |
114/// FileOpener (called by FileStream)
115/// │
116/// ┌──────────▼──────────┐
117/// │ │
118/// │ RecordBatch │
119/// │ │
120/// └─────────────────────┘
121/// ```
pub trait DataSource: Send + Sync + Debug {
    /// Begin execution of the given partition, returning a stream of
    /// `RecordBatch`es.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Return this source as [`Any`] so callers can downcast to the concrete
    /// implementation (e.g. `FileScanConfig`).
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Return a copy of this DataSource with a new partitioning scheme.
    ///
    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
    ///
    /// Repartitioning should not change the output ordering, if this ordering exists.
    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
    /// and the FileSource's
    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
    /// for examples.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// How the output of [`Self::open`] is partitioned.
    fn output_partitioning(&self) -> Partitioning;
    /// Equivalence information (e.g. output orderings) for this source.
    fn eq_properties(&self) -> EquivalenceProperties;
    /// How this source interacts with the scheduler.
    /// Defaults to [`SchedulingType::NonCooperative`].
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::NonCooperative
    }

    /// Returns statistics for a specific partition, or aggregate statistics
    /// across all partitions if `partition` is `None`.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;

    /// Return a copy of this DataSource with a new fetch limit
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// The current fetch limit (maximum number of rows to produce), if any.
    fn fetch(&self) -> Option<usize>;
    /// Metrics recorded by this source. The default implementation returns an
    /// empty (freshly created) metrics set.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Try to push the given projection into this source, returning a new
    /// source that applies it, or `Ok(None)` if the projection cannot be
    /// absorbed.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn DataSource>>>;

    /// Try to push down filters into this DataSource.
    ///
    /// These filters are in terms of the output schema of this DataSource (e.g.
    /// [`Self::eq_properties`] and output of any projections pushed into the
    /// source), not the original table schema.
    ///
    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
    ///
    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        // Default: accept none of the filters; parents must keep applying them.
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }

    /// Try to create a new DataSource that produces data in the specified sort order.
    ///
    /// # Arguments
    /// * `order` - The desired output ordering
    ///
    /// # Returns
    /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering
    /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering
    /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering
    /// * `Err(e)` - Error occurred
    ///
    /// Default implementation returns `Unsupported`.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }

    /// Returns a variant of this `DataSource` that is aware of order-sensitivity.
    /// The default returns `None` (no order-aware variant available).
    fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
        None
    }
}
215
216/// [`ExecutionPlan`] that reads one or more files
217///
218/// `DataSourceExec` implements common functionality such as applying
219/// projections, and caching plan properties.
220///
221/// The [`DataSource`] describes where to find the data for this data source
222/// (for example in files or what in memory partitions).
223///
224/// For file based [`DataSource`]s, format specific behavior is implemented in
225/// the [`FileSource`] trait.
226///
227/// [`FileSource`]: crate::file::FileSource
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties such as sort order
    ///
    /// Invariant: kept in sync with `data_source` by `new` / `with_data_source`,
    /// which recompute it whenever the source changes.
    cache: Arc<PlanProperties>,
}
235
236impl DisplayAs for DataSourceExec {
237 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
238 match t {
239 DisplayFormatType::Default | DisplayFormatType::Verbose => {
240 write!(f, "DataSourceExec: ")?;
241 }
242 DisplayFormatType::TreeRender => {}
243 }
244 self.data_source.fmt_as(t, f)
245 }
246}
247
impl ExecutionPlan for DataSourceExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Return the cached plan properties computed from the inner source.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// `DataSourceExec` is a leaf node and has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Leaf node: ignores the (necessarily empty) children and returns self.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
    ///
    /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
    /// to [`ExecutionPlan::repartitioned`] for more details.
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
            self.properties().eq_properties.output_ordering(),
        )?;

        Ok(data_source.map(|source| {
            let output_partitioning = source.output_partitioning();
            let plan = self
                .clone()
                .with_data_source(source)
                // Changing source partitioning may invalidate output partitioning. Update it also
                .with_partitioning(output_partitioning);
            Arc::new(plan) as _
        }))
    }

    /// Open the inner source's stream for `partition` and wrap it in a
    /// [`BatchSplitStream`] so that oversized batches are re-split to the
    /// session's configured `batch_size`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.data_source.open(partition, Arc::clone(&context))?;
        let batch_size = context.session_config().batch_size();
        log::debug!(
            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
        );
        let metrics = self.data_source.metrics();
        let split_metrics = SplitMetrics::new(&metrics, partition);
        Ok(Box::pin(BatchSplitStream::new(
            stream,
            batch_size,
            split_metrics,
        )))
    }

    /// Expose the inner source's metrics.
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.data_source.metrics().clone_inner())
    }

    /// Delegate statistics (per-partition or aggregate) to the inner source.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.data_source.partition_statistics(partition)
    }

    /// Apply a fetch limit by delegating to the inner source; returns `None`
    /// if the source does not support limits. The cached properties are reused
    /// unchanged.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let data_source = self.data_source.with_fetch(limit)?;
        let cache = Arc::clone(&self.cache);

        Some(Arc::new(Self { data_source, cache }))
    }

    fn fetch(&self) -> Option<usize> {
        self.data_source.fetch()
    }

    /// Try to absorb `projection` into the inner source; if it succeeds,
    /// return a fresh `DataSourceExec` built from the projected source.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        match self
            .data_source
            .try_swapping_with_projection(projection.projection_expr())?
        {
            Some(new_data_source) => {
                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
            }
            None => Ok(None),
        }
    }

    /// Forward the parent's filters to [`DataSource::try_pushdown_filters`]
    /// and, if the source accepted any (returning an updated source), rebuild
    /// this node around it.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Push any remaining filters into our data source
        let parent_filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|f| f.filter)
            .collect_vec();
        let res = self
            .data_source
            .try_pushdown_filters(parent_filters, config)?;
        match res.updated_node {
            Some(data_source) => {
                let mut new_node = self.clone();
                new_node.data_source = data_source;
                // Re-compute properties since we have new filters which will impact equivalence info
                new_node.cache =
                    Arc::new(Self::compute_properties(&new_node.data_source));

                Ok(FilterPushdownPropagation {
                    filters: res.filters,
                    updated_node: Some(Arc::new(new_node)),
                })
            }
            None => Ok(FilterPushdownPropagation {
                filters: res.filters,
                updated_node: None,
            }),
        }
    }

    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        // Delegate to the data source and wrap result with DataSourceExec
        self.data_source
            .try_pushdown_sort(order)?
            .try_map(|new_data_source| {
                let new_exec = self.clone().with_data_source(new_data_source);
                Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
            })
    }

    /// Delegate order-preservation to the inner source; `None` means the
    /// source has no order-aware variant.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.data_source
            .with_preserve_order(preserve_order)
            .map(|new_data_source| {
                Arc::new(self.clone().with_data_source(new_data_source))
                    as Arc<dyn ExecutionPlan>
            })
    }
}
411
impl DataSourceExec {
    /// Create an `Arc`'d `DataSourceExec` directly from a concrete source.
    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        Arc::new(Self::new(Arc::new(data_source)))
    }

    // Default constructor for `DataSourceExec`. Computes and caches the plan
    // properties (partitioning, equivalences, scheduling type) from the source.
    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
        let cache = Self::compute_properties(&data_source);
        Self {
            data_source,
            cache: Arc::new(cache),
        }
    }

    /// Return the source object
    pub fn data_source(&self) -> &Arc<dyn DataSource> {
        &self.data_source
    }

    /// Replace the inner source, recomputing the cached plan properties.
    ///
    /// Note: the cache is rebuilt from scratch, so constraints previously
    /// assigned via [`Self::with_constraints`] are not carried over.
    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
        self.cache = Arc::new(Self::compute_properties(&data_source));
        self.data_source = data_source;
        self
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        Arc::make_mut(&mut self.cache).set_constraints(constraints);
        self
    }

    /// Assign output partitioning
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        Arc::make_mut(&mut self.cache).partitioning = partitioning;
        self
    }

    /// Build `PlanProperties` from the source's partitioning, equivalences,
    /// and scheduling type. Sources are always incremental and bounded here.
    fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
        PlanProperties::new(
            data_source.eq_properties(),
            data_source.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
        .with_scheduling_type(data_source.scheduling_type())
    }

    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
    ///
    /// Returns `None` if
    /// 1. the datasource is not scanning files (`FileScanConfig`)
    /// 2. The [`FileScanConfig::file_source`] is not of type `T`
    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
        self.data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .and_then(|file_scan_conf| {
                file_scan_conf
                    .file_source()
                    .as_any()
                    .downcast_ref::<T>()
                    .map(|source| (file_scan_conf, source))
            })
    }
}
477
478/// Create a new `DataSourceExec` from a `DataSource`
479impl<S> From<S> for DataSourceExec
480where
481 S: DataSource + 'static,
482{
483 fn from(source: S) -> Self {
484 Self::new(Arc::new(source))
485 }
486}