datafusion_datasource/source.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataSource`] and [`DataSourceExec`]

use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use datafusion_physical_plan::execution_plan::{
    Boundedness, EmissionType, SchedulingType,
};
use datafusion_physical_plan::metrics::SplitMetrics;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::stream::BatchSplitStream;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use itertools::Itertools;

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::filter_pushdown::{
    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};
/// A source of data, typically a list of files or memory
///
/// This trait provides common behaviors for abstract sources of data. It has
/// two common implementations:
///
/// 1. [`FileScanConfig`]: lists of files
/// 2. [`MemorySourceConfig`]: an in-memory collection of `RecordBatch`es
///
/// File format specific behaviors are defined by [`FileSource`]
///
/// # See Also
/// * [`FileSource`] for file format specific implementations (Parquet, JSON, etc.)
/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
///
/// # Notes
///
/// Requires `Debug` to assist debugging
///
/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
///
/// The following diagram shows how `DataSource`, `FileSource`, and `DataSourceExec` are related:
/// ```text
/// ┌─────────────────────┐   -----► execute path
/// │                     │   ┄┄┄┄┄► init path
/// │   DataSourceExec    │
/// │                     │
/// └───────▲─────────────┘
///         ┊  │
///         ┊  │
/// ┌──────────▼──────────┐                           ┌──────────-──────────┐
/// │                     │                           |                     |
/// │  DataSource(trait)  │                           | TableProvider(trait)|
/// │                     │                           |                     |
/// └───────▲─────────────┘                           └─────────────────────┘
///         ┊  │                                                 ┊
/// ┌───────┿──┴────────────────────────┐                        ┊
/// |   ┌┄┄┄┘                           |                        ┊
/// |   ┊                               |                        ┊
/// ┌──────────▼──────────┐  ┌──────────▼──────────┐             ┊
/// │                     │  │                     │  ┌──────────▼──────────┐
/// │   FileScanConfig    │  │ MemorySourceConfig  │  |                     |
/// │                     │  │                     │  |  FileFormat(trait)  |
/// └──────────────▲──────┘  └─────────────────────┘  |                     |
///                ┊  │                               └─────────────────────┘
///                ┊  │                                          ┊
///                ┊  │                                          ┊
///        ┌──────────▼──────────┐                     ┌──────────▼──────────┐
///        │                     │                     │     ArrowSource     │
///        │  FileSource(trait)  ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│         ...         │
///        │                     │                     │    ParquetSource    │
///        └─────────────────────┘                     └─────────────────────┘
///                   │
///                   │
///                   │
///                   │
///        ┌──────────▼──────────┐
///        │     ArrowSource     │
///        │         ...         │
///        │    ParquetSource    │
///        └─────────────────────┘
///                   |
///         FileOpener (called by FileStream)
///                   │
///        ┌──────────▼──────────┐
///        │                     │
///        │     RecordBatch     │
///        │                     │
///        └─────────────────────┘
/// ```
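///
/// # Example
///
/// Each output partition of the wrapping [`DataSourceExec`] maps to one call to
/// [`DataSource::open`]. A minimal sketch of driving a source directly, assuming
/// `source` is some existing `Arc<dyn DataSource>` and `ctx` an `Arc<TaskContext>`:
///
/// ```rust,ignore
/// for partition in 0..source.output_partitioning().partition_count() {
///     // Each partition yields an independent stream of RecordBatches
///     let stream = source.open(partition, Arc::clone(&ctx))?;
///     // consume `stream` ...
/// }
/// ```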
pub trait DataSource: Send + Sync + Debug {
    /// Open a stream that yields the data of this source for the given partition.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Return `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Return a copy of this DataSource with a new partitioning scheme.
    ///
    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
    /// Refer to [`ExecutionPlan::repartitioned`] for details on when `None` should be returned.
    ///
    /// Repartitioning should not change the output ordering, if one exists.
    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
    /// and the FileSource's
    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
    /// for examples.
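    ///
    /// A sketch of how a caller (such as `DataSourceExec::repartitioned`) might use
    /// this method; `source` is assumed to be some existing `Arc<dyn DataSource>`:
    ///
    /// ```rust,ignore
    /// // Ask the source to spread its work across up to 8 partitions, with a
    /// // 1 MiB minimum file size and no required output ordering.
    /// if let Some(new_source) = source.repartitioned(8, 1024 * 1024, None)? {
    ///     // `new_source.output_partitioning()` reflects the new layout
    /// }
    /// ```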
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// Return the output [`Partitioning`] of this data source.
    fn output_partitioning(&self) -> Partitioning;
    /// Return the [`EquivalenceProperties`] (such as the output sort order) of this data source.
    fn eq_properties(&self) -> EquivalenceProperties;
    /// Return the [`SchedulingType`] of this data source
    /// (defaults to [`SchedulingType::NonCooperative`]).
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::NonCooperative
    }

    /// Returns statistics for a specific partition, or aggregate statistics
    /// across all partitions if `partition` is `None`.
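    ///
    /// For example (sketch; `source` is some `Arc<dyn DataSource>`):
    ///
    /// ```rust,ignore
    /// let all_partitions = source.partition_statistics(None)?;     // aggregate statistics
    /// let first_partition = source.partition_statistics(Some(0))?; // a single partition
    /// ```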
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;

    /// Returns aggregate statistics across all partitions.
    ///
    /// # Deprecated
    /// Use [`Self::partition_statistics`] instead, which provides more fine-grained
    /// control over statistics retrieval (per-partition or aggregate).
    #[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Return a copy of this DataSource with a new fetch limit,
    /// or `None` if a limit cannot be applied.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// Return the current fetch limit, if any.
    fn fetch(&self) -> Option<usize>;
    /// Return the metrics collected by this data source (empty by default).
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Try to push a projection down into this DataSource, returning a new
    /// DataSource that embeds the projection, or `None` if it cannot be pushed down.
    fn try_swapping_with_projection(
        &self,
        _projection: &[ProjectionExpr],
    ) -> Result<Option<Arc<dyn DataSource>>>;
    /// Try to push down filters into this DataSource.
    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
    ///
    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
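    ///
    /// A sketch of a source that can absorb every filter it is offered. The
    /// `with_filters` helper is hypothetical; the key point is returning
    /// `PushedDown::Yes` for each handled filter along with the updated source:
    ///
    /// ```rust,ignore
    /// fn try_pushdown_filters(
    ///     &self,
    ///     filters: Vec<Arc<dyn PhysicalExpr>>,
    ///     _config: &ConfigOptions,
    /// ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
    ///     // Remember the filters so `open` can apply them while scanning.
    ///     let new_source = Arc::new(self.with_filters(&filters)) as Arc<dyn DataSource>;
    ///     Ok(FilterPushdownPropagation {
    ///         filters: vec![PushedDown::Yes; filters.len()],
    ///         updated_node: Some(new_source),
    ///     })
    /// }
    /// ```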
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }
}

/// [`ExecutionPlan`] that reads one or more files
///
/// `DataSourceExec` implements common functionality such as applying
/// projections and caching plan properties.
///
/// The [`DataSource`] describes where to find the data for this data source
/// (for example, in files or in-memory partitions).
///
/// For file based [`DataSource`]s, format specific behavior is implemented in
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
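///
/// # Example
///
/// A minimal sketch, assuming `scan_config` is an already configured
/// [`FileScanConfig`] and `task_ctx` is an `Arc<TaskContext>`:
///
/// ```rust,ignore
/// // Wrap the DataSource in an execution plan node and stream one partition.
/// let exec = DataSourceExec::from_data_source(scan_config);
/// let stream = exec.execute(0, task_ctx)?;
/// ```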
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties such as sort order
    cache: PlanProperties,
}

impl DisplayAs for DataSourceExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSourceExec: ")?;
            }
            DisplayFormatType::TreeRender => {}
        }
        self.data_source.fmt_as(t, f)
    }
}

impl ExecutionPlan for DataSourceExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
    ///
    /// If the data source does not support changing its partitioning, the plan is
    /// returned unchanged. Refer to [`ExecutionPlan::repartitioned`] for more details.
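    ///
    /// For example (sketch; `exec` is a `DataSourceExec` scanning files):
    ///
    /// ```rust,ignore
    /// let options = ConfigOptions::default();
    /// if let Some(plan) = exec.repartitioned(8, &options)? {
    ///     // `plan` reads the same data, split across up to 8 partitions
    /// }
    /// ```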
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
            self.properties().eq_properties.output_ordering(),
        )?;

        if let Some(source) = data_source {
            let output_partitioning = source.output_partitioning();
            let plan = self
                .clone()
                .with_data_source(source)
                // Changing source partitioning may invalidate output partitioning. Update it also
                .with_partitioning(output_partitioning);
            Ok(Some(Arc::new(plan)))
        } else {
            Ok(Some(Arc::new(self.clone())))
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.data_source.open(partition, Arc::clone(&context))?;
        let batch_size = context.session_config().batch_size();
        log::debug!(
            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
        );
        let metrics = self.data_source.metrics();
        let split_metrics = SplitMetrics::new(&metrics, partition);
        Ok(Box::pin(BatchSplitStream::new(
            stream,
            batch_size,
            split_metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.data_source.metrics().clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.data_source.partition_statistics(partition)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let data_source = self.data_source.with_fetch(limit)?;
        let cache = self.cache.clone();

        Some(Arc::new(Self { data_source, cache }))
    }

    fn fetch(&self) -> Option<usize> {
        self.data_source.fetch()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        match self
            .data_source
            .try_swapping_with_projection(projection.expr())?
        {
            Some(new_data_source) => {
                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
            }
            None => Ok(None),
        }
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Push any remaining filters into our data source
        let parent_filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|f| f.filter)
            .collect_vec();
        let res = self
            .data_source
            .try_pushdown_filters(parent_filters.clone(), config)?;
        match res.updated_node {
            Some(data_source) => {
                let mut new_node = self.clone();
                new_node.data_source = data_source;
                // Re-compute properties since we have new filters which will impact equivalence info
                new_node.cache =
                    Self::compute_properties(Arc::clone(&new_node.data_source));

                Ok(FilterPushdownPropagation {
                    filters: res.filters,
                    updated_node: Some(Arc::new(new_node)),
                })
            }
            None => Ok(FilterPushdownPropagation {
                filters: res.filters,
                updated_node: None,
            }),
        }
    }
}

impl DataSourceExec {
    /// Create a new `DataSourceExec` from a concrete [`DataSource`], returning it
    /// wrapped in an [`Arc`].
    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        Arc::new(Self::new(Arc::new(data_source)))
    }

    /// Create a new `DataSourceExec`, computing and caching the plan properties
    /// (including the source's [`SchedulingType`]) for the given [`DataSource`].
    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
        let cache = Self::compute_properties(Arc::clone(&data_source));
        Self { data_source, cache }
    }

    /// Return the source object
    pub fn data_source(&self) -> &Arc<dyn DataSource> {
        &self.data_source
    }

    /// Replace the [`DataSource`], re-computing the cached plan properties.
    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
        self.cache = Self::compute_properties(Arc::clone(&data_source));
        self.data_source = data_source;
        self
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.cache = self.cache.with_constraints(constraints);
        self
    }

    /// Assign output partitioning
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.cache = self.cache.with_partitioning(partitioning);
        self
    }

    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
        PlanProperties::new(
            data_source.eq_properties(),
            data_source.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
        .with_scheduling_type(data_source.scheduling_type())
    }

    /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
    ///
    /// Returns `None` if
    /// 1. the datasource is not scanning files (`FileScanConfig`)
    /// 2. the [`FileScanConfig::file_source`] is not of type `T`
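    ///
    /// For example, to check whether this node scans Parquet files (a sketch,
    /// assuming the `ParquetSource` file source from the
    /// `datafusion-datasource-parquet` crate is in scope):
    ///
    /// ```rust,ignore
    /// if let Some((file_scan_config, parquet_source)) =
    ///     exec.downcast_to_file_source::<ParquetSource>()
    /// {
    ///     // inspect `file_scan_config` and `parquet_source` here
    /// }
    /// ```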
    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
        self.data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .and_then(|file_scan_conf| {
                file_scan_conf
                    .file_source()
                    .as_any()
                    .downcast_ref::<T>()
                    .map(|source| (file_scan_conf, source))
            })
    }
}

/// Create a new `DataSourceExec` from a `DataSource`
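///
/// For example (sketch; `scan_config` is an existing [`FileScanConfig`]):
///
/// ```rust,ignore
/// let exec: DataSourceExec = scan_config.into();
/// ```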
impl<S> From<S> for DataSourceExec
where
    S: DataSource + 'static,
{
    fn from(source: S) -> Self {
        Self::new(Arc::new(source))
    }
}