datafusion_datasource/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::{Arc, OnceLock};
24
25use datafusion_physical_expr::projection::ProjectionExprs;
26use datafusion_physical_plan::execution_plan::{
27 Boundedness, EmissionType, SchedulingType,
28};
29use datafusion_physical_plan::metrics::SplitMetrics;
30use datafusion_physical_plan::metrics::{
31 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
32};
33use datafusion_physical_plan::projection::ProjectionExec;
34use datafusion_physical_plan::stream::BatchSplitStream;
35use datafusion_physical_plan::{
36 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
37};
38use itertools::Itertools;
39
40use crate::file::FileSource;
41use crate::file_scan_config::FileScanConfig;
42use datafusion_common::config::ConfigOptions;
43use datafusion_common::{Constraints, Result, Statistics};
44use datafusion_execution::{SendableRecordBatchStream, TaskContext};
45use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
46use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
47use datafusion_physical_plan::SortOrderPushdownResult;
48use datafusion_physical_plan::filter_pushdown::{
49 ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
50};
51
52/// A source of data, typically a list of files or memory
53///
54/// This trait provides common behaviors for abstract sources of data. It has
55/// two common implementations:
56///
57/// 1. [`FileScanConfig`]: lists of files
58/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
59///
60/// File format specific behaviors are defined by [`FileSource`]
61///
62/// # See Also
63/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
64/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
65///
66/// # Notes
67///
68/// Requires `Debug` to assist debugging
69///
70/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
71/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
72/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
74/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
75///
76/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
77/// ```text
78/// ┌─────────────────────┐ -----► execute path
79/// │ │ ┄┄┄┄┄► init path
80/// │ DataSourceExec │
81/// │ │
82/// └───────▲─────────────┘
83/// ┊ │
84/// ┊ │
85/// ┌──────────▼──────────┐ ┌──────────-──────────┐
86/// │ │ | |
87/// │ DataSource(trait) │ | TableProvider(trait)|
88/// │ │ | |
89/// └───────▲─────────────┘ └─────────────────────┘
90/// ┊ │ ┊
91/// ┌───────────────┿──┴────────────────┐ ┊
92/// | ┌┄┄┄┄┄┄┄┄┄┄┄┘ | ┊
93/// | ┊ | ┊
94/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ ┊
95/// │ │ │ │ ┌──────────▼──────────┐
96/// │ FileScanConfig │ │ MemorySourceConfig │ | |
97/// │ │ │ │ | FileFormat(trait) |
98/// └──────────────▲──────┘ └─────────────────────┘ | |
99/// │ ┊ └─────────────────────┘
100/// │ ┊ ┊
101/// │ ┊ ┊
102/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
103/// │ │ │ ArrowSource │
104/// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │
105/// │ │ │ ParquetSource │
106/// └─────────────────────┘ └─────────────────────┘
107/// │
108/// │
109/// │
110/// │
111/// ┌──────────▼──────────┐
112/// │ ArrowSource │
113/// │ ... │
114/// │ ParquetSource │
115/// └─────────────────────┘
116/// |
117/// FileOpener (called by FileStream)
118/// │
119/// ┌──────────▼──────────┐
120/// │ │
121/// │ RecordBatch │
122/// │ │
123/// └─────────────────────┘
124/// ```
125pub trait DataSource: Any + Send + Sync + Debug {
126 /// Open the specified output partition and return its stream of
127 /// [`RecordBatch`]es.
128 ///
129 /// This should be used by data sources that do not need any sibling
130 /// coordination. Data sources that want to use per-execution shared state
131 /// (for example, to reorder work across partitions at runtime) should
132 /// implement [`Self::open_with_args`] instead.
133 ///
134 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
135 fn open(
136 &self,
137 partition: usize,
138 context: Arc<TaskContext>,
139 ) -> Result<SendableRecordBatchStream>;
140
141 /// Format this source for display in explain plans
142 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
143
144 /// Return a copy of this DataSource with a new partitioning scheme.
145 ///
146 /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
147 /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
148 ///
149 /// Repartitioning should not change the output ordering, if this ordering exists.
150 /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
151 /// and the FileSource's
152 /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
153 /// for examples.
154 fn repartitioned(
155 &self,
156 _target_partitions: usize,
157 _repartition_file_min_size: usize,
158 _output_ordering: Option<LexOrdering>,
159 ) -> Result<Option<Arc<dyn DataSource>>> {
160 Ok(None)
161 }
162
163 fn output_partitioning(&self) -> Partitioning;
164 fn eq_properties(&self) -> EquivalenceProperties;
165 fn scheduling_type(&self) -> SchedulingType {
166 SchedulingType::NonCooperative
167 }
168
169 /// Returns statistics for a specific partition, or aggregate statistics
170 /// across all partitions if `partition` is `None`.
171 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>>;
172
173 /// Return a copy of this DataSource with a new fetch limit
174 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
175 fn fetch(&self) -> Option<usize>;
176 fn metrics(&self) -> ExecutionPlanMetricsSet {
177 ExecutionPlanMetricsSet::new()
178 }
179 fn try_swapping_with_projection(
180 &self,
181 _projection: &ProjectionExprs,
182 ) -> Result<Option<Arc<dyn DataSource>>>;
183
184 /// Try to push down filters into this DataSource.
185 ///
186 /// These filters are in terms of the output schema of this DataSource (e.g.
187 /// [`Self::eq_properties`] and output of any projections pushed into the
188 /// source), not the original table schema.
189 ///
190 /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
191 ///
192 /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
193 fn try_pushdown_filters(
194 &self,
195 filters: Vec<Arc<dyn PhysicalExpr>>,
196 _config: &ConfigOptions,
197 ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
198 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
199 vec![PushedDown::No; filters.len()],
200 ))
201 }
202
203 /// Try to create a new DataSource that produces data in the specified sort order.
204 ///
205 /// # Arguments
206 /// * `order` - The desired output ordering
207 ///
208 /// # Returns
209 /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering
210 /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering
211 /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering
212 /// * `Err(e)` - Error occurred
213 ///
214 /// Default implementation returns `Unsupported`.
215 fn try_pushdown_sort(
216 &self,
217 _order: &[PhysicalSortExpr],
218 ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
219 Ok(SortOrderPushdownResult::Unsupported)
220 }
221
222 /// Returns a variant of this `DataSource` that is aware of order-sensitivity.
223 fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
224 None
225 }
226
227 /// Injects arbitrary run-time state into this DataSource, returning a new instance
228 /// that incorporates that state *if* it is relevant to the concrete DataSource implementation.
229 ///
230 /// This is a generic entry point: the `state` can be any type wrapped in
231 /// `Arc<dyn Any + Send + Sync>`. A data source that cares about the state should
232 /// down-cast it to the concrete type it expects and, if successful, return a
233 /// modified copy of itself that captures the provided value. If the state is
234 /// not applicable, the default behaviour is to return `None` so that parent
235 /// nodes can continue propagating the attempt further down the plan tree.
236 fn with_new_state(
237 &self,
238 _state: Arc<dyn Any + Send + Sync>,
239 ) -> Option<Arc<dyn DataSource>> {
240 None
241 }
242
243 /// Create per execution state to share across sibling instances of this
244 /// data source during one execution.
245 ///
246 /// Returns `None` (the default) if this data source has
247 /// no sibling-shared execution state.
248 fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
249 None
250 }
251
252 /// Open a partition using optional sibling-shared execution state.
253 ///
254 /// The default implementation ignores the additional state and delegates to
255 /// [`Self::open`].
256 fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
257 self.open(args.partition, args.context)
258 }
259}
260
261/// Arguments for [`DataSource::open_with_args`]
262#[derive(Debug, Clone)]
263pub struct OpenArgs {
264 /// Which partition to open
265 pub partition: usize,
266 /// The task context for execution
267 pub context: Arc<TaskContext>,
268 /// Optional sibling-shared execution state, see
269 /// [`DataSource::create_sibling_state`] for details.
270 pub sibling_state: Option<Arc<dyn Any + Send + Sync>>,
271}
272
273impl OpenArgs {
274 /// Create a new OpenArgs with required arguments
275 pub fn new(partition: usize, context: Arc<TaskContext>) -> Self {
276 Self {
277 partition,
278 context,
279 sibling_state: None,
280 }
281 }
282
283 /// Set sibling shared state
284 pub fn with_shared_state(
285 mut self,
286 sibling_state: Option<Arc<dyn Any + Send + Sync>>,
287 ) -> Self {
288 self.sibling_state = sibling_state;
289 self
290 }
291}
292
293impl dyn DataSource {
294 pub fn is<T: DataSource>(&self) -> bool {
295 (self as &dyn Any).is::<T>()
296 }
297
298 pub fn downcast_ref<T: DataSource>(&self) -> Option<&T> {
299 (self as &dyn Any).downcast_ref()
300 }
301}
302
303/// [`ExecutionPlan`] that reads one or more files
304///
305/// `DataSourceExec` implements common functionality such as applying
306/// projections, and caching plan properties.
307///
308/// The [`DataSource`] describes where to find the data for this data source
309/// (for example in files or what in memory partitions).
310///
311/// For file based [`DataSource`]s, format specific behavior is implemented in
312/// the [`FileSource`] trait.
313///
314/// [`FileSource`]: crate::file::FileSource
315#[derive(Clone, Debug)]
316pub struct DataSourceExec {
317 /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
318 data_source: Arc<dyn DataSource>,
319 /// Cached plan properties such as sort order
320 cache: Arc<PlanProperties>,
321 /// Per execution state shared across partitions of this plan.
322 ///
323 /// Created by [`DataSource::create_sibling_state`]
324 /// and then passed to
325 /// [`DataSource::open_with_args`].
326 execution_state: Arc<OnceLock<Option<Arc<dyn Any + Send + Sync>>>>,
327}
328
329impl DisplayAs for DataSourceExec {
330 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
331 match t {
332 DisplayFormatType::Default | DisplayFormatType::Verbose => {
333 write!(f, "DataSourceExec: ")?;
334 }
335 DisplayFormatType::TreeRender => {}
336 }
337 self.data_source.fmt_as(t, f)
338 }
339}
340
341impl ExecutionPlan for DataSourceExec {
342 fn name(&self) -> &'static str {
343 "DataSourceExec"
344 }
345
346 fn properties(&self) -> &Arc<PlanProperties> {
347 &self.cache
348 }
349
350 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
351 Vec::new()
352 }
353
354 fn with_new_children(
355 self: Arc<Self>,
356 _: Vec<Arc<dyn ExecutionPlan>>,
357 ) -> Result<Arc<dyn ExecutionPlan>> {
358 Ok(self)
359 }
360
361 /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
362 ///
363 /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
364 /// to [`ExecutionPlan::repartitioned`] for more details.
365 fn repartitioned(
366 &self,
367 target_partitions: usize,
368 config: &ConfigOptions,
369 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
370 let data_source = self.data_source.repartitioned(
371 target_partitions,
372 config.optimizer.repartition_file_min_size,
373 self.properties().eq_properties.output_ordering(),
374 )?;
375
376 Ok(data_source.map(|source| {
377 let output_partitioning = source.output_partitioning();
378 let plan = self
379 .clone()
380 .with_data_source(source)
381 // Changing source partitioning may invalidate output partitioning. Update it also
382 .with_partitioning(output_partitioning);
383 Arc::new(plan) as _
384 }))
385 }
386
387 fn execute(
388 &self,
389 partition: usize,
390 context: Arc<TaskContext>,
391 ) -> Result<SendableRecordBatchStream> {
392 let shared_state = self
393 .execution_state
394 .get_or_init(|| self.data_source.create_sibling_state())
395 .clone();
396 let args = OpenArgs::new(partition, Arc::clone(&context))
397 .with_shared_state(shared_state);
398 let stream = self.data_source.open_with_args(args)?;
399 let batch_size = context.session_config().batch_size();
400
401 log::debug!(
402 "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
403 );
404 let metrics = self.data_source.metrics();
405 let split_metrics = SplitMetrics::new(&metrics, partition);
406 Ok(Box::pin(BatchSplitStream::new(
407 stream,
408 batch_size,
409 split_metrics,
410 )))
411 }
412
413 fn metrics(&self) -> Option<MetricsSet> {
414 let mut metrics = self.data_source.metrics().clone_inner();
415
416 // Add `output_rows_skew` metric to the metrics set.
417 // Done here because it's a derived metric from output_rows metric.
418 if let Some(file_scan_config) = self.data_source.downcast_ref::<FileScanConfig>()
419 && file_scan_config.file_source().file_type() == "parquet"
420 && let Some(output_rows_skew) =
421 BaselineMetrics::output_rows_skew_metric(&metrics)
422 {
423 metrics.push(output_rows_skew);
424 }
425
426 Some(metrics)
427 }
428
429 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
430 self.data_source.partition_statistics(partition)
431 }
432
433 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
434 let data_source = self.data_source.with_fetch(limit)?;
435 let cache = Arc::clone(&self.cache);
436 let execution_state = Arc::new(OnceLock::new());
437
438 Some(Arc::new(Self {
439 data_source,
440 cache,
441 execution_state,
442 }))
443 }
444
445 fn fetch(&self) -> Option<usize> {
446 self.data_source.fetch()
447 }
448
449 fn try_swapping_with_projection(
450 &self,
451 projection: &ProjectionExec,
452 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
453 match self
454 .data_source
455 .try_swapping_with_projection(projection.projection_expr())?
456 {
457 Some(new_data_source) => {
458 Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
459 }
460 None => Ok(None),
461 }
462 }
463
464 fn handle_child_pushdown_result(
465 &self,
466 _phase: FilterPushdownPhase,
467 child_pushdown_result: ChildPushdownResult,
468 config: &ConfigOptions,
469 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
470 // Push any remaining filters into our data source
471 let parent_filters = child_pushdown_result
472 .parent_filters
473 .into_iter()
474 .map(|f| f.filter)
475 .collect_vec();
476 let res = self
477 .data_source
478 .try_pushdown_filters(parent_filters, config)?;
479 match res.updated_node {
480 Some(data_source) => {
481 let mut new_node = self.clone();
482 new_node.data_source = data_source;
483 // Re-compute properties since we have new filters which will impact equivalence info
484 new_node.cache =
485 Arc::new(Self::compute_properties(&new_node.data_source));
486
487 Ok(FilterPushdownPropagation {
488 filters: res.filters,
489 updated_node: Some(Arc::new(new_node)),
490 })
491 }
492 None => Ok(FilterPushdownPropagation {
493 filters: res.filters,
494 updated_node: None,
495 }),
496 }
497 }
498
499 fn try_pushdown_sort(
500 &self,
501 order: &[PhysicalSortExpr],
502 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
503 // Delegate to the data source and wrap result with DataSourceExec
504 self.data_source
505 .try_pushdown_sort(order)?
506 .try_map(|new_data_source| {
507 let new_exec = self.clone().with_data_source(new_data_source);
508 Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
509 })
510 }
511
512 fn with_preserve_order(
513 &self,
514 preserve_order: bool,
515 ) -> Option<Arc<dyn ExecutionPlan>> {
516 self.data_source
517 .with_preserve_order(preserve_order)
518 .map(|new_data_source| {
519 Arc::new(self.clone().with_data_source(new_data_source))
520 as Arc<dyn ExecutionPlan>
521 })
522 }
523
524 fn with_new_state(
525 &self,
526 state: Arc<dyn Any + Send + Sync>,
527 ) -> Option<Arc<dyn ExecutionPlan>> {
528 self.data_source
529 .with_new_state(state)
530 .map(|new_data_source| {
531 Arc::new(self.clone().with_data_source(new_data_source))
532 as Arc<dyn ExecutionPlan>
533 })
534 }
535
536 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
537 let mut new_exec = Arc::unwrap_or_clone(self);
538 new_exec.execution_state = Arc::new(OnceLock::new());
539 Ok(Arc::new(new_exec))
540 }
541}
542
543impl DataSourceExec {
544 pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
545 Arc::new(Self::new(Arc::new(data_source)))
546 }
547
548 // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
549 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
550 let cache = Self::compute_properties(&data_source);
551 Self {
552 data_source,
553 cache: Arc::new(cache),
554 execution_state: Arc::new(OnceLock::new()),
555 }
556 }
557
558 /// Return the source object
559 pub fn data_source(&self) -> &Arc<dyn DataSource> {
560 &self.data_source
561 }
562
563 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
564 self.cache = Arc::new(Self::compute_properties(&data_source));
565 self.data_source = data_source;
566 self.execution_state = Arc::new(OnceLock::new());
567 self
568 }
569
570 /// Assign constraints
571 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
572 Arc::make_mut(&mut self.cache).set_constraints(constraints);
573 self
574 }
575
576 /// Assign output partitioning
577 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
578 Arc::make_mut(&mut self.cache).partitioning = partitioning;
579 self
580 }
581
582 fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
583 PlanProperties::new(
584 data_source.eq_properties(),
585 data_source.output_partitioning(),
586 EmissionType::Incremental,
587 Boundedness::Bounded,
588 )
589 .with_scheduling_type(data_source.scheduling_type())
590 }
591
592 /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
593 ///
594 /// Returns `None` if
595 /// 1. the datasource is not scanning files (`FileScanConfig`)
596 /// 2. The [`FileScanConfig::file_source`] is not of type `T`
597 pub fn downcast_to_file_source<T: FileSource>(
598 &self,
599 ) -> Option<(&FileScanConfig, &T)> {
600 self.data_source()
601 .downcast_ref::<FileScanConfig>()
602 .and_then(|file_scan_conf| {
603 file_scan_conf
604 .file_source()
605 .downcast_ref::<T>()
606 .map(|source| (file_scan_conf, source))
607 })
608 }
609}
610
611/// Create a new `DataSourceExec` from a `DataSource`
612impl<S> From<S> for DataSourceExec
613where
614 S: DataSource + 'static,
615{
616 fn from(source: S) -> Self {
617 Self::new(Arc::new(source))
618 }
619}