datafusion_datasource/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataSource`] and [`DataSourceExec`]
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{
26 Boundedness, EmissionType, SchedulingType,
27};
28use datafusion_physical_plan::metrics::SplitMetrics;
29use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
30use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
31use datafusion_physical_plan::stream::BatchSplitStream;
32use datafusion_physical_plan::{
33 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
34};
35use itertools::Itertools;
36
37use crate::file_scan_config::FileScanConfig;
38use datafusion_common::config::ConfigOptions;
39use datafusion_common::{Constraints, Result, Statistics};
40use datafusion_execution::{SendableRecordBatchStream, TaskContext};
41use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
42use datafusion_physical_expr_common::sort_expr::LexOrdering;
43use datafusion_physical_plan::filter_pushdown::{
44 ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
45};
46
47/// A source of data, typically a list of files or memory
48///
49/// This trait provides common behaviors for abstract sources of data. It has
50/// two common implementations:
51///
52/// 1. [`FileScanConfig`]: lists of files
53/// 2. [`MemorySourceConfig`]: in memory list of `RecordBatch`
54///
55/// File format specific behaviors are defined by [`FileSource`]
56///
57/// # See Also
58/// * [`FileSource`] for file format specific implementations (Parquet, Json, etc)
59/// * [`DataSourceExec`]: The [`ExecutionPlan`] that reads from a `DataSource`
60///
61/// # Notes
62///
63/// Requires `Debug` to assist debugging
64///
65/// [`FileScanConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html
66/// [`MemorySourceConfig`]: https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemorySourceConfig.html
67/// [`FileSource`]: crate::file::FileSource
/// [`FileFormat`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/index.html
69/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
70///
71/// The following diagram shows how DataSource, FileSource, and DataSourceExec are related
72/// ```text
73/// ┌─────────────────────┐ -----► execute path
74/// │ │ ┄┄┄┄┄► init path
75/// │ DataSourceExec │
76/// │ │
77/// └───────▲─────────────┘
78/// ┊ │
79/// ┊ │
80/// ┌──────────▼──────────┐ ┌──────────-──────────┐
81/// │ │ | |
82/// │ DataSource(trait) │ | TableProvider(trait)|
83/// │ │ | |
84/// └───────▲─────────────┘ └─────────────────────┘
85/// ┊ │ ┊
86/// ┌───────────────┿──┴────────────────┐ ┊
87/// | ┌┄┄┄┄┄┄┄┄┄┄┄┘ | ┊
88/// | ┊ | ┊
89/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ ┊
90/// │ │ │ │ ┌──────────▼──────────┐
91/// │ FileScanConfig │ │ MemorySourceConfig │ | |
92/// │ │ │ │ | FileFormat(trait) |
93/// └──────────────▲──────┘ └─────────────────────┘ | |
94/// │ ┊ └─────────────────────┘
95/// │ ┊ ┊
96/// │ ┊ ┊
97/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
98/// │ │ │ ArrowSource │
99/// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │
100/// │ │ │ ParquetSource │
101/// └─────────────────────┘ └─────────────────────┘
102/// │
103/// │
104/// │
105/// │
106/// ┌──────────▼──────────┐
107/// │ ArrowSource │
108/// │ ... │
109/// │ ParquetSource │
110/// └─────────────────────┘
111/// |
112/// FileOpener (called by FileStream)
113/// │
114/// ┌──────────▼──────────┐
115/// │ │
116/// │ RecordBatch │
117/// │ │
118/// └─────────────────────┘
119/// ```
pub trait DataSource: Send + Sync + Debug {
    /// Opens a stream of record batches for the given `partition`.
    ///
    /// Called by `DataSourceExec::execute`, which wraps the returned stream before
    /// handing it to the caller.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Returns `self` as [`Any`] so callers can downcast to a concrete source type
    /// (for example, [`DataSourceExec`] downcasts to `FileScanConfig`).
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Return a copy of this DataSource with a new partitioning scheme.
    ///
    /// Returns `Ok(None)` (the default) if the partitioning cannot be changed.
    /// Refer to [`ExecutionPlan::repartitioned`] for details on when None should be returned.
    ///
    /// Repartitioning should not change the output ordering, if this ordering exists.
    /// Refer to [`MemorySourceConfig::repartition_preserving_order`](crate::memory::MemorySourceConfig)
    /// and the FileSource's
    /// [`FileGroupPartitioner::repartition_file_groups`](crate::file_groups::FileGroupPartitioner::repartition_file_groups)
    /// for examples.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// Returns the output partitioning of this source.
    ///
    /// Used by [`DataSourceExec`] to build its cached plan properties.
    fn output_partitioning(&self) -> Partitioning;
    /// Returns the equivalence properties (including any known output ordering) of
    /// this source.
    ///
    /// Used by [`DataSourceExec`] to build its cached plan properties.
    fn eq_properties(&self) -> EquivalenceProperties;
    /// Returns the scheduling type of the streams produced by this source.
    ///
    /// Defaults to [`SchedulingType::NonCooperative`]. The value is copied into the plan
    /// properties of the wrapping [`DataSourceExec`].
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::NonCooperative
    }
    /// Returns statistics for the source as a whole.
    ///
    /// [`DataSourceExec`] reports these when no specific partition is requested.
    fn statistics(&self) -> Result<Statistics>;
    /// Return a copy of this DataSource with a new fetch limit
    ///
    /// Returning `None` makes `DataSourceExec::with_fetch` return `None` as well.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// Returns the fetch limit currently applied to this source, if any.
    fn fetch(&self) -> Option<usize>;
    /// Returns the metrics set for this source.
    ///
    /// The default builds a new, empty set on every call. Metrics registered against one
    /// returned set (e.g. the split metrics created in `DataSourceExec::execute`) are
    /// therefore not visible through later calls. NOTE(review): implementations that
    /// want metrics reported should presumably store a set and return a clone of it.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Tries to absorb the given projection expressions into this source.
    ///
    /// Returns `Ok(Some(new_source))` when the source can apply the projection itself.
    /// Returns `Ok(None)` when it cannot, in which case [`DataSourceExec`] reports that
    /// the projection could not be swapped.
    fn try_swapping_with_projection(
        &self,
        _projection: &[ProjectionExpr],
    ) -> Result<Option<Arc<dyn DataSource>>>;
    /// Try to push down filters into this DataSource.
    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
    ///
    /// The default implementation accepts none of the filters: every filter is reported
    /// as [`PushedDown::No`] and no updated source is returned.
    ///
    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }
}
179
/// [`ExecutionPlan`] that reads data from a [`DataSource`], such as one or more files or in-memory batches
181///
182/// `DataSourceExec` implements common functionality such as applying
183/// projections, and caching plan properties.
184///
/// The [`DataSource`] describes where to find the data for this plan
/// (for example, in files or in in-memory partitions).
187///
188/// For file based [`DataSource`]s, format specific behavior is implemented in
189/// the [`FileSource`] trait.
190///
191/// [`FileSource`]: crate::file::FileSource
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
    data_source: Arc<dyn DataSource>,
    /// Cached plan properties: equivalence properties (including sort order), output
    /// partitioning, emission type, boundedness and scheduling type.
    ///
    /// Derived from `data_source` when the node is built or its source is replaced.
    /// May be adjusted afterwards via `with_constraints` / `with_partitioning`.
    cache: PlanProperties,
}
199
200impl DisplayAs for DataSourceExec {
201 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
202 match t {
203 DisplayFormatType::Default | DisplayFormatType::Verbose => {
204 write!(f, "DataSourceExec: ")?;
205 }
206 DisplayFormatType::TreeRender => {}
207 }
208 self.data_source.fmt_as(t, f)
209 }
210}
211
212impl ExecutionPlan for DataSourceExec {
213 fn name(&self) -> &'static str {
214 "DataSourceExec"
215 }
216
217 fn as_any(&self) -> &dyn Any {
218 self
219 }
220
221 fn properties(&self) -> &PlanProperties {
222 &self.cache
223 }
224
225 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
226 Vec::new()
227 }
228
229 fn with_new_children(
230 self: Arc<Self>,
231 _: Vec<Arc<dyn ExecutionPlan>>,
232 ) -> Result<Arc<dyn ExecutionPlan>> {
233 Ok(self)
234 }
235
236 /// Implementation of [`ExecutionPlan::repartitioned`] which relies upon the inner [`DataSource::repartitioned`].
237 ///
238 /// If the data source does not support changing its partitioning, returns `Ok(None)` (the default). Refer
239 /// to [`ExecutionPlan::repartitioned`] for more details.
240 fn repartitioned(
241 &self,
242 target_partitions: usize,
243 config: &ConfigOptions,
244 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
245 let data_source = self.data_source.repartitioned(
246 target_partitions,
247 config.optimizer.repartition_file_min_size,
248 self.properties().eq_properties.output_ordering(),
249 )?;
250
251 if let Some(source) = data_source {
252 let output_partitioning = source.output_partitioning();
253 let plan = self
254 .clone()
255 .with_data_source(source)
256 // Changing source partitioning may invalidate output partitioning. Update it also
257 .with_partitioning(output_partitioning);
258 Ok(Some(Arc::new(plan)))
259 } else {
260 Ok(Some(Arc::new(self.clone())))
261 }
262 }
263
264 fn execute(
265 &self,
266 partition: usize,
267 context: Arc<TaskContext>,
268 ) -> Result<SendableRecordBatchStream> {
269 let stream = self.data_source.open(partition, Arc::clone(&context))?;
270 let batch_size = context.session_config().batch_size();
271 log::debug!(
272 "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
273 );
274 let metrics = self.data_source.metrics();
275 let split_metrics = SplitMetrics::new(&metrics, partition);
276 Ok(Box::pin(BatchSplitStream::new(
277 stream,
278 batch_size,
279 split_metrics,
280 )))
281 }
282
283 fn metrics(&self) -> Option<MetricsSet> {
284 Some(self.data_source.metrics().clone_inner())
285 }
286
287 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
288 if let Some(partition) = partition {
289 let mut statistics = Statistics::new_unknown(&self.schema());
290 if let Some(file_config) =
291 self.data_source.as_any().downcast_ref::<FileScanConfig>()
292 {
293 if let Some(file_group) = file_config.file_groups.get(partition) {
294 if let Some(stat) = file_group.file_statistics(None) {
295 statistics = stat.clone();
296 }
297 }
298 }
299 Ok(statistics)
300 } else {
301 Ok(self.data_source.statistics()?)
302 }
303 }
304
305 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
306 let data_source = self.data_source.with_fetch(limit)?;
307 let cache = self.cache.clone();
308
309 Some(Arc::new(Self { data_source, cache }))
310 }
311
312 fn fetch(&self) -> Option<usize> {
313 self.data_source.fetch()
314 }
315
316 fn try_swapping_with_projection(
317 &self,
318 projection: &ProjectionExec,
319 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
320 match self
321 .data_source
322 .try_swapping_with_projection(projection.expr())?
323 {
324 Some(new_data_source) => {
325 Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
326 }
327 None => Ok(None),
328 }
329 }
330
331 fn handle_child_pushdown_result(
332 &self,
333 _phase: FilterPushdownPhase,
334 child_pushdown_result: ChildPushdownResult,
335 config: &ConfigOptions,
336 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
337 // Push any remaining filters into our data source
338 let parent_filters = child_pushdown_result
339 .parent_filters
340 .into_iter()
341 .map(|f| f.filter)
342 .collect_vec();
343 let res = self
344 .data_source
345 .try_pushdown_filters(parent_filters.clone(), config)?;
346 match res.updated_node {
347 Some(data_source) => {
348 let mut new_node = self.clone();
349 new_node.data_source = data_source;
350 // Re-compute properties since we have new filters which will impact equivalence info
351 new_node.cache =
352 Self::compute_properties(Arc::clone(&new_node.data_source));
353
354 Ok(FilterPushdownPropagation {
355 filters: res.filters,
356 updated_node: Some(Arc::new(new_node)),
357 })
358 }
359 None => Ok(FilterPushdownPropagation {
360 filters: res.filters,
361 updated_node: None,
362 }),
363 }
364 }
365}
366
367impl DataSourceExec {
368 pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
369 Arc::new(Self::new(Arc::new(data_source)))
370 }
371
372 // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
373 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
374 let cache = Self::compute_properties(Arc::clone(&data_source));
375 Self { data_source, cache }
376 }
377
378 /// Return the source object
379 pub fn data_source(&self) -> &Arc<dyn DataSource> {
380 &self.data_source
381 }
382
383 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
384 self.cache = Self::compute_properties(Arc::clone(&data_source));
385 self.data_source = data_source;
386 self
387 }
388
389 /// Assign constraints
390 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
391 self.cache = self.cache.with_constraints(constraints);
392 self
393 }
394
395 /// Assign output partitioning
396 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
397 self.cache = self.cache.with_partitioning(partitioning);
398 self
399 }
400
401 fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
402 PlanProperties::new(
403 data_source.eq_properties(),
404 data_source.output_partitioning(),
405 EmissionType::Incremental,
406 Boundedness::Bounded,
407 )
408 .with_scheduling_type(data_source.scheduling_type())
409 }
410
411 /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
412 ///
413 /// Returns `None` if
414 /// 1. the datasource is not scanning files (`FileScanConfig`)
415 /// 2. The [`FileScanConfig::file_source`] is not of type `T`
416 pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
417 self.data_source()
418 .as_any()
419 .downcast_ref::<FileScanConfig>()
420 .and_then(|file_scan_conf| {
421 file_scan_conf
422 .file_source()
423 .as_any()
424 .downcast_ref::<T>()
425 .map(|source| (file_scan_conf, source))
426 })
427 }
428}
429
430/// Create a new `DataSourceExec` from a `DataSource`
431impl<S> From<S> for DataSourceExec
432where
433 S: DataSource + 'static,
434{
435 fn from(source: S) -> Self {
436 Self::new(Arc::new(source))
437 }
438}