datafusion_physical_plan/execution_plan.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use crate::filter_pushdown::{
    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
    FilterPushdownPropagation,
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;

pub use datafusion_common::hash_utils;
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
    Distribution, Partitioning, PhysicalExpr, expressions,
};

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricsSet;
use crate::projection::ProjectionExec;
use crate::stream::RecordBatchStreamAdapter;

use arrow::array::{Array, RecordBatch};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
    Constraints, DataFusionError, Result, assert_eq_or_internal_err,
    assert_or_internal_err, exec_err,
};
use datafusion_common_runtime::JoinSet;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr_common::sort_expr::{
    LexOrdering, OrderingRequirements, PhysicalSortExpr,
};

use futures::stream::{StreamExt, TryStreamExt};

/// Represent nodes in the DataFusion Physical Plan.
///
/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
/// [`RecordBatch`] that incrementally computes a partition of the
/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
/// details on partitioning.
///
/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
/// properties of the output to the DataFusion optimizer, and methods such as
/// [`required_input_distribution`] and [`required_input_ordering`] express
/// requirements of the `ExecutionPlan` from its input.
///
/// [`ExecutionPlan`] can be displayed in a simplified form using the
/// return value from [`displayable`] in addition to the (normally
/// quite verbose) `Debug` output.
///
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
///
/// # Examples
///
/// See [`datafusion-examples`] for examples, including
/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
/// `ExecutionPlan` with memory tracking and spilling support.
///
/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
    ///
    /// Implementation note: this method can just proxy to
    /// [`static_name`](ExecutionPlan::static_name) if no special action is
    /// needed. A default implementation is not provided here because it
    /// would require the `Sized` constraint, which would narrow the range of
    /// possible use cases.
    fn name(&self) -> &str;

    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
    /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
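    ///
    /// For example, the default derivation of the short name from
    /// `std::any::type_name` behaves like the sketch below (the type path
    /// shown is only illustrative):
    ///
    /// ```
    /// let full_name = "datafusion_physical_plan::projection::ProjectionExec";
    /// let short_name = match full_name.rfind(':') {
    ///     Some(idx) => &full_name[idx + 1..],
    ///     None => "UNKNOWN",
    /// };
    /// assert_eq!(short_name, "ProjectionExec");
    /// ```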
    fn static_name() -> &'static str
    where
        Self: Sized,
    {
        let full_name = std::any::type_name::<Self>();
        let maybe_start_idx = full_name.rfind(':');
        match maybe_start_idx {
            Some(start_idx) => &full_name[start_idx + 1..],
            None => "UNKNOWN",
        }
    }

    /// Returns the execution plan as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Get the schema for this execution plan
    fn schema(&self) -> SchemaRef {
        Arc::clone(self.properties().schema())
    }

    /// Return properties of the output of the `ExecutionPlan`, such as output
    /// ordering(s), partitioning information etc.
    ///
    /// This information is available via methods on the
    /// [`ExecutionPlanProperties`] trait, which is implemented for all `ExecutionPlan`s.
    fn properties(&self) -> &PlanProperties;

    /// Returns an error if this individual node does not conform to its invariants.
    /// These invariants are typically only checked in debug mode.
    ///
    /// A default set of invariants is provided in the [check_default_invariants] function.
    /// The default implementation of `check_invariants` calls this function.
    /// Extension nodes can provide their own invariants.
    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        check_default_invariants(self, check)
    }

    /// Specifies the data distribution requirements for all the
    /// children of this `ExecutionPlan`. By default it is [`Distribution::UnspecifiedDistribution`] for each child.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }

    /// Specifies the ordering required for all of the children of this
    /// `ExecutionPlan`.
    ///
    /// For each child, it's the local ordering requirement within
    /// each partition rather than the global ordering.
    ///
    /// NOTE that checking `!is_empty()` does **not** check for a
    /// required input ordering. Instead, the correct check is that at
    /// least one entry must be `Some`.
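    ///
    /// For example, a correct "does this node require any input ordering"
    /// check can be written as in the sketch below (using `EmptyExec` purely
    /// as a stand-in plan with no children):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_physical_plan::ExecutionPlan;
    /// # use datafusion_physical_plan::empty::EmptyExec;
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    /// let plan = EmptyExec::new(schema);
    /// // Correct: at least one child has a `Some` requirement
    /// let requires_ordering = plan.required_input_ordering().iter().any(Option::is_some);
    /// assert!(!requires_ordering);
    /// ```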
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        vec![None; self.children().len()]
    }

    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
    /// rows within or between partitions.
    ///
    /// For example, Projection, Filter, and Limit maintain the order
    /// of inputs -- they may transform values (Projection) or not
    /// produce the same number of rows that went in (Filter and
    /// Limit), but the rows that are produced come out in the same
    /// order as they went in.
    ///
    /// DataFusion uses this metadata to apply certain optimizations
    /// such as automatically repartitioning correctly.
    ///
    /// The default implementation returns `false`
    ///
    /// WARNING: if you override this default, you *MUST* ensure that
    /// the `ExecutionPlan` maintains the ordering invariant or else
    /// DataFusion may produce incorrect results.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false; self.children().len()]
    }

    /// Specifies whether the `ExecutionPlan` benefits from increased
    /// parallelization at its input for each child.
    ///
    /// If this returns `true`, the `ExecutionPlan` would benefit from partitioning
    /// its corresponding child (and thus from more parallelism). For
    /// `ExecutionPlan`s that do very little work, the overhead of extra
    /// parallelism may outweigh any benefits.
    ///
    /// The default implementation returns `true` unless this `ExecutionPlan`
    /// has signalled it requires a single child input partition.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // By default try to maximize parallelism with more CPUs if
        // possible
        self.required_input_distribution()
            .into_iter()
            .map(|dist| !matches!(dist, Distribution::SinglePartition))
            .collect()
    }

    /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
    /// The returned list will be empty for leaf nodes such as scans, will contain
    /// a single value for unary nodes, or two values for binary nodes (such as
    /// joins).
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;

    /// Returns a new `ExecutionPlan` where all existing children were replaced
    /// by the `children`, in order
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Reset any internal state within this [`ExecutionPlan`].
    ///
    /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
    /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
    /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
    /// are reset to their initial state.
    ///
    /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
    /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
    /// necessarily resetting any internal state. Implementations that require resetting of some
    /// internal state should override this method to provide the necessary logic.
    ///
    /// This method should *not* reset state recursively for children, as it is expected that
    /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
    /// or was already called on each child.
    ///
    /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
    /// thus it is expected that any cached plan properties will remain valid after the reset.
    ///
    /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        self.with_new_children(children)
    }

    /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
    /// produce `target_partitions` partitions.
    ///
    /// If the `ExecutionPlan` does not support changing its partitioning,
    /// returns `Ok(None)` (the default).
    ///
    /// If the `ExecutionPlan` can increase its partitioning, but not to the
    /// `target_partitions` level, it may return an `ExecutionPlan` with fewer
    /// partitions. This might happen, for example, if each new partition would
    /// be too small to be efficiently processed individually.
248    ///
249    /// The DataFusion optimizer attempts to use as many threads as possible by
250    /// repartitioning its inputs to match the target number of threads
251    /// available (`target_partitions`). Some data sources, such as the built in
252    /// CSV and Parquet readers, implement this method as they are able to read
253    /// from their input files in parallel, regardless of how the source data is
254    /// split amongst files.
255    fn repartitioned(
256        &self,
257        _target_partitions: usize,
258        _config: &ConfigOptions,
259    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
260        Ok(None)
261    }
262
263    /// Begin execution of `partition`, returning a [`Stream`] of
264    /// [`RecordBatch`]es.
265    ///
266    /// # Notes
267    ///
268    /// The `execute` method itself is not `async` but it returns an `async`
269    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
270    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
271    /// Most `ExecutionPlan`s should not do any work before the first
272    /// `RecordBatch` is requested from the stream.
273    ///
274    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
275    /// [`Stream`] into a [`SendableRecordBatchStream`].
276    ///
277    /// Using `async` `Streams` allows for network I/O during execution and
278    /// takes advantage of Rust's built in support for `async` continuations and
279    /// crate ecosystem.
280    ///
281    /// [`Stream`]: futures::stream::Stream
282    /// [`StreamExt`]: futures::stream::StreamExt
283    /// [`TryStreamExt`]: futures::stream::TryStreamExt
284    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
285    ///
286    /// # Error handling
287    ///
288    /// Any error that occurs during execution is sent as an `Err` in the output
289    /// stream.
290    ///
291    /// `ExecutionPlan` implementations in DataFusion cancel additional work
292    /// immediately once an error occurs. The rationale is that if the overall
293    /// query will return an error,  any additional work such as continued
294    /// polling of inputs will be wasted as it will be thrown away.
295    ///
296    /// # Cancellation / Aborting Execution
297    ///
298    /// The [`Stream`] that is returned must ensure that any allocated resources
299    /// are freed when the stream itself is dropped. This is particularly
300    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
301    /// "abort" such tasks, they may continue to consume resources even after
302    /// the plan is dropped, generating intermediate results that are never
303    /// used.
    /// Thus, [`spawn`] is disallowed; use [`SpawnedTask`] instead.
    ///
    /// To enable timely cancellation, the [`Stream`] that is returned must not
    /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
    /// In a typical [`ExecutionPlan`], this automatically happens unless there are
    /// special circumstances; e.g. when the computational complexity of processing a
    /// batch is superlinear. See this [general guideline][async-guideline] for more context
    /// on this point, which explains why one should avoid spending a long time without
    /// reaching an `await`/yield point in asynchronous runtimes.
    /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
    /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
    /// [`tokio::task::yield_now()`] when appropriate.
    /// In special cases that warrant manual yielding, determination for "regularly" may be
    /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
    /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
    /// counting rows or batches.
    ///
    /// The [cancellation benchmark] tracks some cases of how quickly queries can
    /// be cancelled.
    ///
    /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
    /// for structures to help ensure all background tasks are cancelled.
    ///
    /// [`spawn`]: tokio::task::spawn
    /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
    /// [`JoinSet`]: datafusion_common_runtime::JoinSet
    /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
    /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
    /// [`Poll::Pending`]: std::task::Poll::Pending
    /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
    ///
    /// # Implementation Examples
    ///
    /// While `async` `Stream`s have a non trivial learning curve, the
    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
    /// which help simplify many common operations.
    ///
    /// Here are some common patterns:
    ///
    /// ## Return Precomputed `RecordBatch`
    ///
    /// We can return a precomputed `RecordBatch` as a `Stream`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::array::RecordBatch;
    /// # use arrow::datatypes::SchemaRef;
    /// # use datafusion_common::Result;
    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    /// # use datafusion_physical_plan::memory::MemoryStream;
    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    /// struct MyPlan {
    ///     batch: RecordBatch,
    /// }
    ///
    /// impl MyPlan {
    ///     fn execute(
    ///         &self,
    ///         partition: usize,
    ///         context: Arc<TaskContext>,
    ///     ) -> Result<SendableRecordBatchStream> {
    ///         // use functions from the futures crate to convert the batch into a stream
    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
    ///         let stream = futures::stream::once(fut);
    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
    ///             self.batch.schema(),
    ///             stream,
    ///         )))
    ///     }
    /// }
    /// ```
    ///
    /// ## Lazily (async) Compute `RecordBatch`
    ///
    /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::array::RecordBatch;
    /// # use arrow::datatypes::SchemaRef;
    /// # use datafusion_common::Result;
    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    /// # use datafusion_physical_plan::memory::MemoryStream;
    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    /// struct MyPlan {
    ///     schema: SchemaRef,
    /// }
    ///
    /// /// Returns a single batch when the returned stream is polled
    /// async fn get_batch() -> Result<RecordBatch> {
    ///     todo!()
    /// }
    ///
    /// impl MyPlan {
    ///     fn execute(
    ///         &self,
    ///         partition: usize,
    ///         context: Arc<TaskContext>,
    ///     ) -> Result<SendableRecordBatchStream> {
    ///         let fut = get_batch();
    ///         let stream = futures::stream::once(fut);
    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
    ///             self.schema.clone(),
    ///             stream,
    ///         )))
    ///     }
    /// }
    /// ```
    ///
    /// ## Lazily (async) create a Stream
    ///
    /// If you need to create the returned `Stream` using an `async` function,
    /// you can do so by flattening the result:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::array::RecordBatch;
    /// # use arrow::datatypes::SchemaRef;
    /// # use futures::TryStreamExt;
    /// # use datafusion_common::Result;
    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    /// # use datafusion_physical_plan::memory::MemoryStream;
    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    /// struct MyPlan {
    ///     schema: SchemaRef,
    /// }
    ///
    /// /// async function that returns a stream
    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    ///     todo!()
    /// }
    ///
    /// impl MyPlan {
    ///     fn execute(
    ///         &self,
    ///         partition: usize,
    ///         context: Arc<TaskContext>,
    ///     ) -> Result<SendableRecordBatchStream> {
    ///         // A future that yields a stream
    ///         let fut = get_batch_stream();
    ///         // Use TryStreamExt::try_flatten to flatten the stream of streams
    ///         let stream = futures::stream::once(fut).try_flatten();
    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
    ///             self.schema.clone(),
    ///             stream,
    ///         )))
    ///     }
    /// }
    /// ```
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    /// Return a snapshot of the set of [`Metric`]s for this
    /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
    ///
    /// While the values of the metrics in the returned
    /// [`MetricsSet`]s may change as execution progresses, the
    /// specific metrics will not.
    ///
    /// Once `self.execute()` has returned (technically the future is
    /// resolved) for all available partitions, the set of metrics
    /// should be complete. If this function is called prior to
    /// `execute()` new metrics may appear in subsequent calls.
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Returns statistics for this `ExecutionPlan` node. If statistics are not
    /// available, should return [`Statistics::new_unknown`] (the default), not
    /// an error.
    ///
    /// For TableScan executors, which support filter pushdown, special attention
    /// needs to be paid to whether the stats returned by this method are exact or not.
    #[deprecated(since = "48.0.0", note = "Use `partition_statistics` method instead")]
    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }

    /// Returns statistics for a specific partition of this `ExecutionPlan` node.
    /// If statistics are not available, should return [`Statistics::new_unknown`]
    /// (the default), not an error.
    /// If `partition` is `None`, it returns statistics for the entire plan.
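    ///
    /// A brief sketch of querying statistics (using `EmptyExec` purely as a
    /// stand-in plan; real nodes should return whatever statistics they can
    /// compute cheaply):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_physical_plan::ExecutionPlan;
    /// # use datafusion_physical_plan::empty::EmptyExec;
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    /// let plan = EmptyExec::new(schema);
    /// // `None` requests statistics aggregated over all partitions
    /// let stats = plan.partition_statistics(None).unwrap();
    /// // One `ColumnStatistics` entry per output column
    /// assert_eq!(stats.column_statistics.len(), 1);
    /// ```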
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(idx) = partition {
            // Validate partition index
            let partition_count = self.properties().partitioning.partition_count();
            assert_or_internal_err!(
                idx < partition_count,
                "Invalid partition index: {}, the partition count is {}",
                idx,
                partition_count
            );
        }
        Ok(Statistics::new_unknown(&self.schema()))
    }

    /// Returns `true` if a limit can be safely pushed down through this
    /// `ExecutionPlan` node.
    ///
    /// If this method returns `true`, and the query plan contains a limit at
    /// the output of this node, DataFusion will push the limit to the input
    /// of this node.
    fn supports_limit_pushdown(&self) -> bool {
        false
    }

    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
    /// fetch limits. Returns `None` otherwise.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }

    /// Gets the fetch count for the operator, `None` means there is no fetch.
    fn fetch(&self) -> Option<usize> {
        None
    }

    /// Gets the effect on cardinality, if known
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Unknown
    }

    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
    ///
    /// If the operator supports this optimization, the resulting plan will be:
    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
    /// Otherwise, it returns the current `ExecutionPlan` as-is.
    ///
    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
    /// or not possible, or `Err` on failure.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }

    /// Collect filters that this node can push down to its children.
    /// Filters that are being pushed down from parents are passed in,
    /// and the node may generate additional filters to push down.
    /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
    /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
    /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
    ///    filters so it only returns that `FilterExec` wants to push down its own predicate.
    /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
    ///    `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
    ///    but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
    /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
    ///    and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
    ///    since it has no children and no additional filters to push down.
    ///    It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
    ///    up the plan that `DataSourceExec` can actually bind the filters.
    ///
    /// The default implementation bars all parent filters from being pushed down and adds no new filters.
    /// This is the safest option, making filter pushdown opt-in on a per-node basis.
    ///
    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
    /// Depending on the phase the operator may or may not be allowed to modify the plan.
    /// See [`FilterPushdownPhase`] for more details.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        Ok(FilterDescription::all_unsupported(
            &parent_filters,
            &self.children(),
        ))
    }

    /// Handle the result of a child pushdown.
    /// This method is called as we recurse back up the plan tree after pushing
    /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
    /// It allows the current node to process the results of filter pushdown from
    /// its children, deciding whether to absorb filters, modify the plan, or pass
    /// filters back up to its parent.
    ///
    /// **Purpose and Context:**
    /// Filter pushdown is a critical optimization in DataFusion that aims to
    /// reduce the amount of data processed by applying filters as early as
    /// possible in the query plan. This method is part of the second phase of
    /// filter pushdown, where results are propagated back up the tree after
    /// being pushed down. Each node can inspect the pushdown results from its
    /// children and decide how to handle any unapplied filters, potentially
    /// optimizing the plan structure or filter application.
    ///
    /// **Behavior in Different Nodes:**
    /// - For a `DataSourceExec`, this often means absorbing the filters to apply
    ///   them during the scan phase (late materialization), reducing the data
    ///   read from the source.
    /// - A `FilterExec` may absorb any filters its children could not handle,
    ///   combining them with its own predicate. If no filters remain (i.e., the
    ///   predicate becomes trivially true), it may remove itself from the plan
    ///   altogether. It typically marks parent filters as supported, indicating
    ///   they have been handled.
    /// - A `HashJoinExec` might ignore the pushdown result if filters need to
    ///   be applied during the join operation. It passes the parent filters back
    ///   up wrapped in [`FilterPushdownPropagation::if_any`], discarding
    ///   any self-filters from children.
    ///
    /// **Example Walkthrough:**
    /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
    /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
    ///    `FilterExec`, the filter `f1` is gathered and pushed down to
    ///    `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
    ///    the join or add its own filters (e.g., a min-max filter from the build side),
    ///    then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
    ///    has no children to push to, so it prepares to handle filters in the
    ///    upward phase.
    /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
    ///    `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
    ///    for late materialization during scanning, marking them as supported.
    ///    `HashJoinExec` receives the result, decides whether to apply any
    ///    remaining filters during the join, and passes unhandled filters back
    ///    up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
    ///    updates its predicate if necessary, or removes itself if the predicate
    ///    becomes trivial (e.g., `lit(true)`), and marks filters as supported
    ///    for its parent.
    ///
    /// The default implementation is a no-op that passes the result of pushdown
    /// from the children to its parent transparently, ensuring no filters are
    /// lost if a node does not override this behavior.
    ///
    /// **Notes for Implementation:**
    /// When returning filters via [`FilterPushdownPropagation`], the order of
    /// filters need not match the order they were passed in via
    /// `child_pushdown_result`. However, preserving the order is recommended for
    /// debugging and ease of reasoning about the resulting plans.
    ///
    /// **Helper Methods for Customization:**
    /// There are various helper methods to simplify implementing this method:
    /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
    ///   supported as long as at least one child supports them.
    /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
    ///   supported as long as all children support them.
    /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
    ///   to the propagation result, indicating which filters are supported by
    ///   the current node.
    /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
    ///   current node in the propagation result, used if the node
    ///   has modified its plan based on the pushdown results.
    ///
    /// **Filter Pushdown Phases:**
    /// There are two different phases in filter pushdown (`Pre` and others),
    /// which some operators may handle differently. Depending on the phase, the
    /// operator may or may not be allowed to modify the plan. See
    /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
    ///
    /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }

    /// Injects arbitrary run-time state into this execution plan, returning a new plan
    /// instance that incorporates that state *if* it is relevant to the concrete
    /// node implementation.
    ///
    /// This is a generic entry point: the `state` can be any type wrapped in
    /// `Arc<dyn Any + Send + Sync>`.  A node that cares about the state should
    /// down-cast it to the concrete type it expects and, if successful, return a
    /// modified copy of itself that captures the provided value.  If the state is
    /// not applicable, the default behaviour is to return `None` so that parent
    /// nodes can continue propagating the attempt further down the plan tree.
    ///
    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
    /// down-casts the supplied state to an `Arc<WorkTable>`
    /// in order to wire up the working table used during recursive-CTE execution.
    /// Similar patterns can be followed by custom nodes that need late-bound
    /// dependencies or shared state.
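    ///
    /// A minimal sketch of the down-cast pattern, using a hypothetical
    /// `MyState` type (not part of DataFusion) to stand in for whatever
    /// run-time state a custom node expects:
    ///
    /// ```
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// /// Hypothetical state type that a custom node understands
    /// struct MyState {
    ///     limit: usize,
    /// }
    ///
    /// fn try_read_state(state: Arc<dyn Any + Send + Sync>) -> Option<usize> {
    ///     // Down-cast to the concrete type; `None` means the state is not
    ///     // relevant to this node
    ///     state.downcast_ref::<MyState>().map(|s| s.limit)
    /// }
    ///
    /// assert_eq!(try_read_state(Arc::new(MyState { limit: 10 })), Some(10));
    /// assert_eq!(try_read_state(Arc::new(42_usize)), None);
    /// ```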
    fn with_new_state(
        &self,
        _state: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }

    /// Try to push down sort ordering requirements to this node.
    ///
    /// This method is called during sort pushdown optimization to determine if this
    /// node can optimize for a requested sort ordering. Implementations should:
    ///
    /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact
    ///   ordering (allowing the Sort operator to be removed)
    /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the
    ///   ordering but cannot guarantee perfect sorting (Sort operator is kept)
    /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize
    ///   for the ordering
    ///
    /// For transparent nodes (that preserve ordering), implement this to delegate to
    /// children and wrap the result with a new instance of this node.
    ///
    /// Default implementation returns `Unsupported`.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }
}

/// [`ExecutionPlan`] Invariant Level
///
/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
///
/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
#[derive(Clone, Copy)]
pub enum InvariantLevel {
    /// Invariants that are always true for the [`ExecutionPlan`] node
    /// such as the number of expected children.
    Always,
    /// Invariants that must hold true for the [`ExecutionPlan`] node
    /// to be "executable", such as ordering and/or distribution requirements
    /// being fulfilled.
    Executable,
}

/// Extension trait that provides an easy API to fetch various properties of
/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
pub trait ExecutionPlanProperties {
    /// Specifies how the output of this `ExecutionPlan` is split into
    /// partitions.
    fn output_partitioning(&self) -> &Partitioning;

    /// If the output of this `ExecutionPlan` within each partition is sorted,
    /// returns `Some(keys)` describing the ordering. A `None` return value
    /// indicates no assumptions should be made on the output ordering.
    ///
    /// For example, `SortExec` (obviously) produces sorted output as does
    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
    /// output if its input is sorted as it does not reorder the input rows.
    fn output_ordering(&self) -> Option<&LexOrdering>;

    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
    /// For more details, see [`Boundedness`].
    fn boundedness(&self) -> Boundedness;

    /// Indicates how the stream of this `ExecutionPlan` emits its results.
    /// For more details, see [`EmissionType`].
    fn pipeline_behavior(&self) -> EmissionType;

    /// Get the [`EquivalenceProperties`] within the plan.
    ///
    /// Equivalence properties tell DataFusion what columns are known to be
    /// equal, during various optimization passes. By default, this returns "no
    /// known equivalences" which is always correct, but may cause DataFusion to
    /// unnecessarily resort data.
    ///
    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
    /// through it or how columns within each row relate to each other, it
    /// should return the equivalence properties of its input. For
    /// example, since [`FilterExec`] may remove rows from its input, but does not
    /// otherwise modify them, it preserves its input equivalence properties.
    /// However, since `ProjectionExec` may calculate derived expressions, it
    /// needs special handling.
    ///
    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
    /// for related concepts.
    ///
    /// [`FilterExec`]: crate::filter::FilterExec
    fn equivalence_properties(&self) -> &EquivalenceProperties;
}

impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}

impl ExecutionPlanProperties for &dyn ExecutionPlan {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}

/// Represents whether a stream of data **generated** by an operator is bounded (finite)
/// or unbounded (infinite).
///
/// This is used to determine whether an execution plan will eventually complete
/// processing all its data (bounded) or could potentially run forever (unbounded).
///
/// For unbounded streams, it also tracks whether the operator requires finite memory
/// to process the stream or if memory usage could grow unbounded.
///
/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
/// the operator. For example, a limit or TopK operator with a fetch can convert an unbounded stream into a bounded one.
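///
/// A short illustration of the memory flag (the import path assumes this
/// module is reachable as `datafusion_physical_plan::execution_plan`):
///
/// ```
/// # use datafusion_physical_plan::execution_plan::Boundedness;
/// // An infinite stream that can still be processed with bounded memory,
/// // e.g. a streaming filter over an unbounded source
/// let b = Boundedness::Unbounded { requires_infinite_memory: false };
/// assert!(b.is_unbounded());
/// ```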
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    /// The data stream is bounded (finite) and will eventually complete
    Bounded,
    /// The data stream is unbounded (infinite) and could run forever
    Unbounded {
        /// Whether this operator requires infinite memory to process the unbounded stream.
        /// If false, the operator can process an infinite stream with bounded memory.
        /// If true, memory usage may grow unbounded while processing the stream.
        ///
        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
        requires_infinite_memory: bool,
    },
}

impl Boundedness {
    pub fn is_unbounded(&self) -> bool {
        matches!(self, Boundedness::Unbounded { .. })
    }
}

/// Represents how an operator emits its output records.
///
/// This is used to determine whether an operator emits records incrementally as they arrive,
/// only emits a final result at the end, or can do both. Note that even when an operator emits output
/// incrementally (as record batches of up to `batch_size` rows), it may still buffer data internally
/// until it has enough data to emit a record batch or the source is exhausted.
///
/// For example, in the following plan:
/// ```text
///   SortExec [EmissionType::Final]
///     |_ on: [col1 ASC]
///     FilterExec [EmissionType::Incremental]
///       |_ pred: col2 > 100
///       DataSourceExec [EmissionType::Incremental]
///         |_ file: "data.csv"
/// ```
/// - DataSourceExec emits records incrementally as it reads from the file
/// - FilterExec processes and emits filtered records incrementally as they arrive
/// - SortExec must wait for all input records before it can emit the sorted result,
///   since it needs to see all values to determine their final order
///
/// Left joins can emit both incrementally and finally:
/// - Incrementally emit matches as they are found
/// - Finally emit non-matches after all input is processed
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Records are emitted incrementally as they arrive and are processed
    Incremental,
    /// Records are only emitted once all input has been processed
    Final,
    /// Records can be emitted both incrementally and as a final result
    Both,
}

/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
    /// cooperative scheduling. This means the implementation of the `Stream` returned by
    /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as
    /// [`tokio::task::coop::consume_budget`].
    ///
    /// `NonCooperative` is the default value and is acceptable for most operators. Please refer to
    /// the [`coop`](crate::coop) module for details on when it may be useful to use
    /// `Cooperative` instead.
    NonCooperative,
    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
    /// cooperative scheduling by consuming task budget when it was able to produce a
    /// [`RecordBatch`].
    Cooperative,
}

/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
///
/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
///
/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
/// is known as data-driven or eager evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
    /// instances when it is demanded by invoking `Stream::poll_next`.
    /// Filter, projection, and join are examples of such lazy operators.
    ///
    /// Lazy operators are also known as demand-driven operators.
    Lazy,
    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
    /// `Stream::poll_next` is called.
    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
    ///
    /// Eager operators are also known as data-driven operators.
    Eager,
}

/// Utility to determine an operator's boundedness based on its children's boundedness.
///
/// Assumes boundedness can be inferred from child operators:
/// - Unbounded (requires_infinite_memory: true) takes precedence.
/// - Unbounded (requires_infinite_memory: false) is considered next.
/// - Otherwise, the operator is bounded.
///
/// **Note:** This is a general-purpose utility and may not apply to
/// all multi-child operators. Ensure your operator's behavior aligns
/// with these assumptions before using.
pub(crate) fn boundedness_from_children<'a>(
    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
) -> Boundedness {
    let mut unbounded_with_finite_mem = false;

    for child in children {
        match child.boundedness() {
            Boundedness::Unbounded {
                requires_infinite_memory: true,
            } => {
                return Boundedness::Unbounded {
                    requires_infinite_memory: true,
                };
            }
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            } => {
                unbounded_with_finite_mem = true;
            }
            Boundedness::Bounded => {}
        }
    }

    if unbounded_with_finite_mem {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    } else {
        Boundedness::Bounded
    }
}

/// Determines the emission type of an operator based on its children's pipeline behavior.
///
/// The precedence of emission types is:
/// - `Final` has the highest precedence.
/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
/// - `Incremental` is the default if all children emit incremental results.
///
/// **Note:** This is a general-purpose utility and may not apply to
/// all multi-child operators. Verify your operator's behavior aligns
/// with these assumptions.
pub(crate) fn emission_type_from_children<'a>(
    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
) -> EmissionType {
    let mut inc_and_final = false;

    for child in children {
        match child.pipeline_behavior() {
            EmissionType::Final => return EmissionType::Final,
            EmissionType::Both => inc_and_final = true,
            EmissionType::Incremental => continue,
        }
    }

    if inc_and_final {
        EmissionType::Both
    } else {
        EmissionType::Incremental
    }
}

/// Stores certain, often expensive to compute, plan properties used in query
/// optimization.
///
/// These properties are stored in a single structure to permit this information to
/// be computed once and the cached results then reused multiple times without
/// recomputation (i.e., it acts as a cache).
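///
/// A minimal sketch of how a custom node might build its properties (the
/// schema and the concrete values below are only illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
/// # use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType, PlanProperties};
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let props = PlanProperties::new(
///     EquivalenceProperties::new(schema), // no known equivalences
///     Partitioning::UnknownPartitioning(1), // a single output partition
///     EmissionType::Incremental,
///     Boundedness::Bounded,
/// );
/// assert_eq!(props.output_partitioning().partition_count(), 1);
/// ```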
#[derive(Debug, Clone)]
pub struct PlanProperties {
    /// See [ExecutionPlanProperties::equivalence_properties]
    pub eq_properties: EquivalenceProperties,
    /// See [ExecutionPlanProperties::output_partitioning]
    pub partitioning: Partitioning,
    /// See [ExecutionPlanProperties::pipeline_behavior]
    pub emission_type: EmissionType,
    /// See [ExecutionPlanProperties::boundedness]
    pub boundedness: Boundedness,
    pub evaluation_type: EvaluationType,
    pub scheduling_type: SchedulingType,
    /// See [ExecutionPlanProperties::output_ordering]
    output_ordering: Option<LexOrdering>,
}

impl PlanProperties {
    /// Construct a new `PlanProperties` from its constituent properties.
    pub fn new(
        eq_properties: EquivalenceProperties,
        partitioning: Partitioning,
        emission_type: EmissionType,
        boundedness: Boundedness,
    ) -> Self {
        // Output ordering can be derived from `eq_properties`.
        let output_ordering = eq_properties.output_ordering();
        Self {
            eq_properties,
            partitioning,
            emission_type,
            boundedness,
            evaluation_type: EvaluationType::Lazy,
            scheduling_type: SchedulingType::NonCooperative,
            output_ordering,
        }
    }

    /// Overwrite output partitioning with its new value.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.partitioning = partitioning;
        self
    }

    /// Overwrite equivalence properties with its new value.
    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
        // Changing equivalence properties also changes output ordering, so
        // make sure to overwrite it:
        self.output_ordering = eq_properties.output_ordering();
        self.eq_properties = eq_properties;
        self
    }

    /// Overwrite boundedness with its new value.
    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
        self.boundedness = boundedness;
        self
    }

    /// Overwrite emission type with its new value.
    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
        self.emission_type = emission_type;
        self
    }

    /// Set the [`SchedulingType`].
    ///
    /// Defaults to [`SchedulingType::NonCooperative`]
    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
        self.scheduling_type = scheduling_type;
        self
    }

    /// Set the [`EvaluationType`].
    ///
    /// Defaults to [`EvaluationType::Lazy`]
    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
        self.evaluation_type = drive_type;
        self
    }

    /// Overwrite constraints with its new value.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.eq_properties = self.eq_properties.with_constraints(constraints);
        self
    }

    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
        &self.eq_properties
    }

    pub fn output_partitioning(&self) -> &Partitioning {
        &self.partitioning
    }

    pub fn output_ordering(&self) -> Option<&LexOrdering> {
        self.output_ordering.as_ref()
    }

    /// Get schema of the node.
    pub(crate) fn schema(&self) -> &SchemaRef {
        self.eq_properties.schema()
    }
}

macro_rules! check_len {
    ($target:expr, $func_name:ident, $expected_len:expr) => {
        let actual_len = $target.$func_name().len();
        assert_eq_or_internal_err!(
            actual_len,
            $expected_len,
            "{}::{} returned Vec with incorrect size: {} != {}",
            $target.name(),
            stringify!($func_name),
            actual_len,
            $expected_len
        );
    };
}

/// Checks a set of invariants that apply to all ExecutionPlan implementations.
/// Returns an error if the given node does not conform.
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
    plan: &P,
    _check: InvariantLevel,
) -> Result<(), DataFusionError> {
    let children_len = plan.children().len();

    check_len!(plan, maintains_input_order, children_len);
    check_len!(plan, required_input_ordering, children_len);
    check_len!(plan, required_input_distribution, children_len);
    check_len!(plan, benefits_from_input_partitioning, children_len);

    Ok(())
}

/// Indicates whether a data exchange is needed for the input of `plan`, which is especially
/// helpful for a distributed engine to decide whether it needs to handle shuffling.
/// Currently, there are 3 kinds of execution plans which need data exchange:
///     1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
///     2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
///     3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
#[expect(clippy::needless_pass_by_value)]
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
    plan.properties().evaluation_type == EvaluationType::Eager
}

/// Returns a copy of this plan if we change any child according to the pointer comparison.
/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
pub fn with_new_children_if_necessary(
    plan: Arc<dyn ExecutionPlan>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let old_children = plan.children();
    assert_eq_or_internal_err!(
        children.len(),
        old_children.len(),
        "Wrong number of children"
    );
    if children.is_empty()
        || children
            .iter()
            .zip(old_children.iter())
            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
    {
        plan.with_new_children(children)
    } else {
        Ok(plan)
    }
}

/// Return a [`DisplayableExecutionPlan`] wrapper around an
/// [`ExecutionPlan`] which can be displayed in various easier to
/// understand ways.
///
/// See examples on [`DisplayableExecutionPlan`]
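///
/// A brief usage sketch (using `EmptyExec` purely as a stand-in plan):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_physical_plan::displayable;
/// # use datafusion_physical_plan::empty::EmptyExec;
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let plan = EmptyExec::new(schema);
/// // One line per plan node, indented by depth; `false` requests the
/// // non-verbose form
/// println!("{}", displayable(&plan).indent(false));
/// ```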
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
    DisplayableExecutionPlan::new(plan)
}

/// Execute the [ExecutionPlan] and collect the results in memory
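///
/// A minimal sketch of typical usage, assuming a `plan` is already built and
/// a default `TaskContext` is acceptable:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::array::RecordBatch;
/// # use datafusion_common::Result;
/// # use datafusion_execution::TaskContext;
/// # use datafusion_physical_plan::{collect, ExecutionPlan};
/// async fn run(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
///     // Buffers the output of all partitions in memory
///     collect(plan, Arc::new(TaskContext::default())).await
/// }
/// ```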
pub async fn collect(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    let stream = execute_stream(plan, context)?;
    crate::common::collect(stream).await
}

/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
///
/// See [collect] to buffer the `RecordBatch`es in memory.
///
/// # Aborting Execution
///
/// Dropping the stream will abort the execution of the query, and free up
/// any allocated resources
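///
/// A short sketch of consuming the merged stream, assuming a `plan` is
/// already built and a default `TaskContext` is acceptable:
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_common::Result;
/// # use datafusion_execution::TaskContext;
/// # use datafusion_physical_plan::{execute_stream, ExecutionPlan};
/// # use futures::TryStreamExt;
/// async fn print_row_counts(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
///     let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))?;
///     // Batches arrive incrementally; errors surface as `Err` items
///     while let Some(batch) = stream.try_next().await? {
///         println!("{} rows", batch.num_rows());
///     }
///     Ok(())
/// }
/// ```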
1202#[expect(
1203    clippy::needless_pass_by_value,
1204    reason = "Public API that historically takes owned Arcs"
1205)]
1206pub fn execute_stream(
1207    plan: Arc<dyn ExecutionPlan>,
1208    context: Arc<TaskContext>,
1209) -> Result<SendableRecordBatchStream> {
1210    match plan.output_partitioning().partition_count() {
1211        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1212        1 => plan.execute(0, context),
1213        2.. => {
1214            // merge into a single partition
1215            let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1216            // CoalescePartitionsExec must produce a single partition
1217            assert_eq!(1, plan.properties().output_partitioning().partition_count());
1218            plan.execute(0, context)
1219        }
1220    }
1221}
1222
1223/// Execute the [ExecutionPlan] and collect the results in memory, one `Vec<RecordBatch>` per output partition
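///
/// # Example
///
/// A minimal sketch, not compiled as a doctest; `plan` and `context` are
/// assumed to be in scope:
///
/// ```ignore
/// // One Vec<RecordBatch> per output partition, in partition order
/// let per_partition: Vec<Vec<RecordBatch>> = collect_partitioned(plan, context).await?;
/// for (i, batches) in per_partition.iter().enumerate() {
///     println!("partition {i}: {} batches", batches.len());
/// }
/// ```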
1224pub async fn collect_partitioned(
1225    plan: Arc<dyn ExecutionPlan>,
1226    context: Arc<TaskContext>,
1227) -> Result<Vec<Vec<RecordBatch>>> {
1228    let streams = execute_stream_partitioned(plan, context)?;
1229
1230    let mut join_set = JoinSet::new();
1231    // Execute the plan and collect the results into batches.
1232    streams.into_iter().enumerate().for_each(|(idx, stream)| {
1233        join_set.spawn(async move {
1234            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
1235            (idx, result)
1236        });
1237    });
1238
1239    let mut batches = vec![];
1240    // Note that currently this doesn't identify the thread that panicked
1241    //
1242    // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id)
1243    // once it is stable
1244    while let Some(result) = join_set.join_next().await {
1245        match result {
1246            Ok((idx, res)) => batches.push((idx, res?)),
1247            Err(e) => {
1248                if e.is_panic() {
1249                    std::panic::resume_unwind(e.into_panic());
1250                } else {
1251                    unreachable!();
1252                }
1253            }
1254        }
1255    }
1256
1257    batches.sort_by_key(|(idx, _)| *idx);
1258    let batches = batches.into_iter().map(|(_, batch)| batch).collect();
1259
1260    Ok(batches)
1261}
1262
1263/// Execute the [ExecutionPlan] and return a vec with one stream per output
1264/// partition
1265///
1266/// # Aborting Execution
1267///
1268/// Dropping the stream will abort the execution of the query, and free up
1269/// any allocated resources
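///
/// # Example
///
/// A sketch that drains each partition stream separately, not compiled as a
/// doctest; `plan` and `context` are assumed to be in scope:
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let streams = execute_stream_partitioned(plan, context)?;
/// for (partition, stream) in streams.into_iter().enumerate() {
///     let batches: Vec<RecordBatch> = stream.try_collect().await?;
///     println!("partition {partition} produced {} batches", batches.len());
/// }
/// ```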
1270#[expect(
1271    clippy::needless_pass_by_value,
1272    reason = "Public API that historically takes owned Arcs"
1273)]
1274pub fn execute_stream_partitioned(
1275    plan: Arc<dyn ExecutionPlan>,
1276    context: Arc<TaskContext>,
1277) -> Result<Vec<SendableRecordBatchStream>> {
1278    let num_partitions = plan.output_partitioning().partition_count();
1279    let mut streams = Vec::with_capacity(num_partitions);
1280    for i in 0..num_partitions {
1281        streams.push(plan.execute(i, Arc::clone(&context))?);
1282    }
1283    Ok(streams)
1284}
1285
1286/// Executes the `input` plan for the given partition and ensures that the resulting
1287/// stream adheres to the `not null` constraints specified in the `sink_schema`.
1288///
1289/// # Arguments
1290///
1291/// * `input` - An execution plan
1292/// * `sink_schema` - The schema to be applied to the output stream
1293/// * `partition` - The partition index to be executed
1294/// * `context` - The task context
1295///
1296/// # Returns
1297///
1298/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1299///
1300/// This function first executes the given input plan for the specified partition
1301/// and context. It then checks if there are any columns in the input that might
1302/// violate the `not null` constraints specified in the `sink_schema`. If there are
1303/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1304/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
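///
/// # Example
///
/// A sketch of how a data sink might wrap its input, not compiled as a doctest;
/// `input`, `sink_schema`, and `context` are assumed to be in scope:
///
/// ```ignore
/// // Any batch containing a null in a column that `sink_schema` marks as
/// // non-nullable surfaces as an error when the stream is polled.
/// let checked_stream = execute_input_stream(input, sink_schema, 0, context)?;
/// ```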
1305#[expect(
1306    clippy::needless_pass_by_value,
1307    reason = "Public API that historically takes owned Arcs"
1308)]
1309pub fn execute_input_stream(
1310    input: Arc<dyn ExecutionPlan>,
1311    sink_schema: SchemaRef,
1312    partition: usize,
1313    context: Arc<TaskContext>,
1314) -> Result<SendableRecordBatchStream> {
1315    let input_stream = input.execute(partition, context)?;
1316
1317    debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1318
1319    // Find input columns that may violate the not null constraint.
1320    let risky_columns: Vec<_> = sink_schema
1321        .fields()
1322        .iter()
1323        .zip(input.schema().fields().iter())
1324        .enumerate()
1325        .filter_map(|(idx, (sink_field, input_field))| {
1326            (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1327        })
1328        .collect();
1329
1330    if risky_columns.is_empty() {
1331        Ok(input_stream)
1332    } else {
1333        // Check not null constraint on the input stream
1334        Ok(Box::pin(RecordBatchStreamAdapter::new(
1335            sink_schema,
1336            input_stream
1337                .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1338        )))
1339    }
1340}
1341
1342/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1343///
1344/// # Arguments
1345///
1346/// * `batch` - The `RecordBatch` to be checked
1347/// * `column_indices` - A vector of column indices that should be checked for
1348///   `not null` constraints.
1349///
1350/// # Returns
1351///
1352/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1353///
1354/// This function iterates over the specified column indices and ensures that none
1355/// of the columns contain null values. If any column contains null values, an error
1356/// is returned.
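///
/// # Example
///
/// A sketch mirroring the unit tests below, not compiled as a doctest:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::array::{Int32Array, RecordBatch};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let batch = RecordBatch::try_new(
///     Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
///     vec![Arc::new(Int32Array::from(vec![Some(1), None]))],
/// )?;
/// // Column 0 contains a null, so the constraint check fails
/// assert!(check_not_null_constraints(batch, &vec![0]).is_err());
/// ```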
1357pub fn check_not_null_constraints(
1358    batch: RecordBatch,
1359    column_indices: &Vec<usize>,
1360) -> Result<RecordBatch> {
1361    for &index in column_indices {
1362        if batch.num_columns() <= index {
1363            return exec_err!(
1364                "Invalid batch column count {} expected > {}",
1365                batch.num_columns(),
1366                index
1367            );
1368        }
1369
1370        if batch
1371            .column(index)
1372            .logical_nulls()
1373            .map(|nulls| nulls.null_count())
1374            .unwrap_or_default()
1375            > 0
1376        {
1377            return exec_err!(
1378                "Invalid batch column at '{}' has null but schema specifies non-nullable",
1379                index
1380            );
1381        }
1382    }
1383
1384    Ok(batch)
1385}
1386
1387/// Utility function yielding the lines of the indented string representation of the given [`ExecutionPlan`].
1388pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1389    let formatted = displayable(plan.as_ref()).indent(true).to_string();
1390    let actual: Vec<&str> = formatted.trim().lines().collect();
1391    actual.iter().map(|elem| (*elem).to_string()).collect()
1392}
1393
1394/// Indicates how an execution plan operator changes the cardinality (row count)
1395/// of its input stream when producing its output stream
1396pub enum CardinalityEffect {
1397    /// Unknown effect. This is the default
1398    Unknown,
1399    /// The operator is guaranteed to produce exactly one row for
1400    /// each input row
1401    Equal,
1402    /// The operator may produce fewer output rows than it receives input rows
1403    LowerEqual,
1404    /// The operator may produce more output rows than it receives input rows
1405    GreaterEqual,
1406}
1407
1408#[cfg(test)]
1409mod tests {
1410    use std::any::Any;
1411    use std::sync::Arc;
1412
1413    use super::*;
1414    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1415
1416    use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1417    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1418    use datafusion_common::{Result, Statistics};
1419    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
1420
1421    #[derive(Debug)]
1422    pub struct EmptyExec;
1423
1424    impl EmptyExec {
1425        pub fn new(_schema: SchemaRef) -> Self {
1426            Self
1427        }
1428    }
1429
1430    impl DisplayAs for EmptyExec {
1431        fn fmt_as(
1432            &self,
1433            _t: DisplayFormatType,
1434            _f: &mut std::fmt::Formatter,
1435        ) -> std::fmt::Result {
1436            unimplemented!()
1437        }
1438    }
1439
1440    impl ExecutionPlan for EmptyExec {
1441        fn name(&self) -> &'static str {
1442            Self::static_name()
1443        }
1444
1445        fn as_any(&self) -> &dyn Any {
1446            self
1447        }
1448
1449        fn properties(&self) -> &PlanProperties {
1450            unimplemented!()
1451        }
1452
1453        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1454            vec![]
1455        }
1456
1457        fn with_new_children(
1458            self: Arc<Self>,
1459            _: Vec<Arc<dyn ExecutionPlan>>,
1460        ) -> Result<Arc<dyn ExecutionPlan>> {
1461            unimplemented!()
1462        }
1463
1464        fn execute(
1465            &self,
1466            _partition: usize,
1467            _context: Arc<TaskContext>,
1468        ) -> Result<SendableRecordBatchStream> {
1469            unimplemented!()
1470        }
1471
1472        fn statistics(&self) -> Result<Statistics> {
1473            unimplemented!()
1474        }
1475
1476        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1477            unimplemented!()
1478        }
1479    }
1480
1481    #[derive(Debug)]
1482    pub struct RenamedEmptyExec;
1483
1484    impl RenamedEmptyExec {
1485        pub fn new(_schema: SchemaRef) -> Self {
1486            Self
1487        }
1488    }
1489
1490    impl DisplayAs for RenamedEmptyExec {
1491        fn fmt_as(
1492            &self,
1493            _t: DisplayFormatType,
1494            _f: &mut std::fmt::Formatter,
1495        ) -> std::fmt::Result {
1496            unimplemented!()
1497        }
1498    }
1499
1500    impl ExecutionPlan for RenamedEmptyExec {
1501        fn name(&self) -> &'static str {
1502            Self::static_name()
1503        }
1504
1505        fn static_name() -> &'static str
1506        where
1507            Self: Sized,
1508        {
1509            "MyRenamedEmptyExec"
1510        }
1511
1512        fn as_any(&self) -> &dyn Any {
1513            self
1514        }
1515
1516        fn properties(&self) -> &PlanProperties {
1517            unimplemented!()
1518        }
1519
1520        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1521            vec![]
1522        }
1523
1524        fn with_new_children(
1525            self: Arc<Self>,
1526            _: Vec<Arc<dyn ExecutionPlan>>,
1527        ) -> Result<Arc<dyn ExecutionPlan>> {
1528            unimplemented!()
1529        }
1530
1531        fn execute(
1532            &self,
1533            _partition: usize,
1534            _context: Arc<TaskContext>,
1535        ) -> Result<SendableRecordBatchStream> {
1536            unimplemented!()
1537        }
1538
1539        fn statistics(&self) -> Result<Statistics> {
1540            unimplemented!()
1541        }
1542
1543        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1544            unimplemented!()
1545        }
1546    }
1547
1548    #[test]
1549    fn test_execution_plan_name() {
1550        let schema1 = Arc::new(Schema::empty());
1551        let default_name_exec = EmptyExec::new(schema1);
1552        assert_eq!(default_name_exec.name(), "EmptyExec");
1553
1554        let schema2 = Arc::new(Schema::empty());
1555        let renamed_exec = RenamedEmptyExec::new(schema2);
1556        assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1557        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1558    }
1559
1560    /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1561    /// be called from a trait object.
1562    /// Related ticket: <https://github.com/apache/datafusion/pull/11047>
1563    #[expect(unused)]
1564    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1565        let _ = plan.name();
1566    }
1567
1568    #[test]
1569    fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1570        check_not_null_constraints(
1571            RecordBatch::try_new(
1572                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1573                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1574            )?,
1575            &vec![0],
1576        )?;
1577        Ok(())
1578    }
1579
1580    #[test]
1581    fn test_check_not_null_constraints_reject_null() -> Result<()> {
1582        let result = check_not_null_constraints(
1583            RecordBatch::try_new(
1584                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1585                vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1586            )?,
1587            &vec![0],
1588        );
1589        assert!(result.is_err());
1590        assert_eq!(
1591            result.err().unwrap().strip_backtrace(),
1592            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1593        );
1594        Ok(())
1595    }
1596
1597    #[test]
1598    fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
1599        // some null values inside the REE (run-end encoded) array
1600        let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1601        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1602        let run_end_array = RunArray::try_new(&run_ends, &values)?;
1603        let result = check_not_null_constraints(
1604            RecordBatch::try_new(
1605                Arc::new(Schema::new(vec![Field::new(
1606                    "a",
1607                    run_end_array.data_type().to_owned(),
1608                    true,
1609                )])),
1610                vec![Arc::new(run_end_array)],
1611            )?,
1612            &vec![0],
1613        );
1614        assert!(result.is_err());
1615        assert_eq!(
1616            result.err().unwrap().strip_backtrace(),
1617            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1618        );
1619        Ok(())
1620    }
1621
1622    #[test]
1623    fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1624        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1625        let keys = Int32Array::from(vec![0, 1, 2, 3]);
1626        let dictionary = DictionaryArray::new(keys, values);
1627        let result = check_not_null_constraints(
1628            RecordBatch::try_new(
1629                Arc::new(Schema::new(vec![Field::new(
1630                    "a",
1631                    dictionary.data_type().to_owned(),
1632                    true,
1633                )])),
1634                vec![Arc::new(dictionary)],
1635            )?,
1636            &vec![0],
1637        );
1638        assert!(result.is_err());
1639        assert_eq!(
1640            result.err().unwrap().strip_backtrace(),
1641            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1642        );
1643        Ok(())
1644    }
1645
1646    #[test]
1647    fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
1648        // a null in the dictionary values that is masked out by the dictionary keys
1649        let values = Arc::new(Int32Array::from(vec![
1650            Some(1),
1651            None, // this null value is masked by dictionary keys
1652            Some(3),
1653            Some(4),
1654        ]));
1655        let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1656        let dictionary = DictionaryArray::new(keys, values);
1657        check_not_null_constraints(
1658            RecordBatch::try_new(
1659                Arc::new(Schema::new(vec![Field::new(
1660                    "a",
1661                    dictionary.data_type().to_owned(),
1662                    true,
1663                )])),
1664                vec![Arc::new(dictionary)],
1665            )?,
1666            &vec![0],
1667        )?;
1668        Ok(())
1669    }
1670
1671    #[test]
1672    fn test_check_not_null_constraints_on_null_type() -> Result<()> {
1673        // null value of Null type
1674        let result = check_not_null_constraints(
1675            RecordBatch::try_new(
1676                Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1677                vec![Arc::new(NullArray::new(3))],
1678            )?,
1679            &vec![0],
1680        );
1681        assert!(result.is_err());
1682        assert_eq!(
1683            result.err().unwrap().strip_backtrace(),
1684            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1685        );
1686        Ok(())
1687    }
1688}