datafusion_physical_plan/execution_plan.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19use crate::filter_pushdown::{
20 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21 FilterPushdownPropagation,
22};
23pub use crate::metrics::Metric;
24pub use crate::ordering::InputOrderMode;
25pub use crate::stream::EmptyRecordBatchStream;
26
27pub use datafusion_common::hash_utils;
28pub use datafusion_common::utils::project_schema;
29pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
30pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
31pub use datafusion_expr::{Accumulator, ColumnarValue};
32pub use datafusion_physical_expr::window::WindowExpr;
33pub use datafusion_physical_expr::{
34 expressions, Distribution, Partitioning, PhysicalExpr,
35};
36
37use std::any::Any;
38use std::fmt::Debug;
39use std::sync::Arc;
40
41use crate::coalesce_partitions::CoalescePartitionsExec;
42use crate::display::DisplayableExecutionPlan;
43use crate::metrics::MetricsSet;
44use crate::projection::ProjectionExec;
45use crate::stream::RecordBatchStreamAdapter;
46
47use arrow::array::{Array, RecordBatch};
48use arrow::datatypes::SchemaRef;
49use datafusion_common::config::ConfigOptions;
50use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
51use datafusion_common_runtime::JoinSet;
52use datafusion_execution::TaskContext;
53use datafusion_physical_expr::EquivalenceProperties;
54use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
55
56use futures::stream::{StreamExt, TryStreamExt};
57
58/// Represent nodes in the DataFusion Physical Plan.
59///
60/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
61/// [`RecordBatch`] that incrementally computes a partition of the
62/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
63/// details on partitioning.
64///
65/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
66/// properties of the output to the DataFusion optimizer, and methods such as
67/// [`required_input_distribution`] and [`required_input_ordering`] express
68/// requirements of the `ExecutionPlan` from its input.
69///
70/// [`ExecutionPlan`] can be displayed in a simplified form using the
71/// return value from [`displayable`] in addition to the (normally
72/// quite verbose) `Debug` output.
73///
74/// [`execute`]: ExecutionPlan::execute
75/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
76/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
77///
78/// # Examples
79///
80/// See [`datafusion-examples`] for examples, including
81/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
82/// `ExecutionPlan` with memory tracking and spilling support.
83///
84/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
85/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_execution_plan.rs
86pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
87 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
88 ///
89 /// Implementation note: this method can just proxy to
90 /// [`static_name`](ExecutionPlan::static_name) if no special action is
91 /// needed. It doesn't provide a default implementation like that because
92 /// this method doesn't require the `Sized` constraint, which allows a wider
93 /// range of use cases.
94 fn name(&self) -> &str;
95
96 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
97 /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
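///
/// For example, a custom node can implement `name` by proxying to this method
/// (a minimal sketch; `MyExec` is a hypothetical node and the remaining trait
/// methods are elided):
/// ```ignore
/// impl ExecutionPlan for MyExec {
///     fn name(&self) -> &str {
///         Self::static_name()
///     }
///     // ... remaining trait methods ...
/// }
/// ```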
98 fn static_name() -> &'static str
99 where
100 Self: Sized,
101 {
102 let full_name = std::any::type_name::<Self>();
103 let maybe_start_idx = full_name.rfind(':');
104 match maybe_start_idx {
105 Some(start_idx) => &full_name[start_idx + 1..],
106 None => "UNKNOWN",
107 }
108 }
109
110 /// Returns the execution plan as [`Any`] so that it can be
111 /// downcast to a specific implementation.
112 fn as_any(&self) -> &dyn Any;
113
114 /// Get the schema for this execution plan
115 fn schema(&self) -> SchemaRef {
116 Arc::clone(self.properties().schema())
117 }
118
119 /// Return properties of the output of the `ExecutionPlan`, such as output
120 /// ordering(s), partitioning information etc.
121 ///
122 /// This information is available via methods on [`ExecutionPlanProperties`]
123 /// trait, which is implemented for all `ExecutionPlan`s.
124 fn properties(&self) -> &PlanProperties;
125
126 /// Returns an error if this individual node does not conform to its invariants.
127 /// These invariants are typically only checked in debug mode.
128 ///
129 /// A default set of invariants is provided in the [check_default_invariants] function.
130 /// The default implementation of `check_invariants` calls this function.
131 /// Extension nodes can provide their own invariants.
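///
/// For example, an extension node might combine the default checks with its
/// own (a sketch; `MyJoinExec` and its two-child requirement are hypothetical):
/// ```ignore
/// fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
///     // Run the shared, default checks first
///     check_default_invariants(self, check)?;
///     // Then enforce node-specific invariants
///     if self.children().len() != 2 {
///         return internal_err!("MyJoinExec must have exactly two children");
///     }
///     Ok(())
/// }
/// ```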
132 fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
133 check_default_invariants(self, check)
134 }
135
136 /// Specifies the data distribution requirements for all the
137 /// children of this `ExecutionPlan`. By default it is [`Distribution::UnspecifiedDistribution`] for each child.
138 fn required_input_distribution(&self) -> Vec<Distribution> {
139 vec![Distribution::UnspecifiedDistribution; self.children().len()]
140 }
141
142 /// Specifies the ordering required for all of the children of this
143 /// `ExecutionPlan`.
144 ///
145 /// For each child, it's the local ordering requirement within
146 /// each partition rather than the global ordering
147 ///
148 /// NOTE that checking `!is_empty()` does **not** check for a
149 /// required input ordering. Instead, the correct check is that at
150 /// least one entry must be `Some`
151 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
152 vec![None; self.children().len()]
153 }
154
155 /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
156 /// rows within or between partitions.
157 ///
158 /// For example, Projection, Filter, and Limit maintain the order
159 /// of inputs -- they may transform values (Projection) or not
160 /// produce the same number of rows that went in (Filter and
161 /// Limit), but the rows that are produced are emitted in the same order.
162 ///
163 /// DataFusion uses this metadata to apply certain optimizations
164 /// such as automatically repartitioning correctly.
165 ///
166 /// The default implementation returns `false`
167 ///
168 /// WARNING: if you override this default, you *MUST* ensure that
169 /// the `ExecutionPlan` maintains the ordering invariant or else
170 /// DataFusion may produce incorrect results.
171 fn maintains_input_order(&self) -> Vec<bool> {
172 vec![false; self.children().len()]
173 }
174
175 /// Specifies whether the `ExecutionPlan` benefits from increased
176 /// parallelization at its input for each child.
177 ///
178 /// If this returns `true`, the `ExecutionPlan` would benefit from partitioning
179 /// its corresponding child (and thus from more parallelism). For
180 /// `ExecutionPlan`s that do very little work, the overhead of extra
181 /// parallelism may outweigh any benefits.
182 ///
183 /// The default implementation returns `true` unless this `ExecutionPlan`
184 /// has signalled it requires a single child input partition.
185 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
186 // By default try to maximize parallelism with more CPUs if
187 // possible
188 self.required_input_distribution()
189 .into_iter()
190 .map(|dist| !matches!(dist, Distribution::SinglePartition))
191 .collect()
192 }
193
194 /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
195 /// The returned list will be empty for leaf nodes such as scans, will contain
196 /// a single value for unary nodes, or two values for binary nodes (such as
197 /// joins).
198 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
199
200 /// Returns a new `ExecutionPlan` where all existing children were replaced
201 /// by the `children`, in order
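///
/// For example, a unary node typically rebuilds itself around its single new
/// child (a sketch; `MyExec::try_new` is a hypothetical constructor):
/// ```ignore
/// fn with_new_children(
///     self: Arc<Self>,
///     children: Vec<Arc<dyn ExecutionPlan>>,
/// ) -> Result<Arc<dyn ExecutionPlan>> {
///     Ok(Arc::new(MyExec::try_new(Arc::clone(&children[0]))?))
/// }
/// ```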
202 fn with_new_children(
203 self: Arc<Self>,
204 children: Vec<Arc<dyn ExecutionPlan>>,
205 ) -> Result<Arc<dyn ExecutionPlan>>;
206
207 /// Reset any internal state within this [`ExecutionPlan`].
208 ///
209 /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
210 /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
211 /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
212 /// are reset to their initial state.
213 ///
214 /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
215 /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
216 /// necessarily resetting any internal state. Implementations that require resetting of some
217 /// internal state should override this method to provide the necessary logic.
218 ///
219 /// This method should *not* reset state recursively for children, as it is expected that
220 /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
221 /// or was already called on each child.
222 ///
223 /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
224 /// thus it is expected that any cached plan properties will remain valid after the reset.
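///
/// For example, a node holding a resettable dynamic filter might rebuild itself
/// with a fresh filter (a sketch; the `dynamic_filter` field and the
/// `new_dynamic_filter()` helper are hypothetical):
/// ```ignore
/// fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
///     Ok(Arc::new(Self {
///         // Replace the shared, mutable filter with a fresh one
///         dynamic_filter: new_dynamic_filter(),
///         ..(*self).clone()
///     }))
/// }
/// ```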
225 ///
226 /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
227 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
228 let children = self.children().into_iter().cloned().collect();
229 self.with_new_children(children)
230 }
231
232 /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
233 /// produce `target_partitions` partitions.
234 ///
235 /// If the `ExecutionPlan` does not support changing its partitioning,
236 /// returns `Ok(None)` (the default).
237 ///
238 /// If the `ExecutionPlan` can increase its partitioning, but not to the
239 /// `target_partitions`, it may return an `ExecutionPlan` with fewer
240 /// partitions. This might happen, for example, if each new partition would
241 /// be too small to be efficiently processed individually.
242 ///
243 /// The DataFusion optimizer attempts to use as many threads as possible by
244 /// repartitioning its inputs to match the target number of threads
245 /// available (`target_partitions`). Some data sources, such as the built in
246 /// CSV and Parquet readers, implement this method as they are able to read
247 /// from their input files in parallel, regardless of how the source data is
248 /// split amongst files.
249 fn repartitioned(
250 &self,
251 _target_partitions: usize,
252 _config: &ConfigOptions,
253 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
254 Ok(None)
255 }
256
257 /// Begin execution of `partition`, returning a [`Stream`] of
258 /// [`RecordBatch`]es.
259 ///
260 /// # Notes
261 ///
262 /// The `execute` method itself is not `async` but it returns an `async`
263 /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
264 /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
265 /// Most `ExecutionPlan`s should not do any work before the first
266 /// `RecordBatch` is requested from the stream.
267 ///
268 /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
269 /// [`Stream`] into a [`SendableRecordBatchStream`].
270 ///
271 /// Using `async` `Streams` allows for network I/O during execution and
272 /// takes advantage of Rust's built in support for `async` continuations and
273 /// crate ecosystem.
274 ///
275 /// [`Stream`]: futures::stream::Stream
276 /// [`StreamExt`]: futures::stream::StreamExt
277 /// [`TryStreamExt`]: futures::stream::TryStreamExt
278 /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
279 ///
280 /// # Error handling
281 ///
282 /// Any error that occurs during execution is sent as an `Err` in the output
283 /// stream.
284 ///
285 /// `ExecutionPlan` implementations in DataFusion cancel additional work
286 /// immediately once an error occurs. The rationale is that if the overall
287 /// query will return an error, any additional work such as continued
288 /// polling of inputs will be wasted as it will be thrown away.
289 ///
290 /// # Cancellation / Aborting Execution
291 ///
292 /// The [`Stream`] that is returned must ensure that any allocated resources
293 /// are freed when the stream itself is dropped. This is particularly
294 /// important for [`spawn`]ed tasks or threads. Unless care is taken to
295 /// "abort" such tasks, they may continue to consume resources even after
296 /// the plan is dropped, generating intermediate results that are never
297 /// used.
298 /// Thus, [`spawn`] is disallowed; use [`SpawnedTask`] instead.
299 ///
300 /// To enable timely cancellation, the [`Stream`] that is returned must not
301 /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
302 /// In a typical [`ExecutionPlan`], this automatically happens unless there are
303 /// special circumstances; e.g. when the computational complexity of processing a
304 /// batch is superlinear. See this [general guideline][async-guideline] for more context
305 /// on this point, which explains why one should avoid spending a long time without
306 /// reaching an `await`/yield point in asynchronous runtimes.
307 /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
308 /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
309 /// [`tokio::task::yield_now()`] when appropriate.
310 /// In special cases that warrant manual yielding, determination for "regularly" may be
311 /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
312 /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
313 /// counting rows or batches.
314 ///
315 /// The [cancellation benchmark] tracks some cases of how quickly queries can
316 /// be cancelled.
317 ///
318 /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
319 /// for structures to help ensure all background tasks are cancelled.
320 ///
321 /// [`spawn`]: tokio::task::spawn
322 /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
323 /// [`JoinSet`]: datafusion_common_runtime::JoinSet
324 /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
325 /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
326 /// [`Poll::Pending`]: std::task::Poll::Pending
327 /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
328 ///
329 /// # Implementation Examples
330 ///
331 /// While `async` `Stream`s have a non trivial learning curve, the
332 /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
333 /// which help simplify many common operations.
334 ///
335 /// Here are some common patterns:
336 ///
337 /// ## Return Precomputed `RecordBatch`
338 ///
339 /// We can return a precomputed `RecordBatch` as a `Stream`:
340 ///
341 /// ```
342 /// # use std::sync::Arc;
343 /// # use arrow::array::RecordBatch;
344 /// # use arrow::datatypes::SchemaRef;
345 /// # use datafusion_common::Result;
346 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
347 /// # use datafusion_physical_plan::memory::MemoryStream;
348 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
349 /// struct MyPlan {
350 /// batch: RecordBatch,
351 /// }
352 ///
353 /// impl MyPlan {
354 /// fn execute(
355 /// &self,
356 /// partition: usize,
357 /// context: Arc<TaskContext>
358 /// ) -> Result<SendableRecordBatchStream> {
359 /// // use functions from the futures crate to convert the batch into a stream
360 /// let fut = futures::future::ready(Ok(self.batch.clone()));
361 /// let stream = futures::stream::once(fut);
362 /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
363 /// }
364 /// }
365 /// ```
366 ///
367 /// ## Lazily (async) Compute `RecordBatch`
368 ///
369 /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
370 ///
371 /// ```
372 /// # use std::sync::Arc;
373 /// # use arrow::array::RecordBatch;
374 /// # use arrow::datatypes::SchemaRef;
375 /// # use datafusion_common::Result;
376 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
377 /// # use datafusion_physical_plan::memory::MemoryStream;
378 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
379 /// struct MyPlan {
380 /// schema: SchemaRef,
381 /// }
382 ///
383 /// /// Returns a single batch when the returned stream is polled
384 /// async fn get_batch() -> Result<RecordBatch> {
385 /// todo!()
386 /// }
387 ///
388 /// impl MyPlan {
389 /// fn execute(
390 /// &self,
391 /// partition: usize,
392 /// context: Arc<TaskContext>
393 /// ) -> Result<SendableRecordBatchStream> {
394 /// let fut = get_batch();
395 /// let stream = futures::stream::once(fut);
396 /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
397 /// }
398 /// }
399 /// ```
400 ///
401 /// ## Lazily (async) create a Stream
402 ///
403 /// If you need to create the return `Stream` using an `async` function,
404 /// you can do so by flattening the result:
405 ///
406 /// ```
407 /// # use std::sync::Arc;
408 /// # use arrow::array::RecordBatch;
409 /// # use arrow::datatypes::SchemaRef;
410 /// # use futures::TryStreamExt;
411 /// # use datafusion_common::Result;
412 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
413 /// # use datafusion_physical_plan::memory::MemoryStream;
414 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
415 /// struct MyPlan {
416 /// schema: SchemaRef,
417 /// }
418 ///
419 /// /// async function that returns a stream
420 /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
421 /// todo!()
422 /// }
423 ///
424 /// impl MyPlan {
425 /// fn execute(
426 /// &self,
427 /// partition: usize,
428 /// context: Arc<TaskContext>
429 /// ) -> Result<SendableRecordBatchStream> {
430 /// // A future that yields a stream
431 /// let fut = get_batch_stream();
432 /// // Use TryStreamExt::try_flatten to flatten the stream of streams
433 /// let stream = futures::stream::once(fut).try_flatten();
434 /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
435 /// }
436 /// }
437 /// ```
438 fn execute(
439 &self,
440 partition: usize,
441 context: Arc<TaskContext>,
442 ) -> Result<SendableRecordBatchStream>;
443
444 /// Return a snapshot of the set of [`Metric`]s for this
445 /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
446 ///
447 /// While the values of the metrics in the returned
448 /// [`MetricsSet`]s may change as execution progresses, the
449 /// specific metrics will not.
450 ///
451 /// Once `self.execute()` has returned (technically the future is
452 /// resolved) for all available partitions, the set of metrics
453 /// should be complete. If this function is called prior to
454 /// `execute()` new metrics may appear in subsequent calls.
455 fn metrics(&self) -> Option<MetricsSet> {
456 None
457 }
458
459 /// Returns statistics for this `ExecutionPlan` node. If statistics are not
460 /// available, should return [`Statistics::new_unknown`] (the default), not
461 /// an error.
462 ///
463 /// For TableScan executors, which support filter pushdown, special attention
464 /// needs to be paid to whether the stats returned by this method are exact or not.
465 #[deprecated(since = "48.0.0", note = "Use `partition_statistics` method instead")]
466 fn statistics(&self) -> Result<Statistics> {
467 Ok(Statistics::new_unknown(&self.schema()))
468 }
469
470 /// Returns statistics for a specific partition of this `ExecutionPlan` node.
471 /// If statistics are not available, should return [`Statistics::new_unknown`]
472 /// (the default), not an error.
473 /// If `partition` is `None`, it returns statistics for the entire plan.
474 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
475 if let Some(idx) = partition {
476 // Validate partition index
477 let partition_count = self.properties().partitioning.partition_count();
478 if idx >= partition_count {
479 return internal_err!(
480 "Invalid partition index: {}, the partition count is {}",
481 idx,
482 partition_count
483 );
484 }
485 }
486 Ok(Statistics::new_unknown(&self.schema()))
487 }
488
489 /// Returns `true` if a limit can be safely pushed down through this
490 /// `ExecutionPlan` node.
491 ///
492 /// If this method returns `true`, and the query plan contains a limit at
493 /// the output of this node, DataFusion will push the limit to the input
494 /// of this node.
495 fn supports_limit_pushdown(&self) -> bool {
496 false
497 }
498
499 /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
500 /// fetch limits. Returns `None` otherwise.
501 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
502 None
503 }
504
505 /// Gets the fetch count for the operator, `None` means there is no fetch.
506 fn fetch(&self) -> Option<usize> {
507 None
508 }
509
510 /// Gets the effect on cardinality, if known
511 fn cardinality_effect(&self) -> CardinalityEffect {
512 CardinalityEffect::Unknown
513 }
514
515 /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
516 ///
517 /// If the operator supports this optimization, the resulting plan will be:
518 /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
519 /// Otherwise, it returns the current `ExecutionPlan` as-is.
520 ///
521 /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
522 /// or not possible, or `Err` on failure.
523 fn try_swapping_with_projection(
524 &self,
525 _projection: &ProjectionExec,
526 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
527 Ok(None)
528 }
529
530 /// Collect filters that this node can push down to its children.
531 /// Filters that are being pushed down from parents are passed in,
532 /// and the node may generate additional filters to push down.
533 /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
534 /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
535 /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
536 ///    filters so it only returns that `FilterExec` wants to push down its own predicate.
537 /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
538 ///    `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
539 ///    but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
540 /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
541 ///    and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
542 ///    since it has no children and no additional filters to push down.
543 ///    It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
544 ///    up the plan that `DataSourceExec` can actually bind the filters.
545 ///
546 /// The default implementation bars all parent filters from being pushed down and adds no new filters.
547 /// This is the safest option, making filter pushdown opt-in on a per-node basis.
548 ///
549 /// There are two different phases in filter pushdown, which some operators handle the same way and others handle differently.
550 /// Depending on the phase the operator may or may not be allowed to modify the plan.
551 /// See [`FilterPushdownPhase`] for more details.
552 fn gather_filters_for_pushdown(
553 &self,
554 _phase: FilterPushdownPhase,
555 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
556 _config: &ConfigOptions,
557 ) -> Result<FilterDescription> {
558 Ok(FilterDescription::all_unsupported(
559 &parent_filters,
560 &self.children(),
561 ))
562 }
563
564 /// Handle the result of a child pushdown.
565 /// This method is called as we recurse back up the plan tree after pushing
566 /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
567 /// It allows the current node to process the results of filter pushdown from
568 /// its children, deciding whether to absorb filters, modify the plan, or pass
569 /// filters back up to its parent.
570 ///
571 /// **Purpose and Context:**
572 /// Filter pushdown is a critical optimization in DataFusion that aims to
573 /// reduce the amount of data processed by applying filters as early as
574 /// possible in the query plan. This method is part of the second phase of
575 /// filter pushdown, where results are propagated back up the tree after
576 /// being pushed down. Each node can inspect the pushdown results from its
577 /// children and decide how to handle any unapplied filters, potentially
578 /// optimizing the plan structure or filter application.
579 ///
580 /// **Behavior in Different Nodes:**
581 /// - For a `DataSourceExec`, this often means absorbing the filters to apply
582 ///   them during the scan phase (late materialization), reducing the data
583 ///   read from the source.
584 /// - A `FilterExec` may absorb any filters its children could not handle,
585 ///   combining them with its own predicate. If no filters remain (i.e., the
586 ///   predicate becomes trivially true), it may remove itself from the plan
587 ///   altogether. It typically marks parent filters as supported, indicating
588 ///   they have been handled.
589 /// - A `HashJoinExec` might ignore the pushdown result if filters need to
590 ///   be applied during the join operation. It passes the parent filters back
591 ///   up wrapped in [`FilterPushdownPropagation::if_any`], discarding
592 ///   any self-filters from children.
593 ///
594 /// **Example Walkthrough:**
595 /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
596 /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
597 ///    `FilterExec`, the filter `f1` is gathered and pushed down to
598 ///    `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
599 ///    the join or add its own filters (e.g., a min-max filter from the build side),
600 ///    then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
601 ///    has no children to push to, so it prepares to handle filters in the
602 ///    upward phase.
603 /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
604 ///    `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
605 ///    for late materialization during scanning, marking them as supported.
606 ///    `HashJoinExec` receives the result, decides whether to apply any
607 ///    remaining filters during the join, and passes unhandled filters back
608 ///    up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
609 ///    updates its predicate if necessary, or removes itself if the predicate
610 ///    becomes trivial (e.g., `lit(true)`), and marks filters as supported
611 ///    for its parent.
612 ///
613 /// The default implementation is a no-op that passes the result of pushdown
614 /// from the children to its parent transparently, ensuring no filters are
615 /// lost if a node does not override this behavior.
616 ///
617 /// **Notes for Implementation:**
618 /// When returning filters via [`FilterPushdownPropagation`], the order of
619 /// filters need not match the order they were passed in via
620 /// `child_pushdown_result`. However, preserving the order is recommended for
621 /// debugging and ease of reasoning about the resulting plans.
622 ///
623 /// **Helper Methods for Customization:**
624 /// There are various helper methods to simplify implementing this method:
625 /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
626 ///   supported as long as at least one child supports them.
627 /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
628 ///   supported as long as all children support them.
629 /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
630 ///   to the propagation result, indicating which filters are supported by
631 ///   the current node.
632 /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
633 ///   current node in the propagation result, used if the node
634 ///   has modified its plan based on the pushdown results.
635 ///
636 /// **Filter Pushdown Phases:**
637 /// There are two different phases in filter pushdown (`Pre` and others),
638 /// which some operators may handle differently. Depending on the phase, the
639 /// operator may or may not be allowed to modify the plan. See
640 /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
641 ///
642 /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
643 fn handle_child_pushdown_result(
644 &self,
645 _phase: FilterPushdownPhase,
646 child_pushdown_result: ChildPushdownResult,
647 _config: &ConfigOptions,
648 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
649 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
650 }
651
652 /// Injects arbitrary run-time state into this execution plan, returning a new plan
653 /// instance that incorporates that state *if* it is relevant to the concrete
654 /// node implementation.
655 ///
656 /// This is a generic entry point: the `state` can be any type wrapped in
657 /// `Arc<dyn Any + Send + Sync>`. A node that cares about the state should
658 /// down-cast it to the concrete type it expects and, if successful, return a
659 /// modified copy of itself that captures the provided value. If the state is
660 /// not applicable, the default behaviour is to return `None` so that parent
661 /// nodes can continue propagating the attempt further down the plan tree.
662 ///
663 /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
664 /// down-casts the supplied state to an `Arc<WorkTable>`
665 /// in order to wire up the working table used during recursive-CTE execution.
666 /// Similar patterns can be followed by custom nodes that need late-bound
667 /// dependencies or shared state.
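///
/// For example, a node might react only to a state type it understands
/// (a sketch; `SharedState` and `with_shared_state` are hypothetical):
/// ```ignore
/// fn with_new_state(
///     &self,
///     state: Arc<dyn Any + Send + Sync>,
/// ) -> Option<Arc<dyn ExecutionPlan>> {
///     // Ignore state of any other type by returning `None`
///     let state = state.downcast::<SharedState>().ok()?;
///     Some(Arc::new(self.with_shared_state(state)))
/// }
/// ```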
668 fn with_new_state(
669 &self,
670 _state: Arc<dyn Any + Send + Sync>,
671 ) -> Option<Arc<dyn ExecutionPlan>> {
672 None
673 }
674}
675
676/// [`ExecutionPlan`] Invariant Level
677///
678/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
679///
680/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
681#[derive(Clone, Copy)]
682pub enum InvariantLevel {
683 /// Invariants that are always true for the [`ExecutionPlan`] node
684 /// such as the number of expected children.
685 Always,
686 /// Invariants that must hold true for the [`ExecutionPlan`] node
687 /// to be "executable", such as ordering and/or distribution requirements
688 /// being fulfilled.
689 Executable,
690}
691
692/// Extension trait that provides an easy API to fetch various properties of
693/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
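///
/// For example, a helper can inspect any plan through this trait (a sketch,
/// assuming the trait is in scope):
/// ```ignore
/// fn describe(plan: &Arc<dyn ExecutionPlan>) {
///     println!("partitions: {}", plan.output_partitioning().partition_count());
///     println!("ordered:    {}", plan.output_ordering().is_some());
///     println!("unbounded:  {}", plan.boundedness().is_unbounded());
/// }
/// ```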
694pub trait ExecutionPlanProperties {
695 /// Specifies how the output of this `ExecutionPlan` is split into
696 /// partitions.
697 fn output_partitioning(&self) -> &Partitioning;
698
699 /// If the output of this `ExecutionPlan` within each partition is sorted,
700 /// returns `Some(keys)` describing the ordering. A `None` return value
701 /// indicates no assumptions should be made on the output ordering.
702 ///
703 /// For example, `SortExec` (obviously) produces sorted output as does
704 /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
705 /// output if its input is sorted as it does not reorder the input rows.
706 fn output_ordering(&self) -> Option<&LexOrdering>;
707
708 /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
709 /// For more details, see [`Boundedness`].
710 fn boundedness(&self) -> Boundedness;
711
712 /// Indicates how the stream of this `ExecutionPlan` emits its results.
713 /// For more details, see [`EmissionType`].
714 fn pipeline_behavior(&self) -> EmissionType;
715
716 /// Get the [`EquivalenceProperties`] within the plan.
717 ///
718 /// Equivalence properties tell DataFusion what columns are known to be
719 /// equal, during various optimization passes. By default, this returns "no
720 /// known equivalences" which is always correct, but may cause DataFusion to
721 /// unnecessarily resort data.
722 ///
723 /// If this ExecutionPlan makes no changes to the schema of the rows flowing
724 /// through it or how columns within each row relate to each other, it
725 /// should return the equivalence properties of its input. For
726 /// example, since [`FilterExec`] may remove rows from its input, but does not
727 /// otherwise modify them, it preserves its input equivalence properties.
728 /// However, since `ProjectionExec` may calculate derived expressions, it
729 /// needs special handling.
730 ///
731 /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
732 /// for related concepts.
733 ///
734 /// [`FilterExec`]: crate::filter::FilterExec
735 fn equivalence_properties(&self) -> &EquivalenceProperties;
736}
737
738impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
739 fn output_partitioning(&self) -> &Partitioning {
740 self.properties().output_partitioning()
741 }
742
743 fn output_ordering(&self) -> Option<&LexOrdering> {
744 self.properties().output_ordering()
745 }
746
747 fn boundedness(&self) -> Boundedness {
748 self.properties().boundedness
749 }
750
751 fn pipeline_behavior(&self) -> EmissionType {
752 self.properties().emission_type
753 }
754
755 fn equivalence_properties(&self) -> &EquivalenceProperties {
756 self.properties().equivalence_properties()
757 }
758}
759
760impl ExecutionPlanProperties for &dyn ExecutionPlan {
761 fn output_partitioning(&self) -> &Partitioning {
762 self.properties().output_partitioning()
763 }
764
765 fn output_ordering(&self) -> Option<&LexOrdering> {
766 self.properties().output_ordering()
767 }
768
769 fn boundedness(&self) -> Boundedness {
770 self.properties().boundedness
771 }
772
773 fn pipeline_behavior(&self) -> EmissionType {
774 self.properties().emission_type
775 }
776
777 fn equivalence_properties(&self) -> &EquivalenceProperties {
778 self.properties().equivalence_properties()
779 }
780}
781
782/// Represents whether a stream of data **generated** by an operator is bounded (finite)
783/// or unbounded (infinite).
784///
785/// This is used to determine whether an execution plan will eventually complete
786/// processing all its data (bounded) or could potentially run forever (unbounded).
787///
788/// For unbounded streams, it also tracks whether the operator requires finite memory
789/// to process the stream or if memory usage could grow unbounded.
790///
791/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
792/// the operator. For example, a limit or TopK operator with a fetch can convert an unbounded stream to a bounded stream.
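///
/// For example, a streaming operator that processes an unbounded input using a
/// fixed-size internal state might report (a sketch):
/// ```ignore
/// let boundedness = Boundedness::Unbounded {
///     requires_infinite_memory: false,
/// };
/// assert!(boundedness.is_unbounded());
/// ```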
793#[derive(Debug, Clone, Copy, PartialEq, Eq)]
794pub enum Boundedness {
795 /// The data stream is bounded (finite) and will eventually complete
796 Bounded,
797 /// The data stream is unbounded (infinite) and could run forever
798 Unbounded {
799 /// Whether this operator requires infinite memory to process the unbounded stream.
800 /// If false, the operator can process an infinite stream with bounded memory.
801 /// If true, memory usage may grow unbounded while processing the stream.
802 ///
803 /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
804 /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
805 requires_infinite_memory: bool,
806 },
807}
808
809impl Boundedness {
810 pub fn is_unbounded(&self) -> bool {
811 matches!(self, Boundedness::Unbounded { .. })
812 }
813}
814
815/// Represents how an operator emits its output records.
816///
817/// This is used to determine whether an operator emits records incrementally as they arrive,
818/// only emits a final result at the end, or can do both. Note that even an incrementally emitting operator generates its output as record batches with `batch_size` rows,
819/// and it may still buffer data internally until it has enough data to emit a record batch or the source is exhausted.
820///
821/// For example, in the following plan:
822/// ```text
823/// SortExec [EmissionType::Final]
824/// |_ on: [col1 ASC]
825/// FilterExec [EmissionType::Incremental]
826/// |_ pred: col2 > 100
827/// DataSourceExec [EmissionType::Incremental]
828/// |_ file: "data.csv"
829/// ```
830/// - DataSourceExec emits records incrementally as it reads from the file
831/// - FilterExec processes and emits filtered records incrementally as they arrive
832/// - SortExec must wait for all input records before it can emit the sorted result,
833/// since it needs to see all values to determine their final order
834///
835/// Left joins can emit both incrementally and finally:
836/// - Incrementally emit matches as they are found
837/// - Finally emit non-matches after all input is processed
838#[derive(Debug, Clone, Copy, PartialEq, Eq)]
839pub enum EmissionType {
840 /// Records are emitted incrementally as they arrive and are processed
841 Incremental,
842 /// Records are only emitted once all input has been processed
843 Final,
844 /// Records can be emitted both incrementally and as a final result
845 Both,
846}
847
848/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
849/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details.
850#[derive(Debug, Clone, Copy, PartialEq, Eq)]
851pub enum SchedulingType {
852 /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
853 /// cooperative scheduling. This means the implementation of the `Stream` returned by
854 /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as
855 /// [`tokio::task::coop::consume_budget`].
856 ///
857 /// `NonCooperative` is the default value and is acceptable for most operators. Please refer to
858 /// the [`coop`](crate::coop) module for details on when it may be useful to use
859 /// `Cooperative` instead.
860 NonCooperative,
861 /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
862 /// cooperative scheduling by consuming task budget when it was able to produce a
863 /// [`RecordBatch`].
864 Cooperative,
865}
866
867/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
868///
869/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
870/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
871///
872/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
873/// is known as data-driven or eager evaluation.
874#[derive(Debug, Clone, Copy, PartialEq, Eq)]
875pub enum EvaluationType {
876 /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
877 /// instances when it is demanded by invoking `Stream::poll_next`.
878 /// Filter, projection, and join are examples of such lazy operators.
879 ///
880 /// Lazy operators are also known as demand-driven operators.
881 Lazy,
882 /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
883 /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
884 /// `Stream::poll_next` is called.
885 /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
886 ///
887 /// Eager operators are also known as data-driven operators.
888 Eager,
889}
890
891/// Utility to determine an operator's boundedness based on its children's boundedness.
892///
893/// Assumes boundedness can be inferred from child operators:
894/// - Unbounded (requires_infinite_memory: true) takes precedence.
895/// - Unbounded (requires_infinite_memory: false) is considered next.
896/// - Otherwise, the operator is bounded.
897///
898/// **Note:** This is a general-purpose utility and may not apply to
899/// all multi-child operators. Ensure your operator's behavior aligns
900/// with these assumptions before using.
901pub(crate) fn boundedness_from_children<'a>(
902 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
903) -> Boundedness {
904 let mut unbounded_with_finite_mem = false;
905
906 for child in children {
907 match child.boundedness() {
908 Boundedness::Unbounded {
909 requires_infinite_memory: true,
910 } => {
911 return Boundedness::Unbounded {
912 requires_infinite_memory: true,
913 }
914 }
915 Boundedness::Unbounded {
916 requires_infinite_memory: false,
917 } => {
918 unbounded_with_finite_mem = true;
919 }
920 Boundedness::Bounded => {}
921 }
922 }
923
924 if unbounded_with_finite_mem {
925 Boundedness::Unbounded {
926 requires_infinite_memory: false,
927 }
928 } else {
929 Boundedness::Bounded
930 }
931}
932
933/// Determines the emission type of an operator based on its children's pipeline behavior.
934///
935/// The precedence of emission types is:
936/// - `Final` has the highest precedence.
937/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
938/// - `Incremental` is the default if all children emit incremental results.
939///
940/// **Note:** This is a general-purpose utility and may not apply to
941/// all multi-child operators. Verify your operator's behavior aligns
942/// with these assumptions.
943pub(crate) fn emission_type_from_children<'a>(
944 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
945) -> EmissionType {
946 let mut inc_and_final = false;
947
948 for child in children {
949 match child.pipeline_behavior() {
950 EmissionType::Final => return EmissionType::Final,
951 EmissionType::Both => inc_and_final = true,
952 EmissionType::Incremental => continue,
953 }
954 }
955
956 if inc_and_final {
957 EmissionType::Both
958 } else {
959 EmissionType::Incremental
960 }
961}
962
963/// Stores certain, often expensive to compute, plan properties used in query
964/// optimization.
965///
966 /// These properties are stored in a single structure to permit this information to
967/// be computed once and then those cached results used multiple times without
968/// recomputation (aka a cache)
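///
/// A node typically builds its `PlanProperties` once in its constructor and
/// returns a reference from [`ExecutionPlan::properties`] (a sketch, assuming
/// a known `schema`):
/// ```ignore
/// let properties = PlanProperties::new(
///     EquivalenceProperties::new(schema),
///     Partitioning::UnknownPartitioning(1),
///     EmissionType::Incremental,
///     Boundedness::Bounded,
/// );
/// ```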
969#[derive(Debug, Clone)]
970pub struct PlanProperties {
971 /// See [ExecutionPlanProperties::equivalence_properties]
972 pub eq_properties: EquivalenceProperties,
973 /// See [ExecutionPlanProperties::output_partitioning]
974 pub partitioning: Partitioning,
975 /// See [ExecutionPlanProperties::pipeline_behavior]
976 pub emission_type: EmissionType,
977 /// See [ExecutionPlanProperties::boundedness]
978 pub boundedness: Boundedness,
/// See [`EvaluationType`]
979 pub evaluation_type: EvaluationType,
/// See [`SchedulingType`]
980 pub scheduling_type: SchedulingType,
981 /// See [ExecutionPlanProperties::output_ordering]
982 output_ordering: Option<LexOrdering>,
983}
984
985impl PlanProperties {
986 /// Construct a new `PlanProperties` from the given equivalence properties, partitioning, emission type, and boundedness.
987 pub fn new(
988 eq_properties: EquivalenceProperties,
989 partitioning: Partitioning,
990 emission_type: EmissionType,
991 boundedness: Boundedness,
992 ) -> Self {
993 // Output ordering can be derived from `eq_properties`.
994 let output_ordering = eq_properties.output_ordering();
995 Self {
996 eq_properties,
997 partitioning,
998 emission_type,
999 boundedness,
1000 evaluation_type: EvaluationType::Lazy,
1001 scheduling_type: SchedulingType::NonCooperative,
1002 output_ordering,
1003 }
1004 }
1005
1006 /// Overwrite output partitioning with its new value.
1007 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
1008 self.partitioning = partitioning;
1009 self
1010 }
1011
1012 /// Overwrite equivalence properties with its new value.
1013 pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
1014 // Changing equivalence properties also changes output ordering, so
1015 // make sure to overwrite it:
1016 self.output_ordering = eq_properties.output_ordering();
1017 self.eq_properties = eq_properties;
1018 self
1019 }
1020
1021 /// Overwrite boundedness with its new value.
1022 pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
1023 self.boundedness = boundedness;
1024 self
1025 }
1026
1027 /// Overwrite emission type with its new value.
1028 pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
1029 self.emission_type = emission_type;
1030 self
1031 }
1032
1033 /// Set the [`SchedulingType`].
1034 ///
1035 /// Defaults to [`SchedulingType::NonCooperative`]
1036 pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
1037 self.scheduling_type = scheduling_type;
1038 self
1039 }
1040
1041 /// Set the [`EvaluationType`].
1042 ///
1043 /// Defaults to [`EvaluationType::Lazy`]
1044 pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
1045 self.evaluation_type = drive_type;
1046 self
1047 }
1048
1049 /// Overwrite constraints with its new value.
1050 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1051 self.eq_properties = self.eq_properties.with_constraints(constraints);
1052 self
1053 }
1054
1055 pub fn equivalence_properties(&self) -> &EquivalenceProperties {
1056 &self.eq_properties
1057 }
1058
1059 pub fn output_partitioning(&self) -> &Partitioning {
1060 &self.partitioning
1061 }
1062
1063 pub fn output_ordering(&self) -> Option<&LexOrdering> {
1064 self.output_ordering.as_ref()
1065 }
1066
1067 /// Get schema of the node.
1068 pub(crate) fn schema(&self) -> &SchemaRef {
1069 self.eq_properties.schema()
1070 }
1071}
1072
1073macro_rules! check_len {
1074 ($target:expr, $func_name:ident, $expected_len:expr) => {
1075 let actual_len = $target.$func_name().len();
1076 if actual_len != $expected_len {
1077 return internal_err!(
1078 "{}::{} returned Vec with incorrect size: {} != {}",
1079 $target.name(),
1080 stringify!($func_name),
1081 actual_len,
1082 $expected_len
1083 );
1084 }
1085 };
1086}
1087
1088/// Checks a set of invariants that apply to all ExecutionPlan implementations.
1089/// Returns an error if the given node does not conform.
1090pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
1091 plan: &P,
1092 _check: InvariantLevel,
1093) -> Result<(), DataFusionError> {
1094 let children_len = plan.children().len();
1095
1096 check_len!(plan, maintains_input_order, children_len);
1097 check_len!(plan, required_input_ordering, children_len);
1098 check_len!(plan, required_input_distribution, children_len);
1099 check_len!(plan, benefits_from_input_partitioning, children_len);
1100
1101 Ok(())
1102}
1103
1104/// Indicates whether a data exchange is needed for the input of `plan`, which is very helpful
1105/// especially for a distributed engine to judge whether it needs to deal with shuffling.
1106/// Currently, there are 3 kinds of execution plans which need data exchange:
1107/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
1108/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
1109/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
1110pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
1111 plan.properties().evaluation_type == EvaluationType::Eager
1112}
1113
1114/// Returns a copy of `plan` with the given `children` if any child differs from the current one (by pointer comparison); otherwise returns the original `plan`.
1115/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1116pub fn with_new_children_if_necessary(
1117 plan: Arc<dyn ExecutionPlan>,
1118 children: Vec<Arc<dyn ExecutionPlan>>,
1119) -> Result<Arc<dyn ExecutionPlan>> {
1120 let old_children = plan.children();
1121 if children.len() != old_children.len() {
1122 internal_err!("Wrong number of children")
1123 } else if children.is_empty()
1124 || children
1125 .iter()
1126 .zip(old_children.iter())
1127 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
1128 {
1129 plan.with_new_children(children)
1130 } else {
1131 Ok(plan)
1132 }
1133}
1134
1135/// Return a [`DisplayableExecutionPlan`] wrapper around an
1136/// [`ExecutionPlan`] which can be displayed in various easier to
1137/// understand ways.
1138///
1139/// See examples on [`DisplayableExecutionPlan`]
1140pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
1141 DisplayableExecutionPlan::new(plan)
1142}
1143
1144/// Execute the [ExecutionPlan] and collect the results in memory
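///
/// For example (a sketch; `plan` is an already constructed `Arc<dyn ExecutionPlan>`):
/// ```ignore
/// let ctx = Arc::new(TaskContext::default());
/// let batches: Vec<RecordBatch> = collect(plan, ctx).await?;
/// ```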
1145pub async fn collect(
1146 plan: Arc<dyn ExecutionPlan>,
1147 context: Arc<TaskContext>,
1148) -> Result<Vec<RecordBatch>> {
1149 let stream = execute_stream(plan, context)?;
1150 crate::common::collect(stream).await
1151}
1152
1153/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
1154///
1155/// See [collect] to buffer the `RecordBatch`es in memory.
1156///
1157/// # Aborting Execution
1158///
1159/// Dropping the stream will abort the execution of the query, and free up
1160/// any allocated resources
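///
/// For example, consuming the stream batch by batch (a sketch; `plan` is an
/// already constructed `Arc<dyn ExecutionPlan>`):
/// ```ignore
/// use futures::StreamExt;
///
/// let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))?;
/// while let Some(batch) = stream.next().await {
///     println!("got {} rows", batch?.num_rows());
/// }
/// ```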
1161pub fn execute_stream(
1162 plan: Arc<dyn ExecutionPlan>,
1163 context: Arc<TaskContext>,
1164) -> Result<SendableRecordBatchStream> {
1165 match plan.output_partitioning().partition_count() {
1166 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1167 1 => plan.execute(0, context),
1168 2.. => {
1169 // merge into a single partition
1170 let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1171 // CoalescePartitionsExec must produce a single partition
1172 assert_eq!(1, plan.properties().output_partitioning().partition_count());
1173 plan.execute(0, context)
1174 }
1175 }
1176}
1177
1178/// Execute the [ExecutionPlan] and collect the results in memory, one `Vec<RecordBatch>` per output partition
1179pub async fn collect_partitioned(
1180 plan: Arc<dyn ExecutionPlan>,
1181 context: Arc<TaskContext>,
1182) -> Result<Vec<Vec<RecordBatch>>> {
1183 let streams = execute_stream_partitioned(plan, context)?;
1184
1185 let mut join_set = JoinSet::new();
1186 // Execute the plan and collect the results into batches.
1187 streams.into_iter().enumerate().for_each(|(idx, stream)| {
1188 join_set.spawn(async move {
1189 let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
1190 (idx, result)
1191 });
1192 });
1193
1194 let mut batches = vec![];
1195 // Note that currently this doesn't identify the thread that panicked
1196 //
1197 // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id)
1198 // once it is stable
1199 while let Some(result) = join_set.join_next().await {
1200 match result {
1201 Ok((idx, res)) => batches.push((idx, res?)),
1202 Err(e) => {
1203 if e.is_panic() {
1204 std::panic::resume_unwind(e.into_panic());
1205 } else {
1206 unreachable!();
1207 }
1208 }
1209 }
1210 }
1211
1212 batches.sort_by_key(|(idx, _)| *idx);
1213 let batches = batches.into_iter().map(|(_, batch)| batch).collect();
1214
1215 Ok(batches)
1216}
1217
1218/// Execute the [ExecutionPlan] and return a vec with one stream per output
1219/// partition
1220///
1221/// # Aborting Execution
1222///
1223/// Dropping the stream will abort the execution of the query, and free up
1224/// any allocated resources
1225pub fn execute_stream_partitioned(
1226 plan: Arc<dyn ExecutionPlan>,
1227 context: Arc<TaskContext>,
1228) -> Result<Vec<SendableRecordBatchStream>> {
1229 let num_partitions = plan.output_partitioning().partition_count();
1230 let mut streams = Vec::with_capacity(num_partitions);
1231 for i in 0..num_partitions {
1232 streams.push(plan.execute(i, Arc::clone(&context))?);
1233 }
1234 Ok(streams)
1235}
1236
1237/// Executes an input stream and ensures that the resulting stream adheres to
1238/// the `not null` constraints specified in the `sink_schema`.
1239///
1240/// # Arguments
1241///
1242/// * `input` - An execution plan
1243/// * `sink_schema` - The schema to be applied to the output stream
1244/// * `partition` - The partition index to be executed
1245/// * `context` - The task context
1246///
1247/// # Returns
1248///
1249/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1250///
1251/// This function first executes the given input plan for the specified partition
1252/// and context. It then checks if there are any columns in the input that might
1253/// violate the `not null` constraints specified in the `sink_schema`. If there are
1254/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1255/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
1256pub fn execute_input_stream(
1257 input: Arc<dyn ExecutionPlan>,
1258 sink_schema: SchemaRef,
1259 partition: usize,
1260 context: Arc<TaskContext>,
1261) -> Result<SendableRecordBatchStream> {
1262 let input_stream = input.execute(partition, context)?;
1263
1264 debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1265
1266 // Find input columns that may violate the not null constraint.
1267 let risky_columns: Vec<_> = sink_schema
1268 .fields()
1269 .iter()
1270 .zip(input.schema().fields().iter())
1271 .enumerate()
1272 .filter_map(|(idx, (sink_field, input_field))| {
1273 (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1274 })
1275 .collect();
1276
1277 if risky_columns.is_empty() {
1278 Ok(input_stream)
1279 } else {
1280 // Check not null constraint on the input stream
1281 Ok(Box::pin(RecordBatchStreamAdapter::new(
1282 sink_schema,
1283 input_stream
1284 .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1285 )))
1286 }
1287}
1288
1289/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1290///
1291/// # Arguments
1292///
1293/// * `batch` - The `RecordBatch` to be checked
1294/// * `column_indices` - A vector of column indices that should be checked for
1295/// `not null` constraints.
1296///
1297/// # Returns
1298///
1299/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1300///
1301/// This function iterates over the specified column indices and ensures that none
1302/// of the columns contain null values. If any column contains null values, an error
1303/// is returned.
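///
/// For example (a sketch; returns an error if column `0` of `batch` contains any nulls):
/// ```ignore
/// let checked = check_not_null_constraints(batch, &vec![0])?;
/// ```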
1304pub fn check_not_null_constraints(
1305 batch: RecordBatch,
1306 column_indices: &Vec<usize>,
1307) -> Result<RecordBatch> {
1308 for &index in column_indices {
1309 if batch.num_columns() <= index {
1310 return exec_err!(
1311 "Invalid batch column count {} expected > {}",
1312 batch.num_columns(),
1313 index
1314 );
1315 }
1316
1317 if batch
1318 .column(index)
1319 .logical_nulls()
1320 .map(|nulls| nulls.null_count())
1321 .unwrap_or_default()
1322 > 0
1323 {
1324 return exec_err!(
1325 "Invalid batch column at '{}' has null but schema specifies non-nullable",
1326 index
1327 );
1328 }
1329 }
1330
1331 Ok(batch)
1332}
1333
1334/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    let formatted = displayable(plan.as_ref()).indent(true).to_string();
    formatted.trim().lines().map(String::from).collect()
}
1340
/// Indicates the effect an execution plan operator has on the cardinality (row
/// count) of its input stream, i.e. how many output rows it produces relative
/// to the input rows it consumes.
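///
/// # Example
///
/// A hedged sketch of how a caller might branch on this enum; the
/// `maybe_reduces_rows` helper below is hypothetical and not part of this
/// crate:
///
/// ```ignore
/// fn maybe_reduces_rows(effect: CardinalityEffect) -> bool {
///     // `Equal` and `LowerEqual` guarantee the output has no more rows than the input.
///     matches!(
///         effect,
///         CardinalityEffect::Equal | CardinalityEffect::LowerEqual
///     )
/// }
/// ```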
1343pub enum CardinalityEffect {
    /// Unknown effect. This is the default.
    Unknown,
    /// The operator is guaranteed to produce exactly one output row for
    /// each input row.
    Equal,
    /// The operator produces at most as many output rows as it receives
    /// input rows.
    LowerEqual,
    /// The operator produces at least as many output rows as it receives
    /// input rows.
    GreaterEqual,
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357 use std::any::Any;
1358 use std::sync::Arc;
1359
1360 use super::*;
1361 use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1362
1363 use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1364 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1365 use datafusion_common::{Result, Statistics};
1366 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
1367
1368 #[derive(Debug)]
1369 pub struct EmptyExec;
1370
1371 impl EmptyExec {
1372 pub fn new(_schema: SchemaRef) -> Self {
1373 Self
1374 }
1375 }
1376
1377 impl DisplayAs for EmptyExec {
1378 fn fmt_as(
1379 &self,
1380 _t: DisplayFormatType,
1381 _f: &mut std::fmt::Formatter,
1382 ) -> std::fmt::Result {
1383 unimplemented!()
1384 }
1385 }
1386
1387 impl ExecutionPlan for EmptyExec {
1388 fn name(&self) -> &'static str {
1389 Self::static_name()
1390 }
1391
1392 fn as_any(&self) -> &dyn Any {
1393 self
1394 }
1395
1396 fn properties(&self) -> &PlanProperties {
1397 unimplemented!()
1398 }
1399
1400 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1401 vec![]
1402 }
1403
1404 fn with_new_children(
1405 self: Arc<Self>,
1406 _: Vec<Arc<dyn ExecutionPlan>>,
1407 ) -> Result<Arc<dyn ExecutionPlan>> {
1408 unimplemented!()
1409 }
1410
1411 fn execute(
1412 &self,
1413 _partition: usize,
1414 _context: Arc<TaskContext>,
1415 ) -> Result<SendableRecordBatchStream> {
1416 unimplemented!()
1417 }
1418
1419 fn statistics(&self) -> Result<Statistics> {
1420 unimplemented!()
1421 }
1422
1423 fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1424 unimplemented!()
1425 }
1426 }
1427
1428 #[derive(Debug)]
1429 pub struct RenamedEmptyExec;
1430
1431 impl RenamedEmptyExec {
1432 pub fn new(_schema: SchemaRef) -> Self {
1433 Self
1434 }
1435 }
1436
1437 impl DisplayAs for RenamedEmptyExec {
1438 fn fmt_as(
1439 &self,
1440 _t: DisplayFormatType,
1441 _f: &mut std::fmt::Formatter,
1442 ) -> std::fmt::Result {
1443 unimplemented!()
1444 }
1445 }
1446
1447 impl ExecutionPlan for RenamedEmptyExec {
1448 fn name(&self) -> &'static str {
1449 Self::static_name()
1450 }
1451
1452 fn static_name() -> &'static str
1453 where
1454 Self: Sized,
1455 {
1456 "MyRenamedEmptyExec"
1457 }
1458
1459 fn as_any(&self) -> &dyn Any {
1460 self
1461 }
1462
1463 fn properties(&self) -> &PlanProperties {
1464 unimplemented!()
1465 }
1466
1467 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1468 vec![]
1469 }
1470
1471 fn with_new_children(
1472 self: Arc<Self>,
1473 _: Vec<Arc<dyn ExecutionPlan>>,
1474 ) -> Result<Arc<dyn ExecutionPlan>> {
1475 unimplemented!()
1476 }
1477
1478 fn execute(
1479 &self,
1480 _partition: usize,
1481 _context: Arc<TaskContext>,
1482 ) -> Result<SendableRecordBatchStream> {
1483 unimplemented!()
1484 }
1485
1486 fn statistics(&self) -> Result<Statistics> {
1487 unimplemented!()
1488 }
1489
1490 fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1491 unimplemented!()
1492 }
1493 }
1494
1495 #[test]
1496 fn test_execution_plan_name() {
1497 let schema1 = Arc::new(Schema::empty());
1498 let default_name_exec = EmptyExec::new(schema1);
1499 assert_eq!(default_name_exec.name(), "EmptyExec");
1500
1501 let schema2 = Arc::new(Schema::empty());
1502 let renamed_exec = RenamedEmptyExec::new(schema2);
1503 assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1504 assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1505 }
1506
1507 /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1508 /// be called from a trait object.
1509 /// Related ticket: https://github.com/apache/datafusion/pull/11047
1510 #[allow(dead_code)]
1511 fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1512 let _ = plan.name();
1513 }
1514
1515 #[test]
1516 fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1517 check_not_null_constraints(
1518 RecordBatch::try_new(
1519 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1520 vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1521 )?,
1522 &vec![0],
1523 )?;
1524 Ok(())
1525 }
1526
1527 #[test]
1528 fn test_check_not_null_constraints_reject_null() -> Result<()> {
1529 let result = check_not_null_constraints(
1530 RecordBatch::try_new(
1531 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1532 vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1533 )?,
1534 &vec![0],
1535 );
1536 assert!(result.is_err());
1537 assert_eq!(
1538 result.err().unwrap().strip_backtrace(),
1539 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1540 );
1541 Ok(())
1542 }
1543
1544 #[test]
1545 fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
        // Null values inside the run-end encoded (REE) array are logically
        // visible, so the check must reject them.
1547 let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1548 let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1549 let run_end_array = RunArray::try_new(&run_ends, &values)?;
1550 let result = check_not_null_constraints(
1551 RecordBatch::try_new(
1552 Arc::new(Schema::new(vec![Field::new(
1553 "a",
1554 run_end_array.data_type().to_owned(),
1555 true,
1556 )])),
1557 vec![Arc::new(run_end_array)],
1558 )?,
1559 &vec![0],
1560 );
1561 assert!(result.is_err());
1562 assert_eq!(
1563 result.err().unwrap().strip_backtrace(),
1564 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1565 );
1566 Ok(())
1567 }
1568
1569 #[test]
1570 fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1571 let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1572 let keys = Int32Array::from(vec![0, 1, 2, 3]);
1573 let dictionary = DictionaryArray::new(keys, values);
1574 let result = check_not_null_constraints(
1575 RecordBatch::try_new(
1576 Arc::new(Schema::new(vec![Field::new(
1577 "a",
1578 dictionary.data_type().to_owned(),
1579 true,
1580 )])),
1581 vec![Arc::new(dictionary)],
1582 )?,
1583 &vec![0],
1584 );
1585 assert!(result.is_err());
1586 assert_eq!(
1587 result.err().unwrap().strip_backtrace(),
1588 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1589 );
1590 Ok(())
1591 }
1592
1593 #[test]
1594 fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
        // The null in the dictionary values is masked out: no key references it,
        // so the batch passes the check.
1596 let values = Arc::new(Int32Array::from(vec![
1597 Some(1),
1598 None, // this null value is masked by dictionary keys
1599 Some(3),
1600 Some(4),
1601 ]));
1602 let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1603 let dictionary = DictionaryArray::new(keys, values);
1604 check_not_null_constraints(
1605 RecordBatch::try_new(
1606 Arc::new(Schema::new(vec![Field::new(
1607 "a",
1608 dictionary.data_type().to_owned(),
1609 true,
1610 )])),
1611 vec![Arc::new(dictionary)],
1612 )?,
1613 &vec![0],
1614 )?;
1615 Ok(())
1616 }
1617
1618 #[test]
1619 fn test_check_not_null_constraints_on_null_type() -> Result<()> {
        // Every value in a `Null`-typed column is null, so the check must reject it.
1621 let result = check_not_null_constraints(
1622 RecordBatch::try_new(
1623 Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1624 vec![Arc::new(NullArray::new(3))],
1625 )?,
1626 &vec![0],
1627 );
1628 assert!(result.is_err());
1629 assert_eq!(
1630 result.err().unwrap().strip_backtrace(),
1631 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1632 );
1633 Ok(())
1634 }
1635}