Skip to main content

datafusion_physical_plan/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19use crate::filter_pushdown::{
20    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21    FilterPushdownPropagation,
22};
23pub use crate::metrics::Metric;
24pub use crate::ordering::InputOrderMode;
25use crate::sort_pushdown::SortOrderPushdownResult;
26pub use crate::stream::EmptyRecordBatchStream;
27
28use arrow_schema::Schema;
29pub use datafusion_common::hash_utils;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31pub use datafusion_common::utils::project_schema;
32pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
33pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
34pub use datafusion_expr::{Accumulator, ColumnarValue};
35pub use datafusion_physical_expr::window::WindowExpr;
36pub use datafusion_physical_expr::{
37    Distribution, Partitioning, PhysicalExpr, expressions,
38};
39
40use std::any::Any;
41use std::fmt::Debug;
42use std::sync::{Arc, LazyLock};
43
44use crate::coalesce_partitions::CoalescePartitionsExec;
45use crate::display::DisplayableExecutionPlan;
46use crate::metrics::MetricsSet;
47use crate::projection::ProjectionExec;
48use crate::stream::RecordBatchStreamAdapter;
49
50use arrow::array::{Array, RecordBatch};
51use arrow::datatypes::SchemaRef;
52use datafusion_common::config::ConfigOptions;
53use datafusion_common::{
54    Constraints, DataFusionError, Result, assert_eq_or_internal_err,
55    assert_or_internal_err, exec_err,
56};
57use datafusion_common_runtime::JoinSet;
58use datafusion_execution::TaskContext;
59use datafusion_physical_expr::EquivalenceProperties;
60use datafusion_physical_expr_common::sort_expr::{
61    LexOrdering, OrderingRequirements, PhysicalSortExpr,
62};
63
64use futures::stream::{StreamExt, TryStreamExt};
65
66/// Represent nodes in the DataFusion Physical Plan.
67///
68/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
69/// [`RecordBatch`] that incrementally computes a partition of the
70/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
71/// details on partitioning.
72///
73/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
74/// properties of the output to the DataFusion optimizer, and methods such as
75/// [`required_input_distribution`] and [`required_input_ordering`] express
76/// requirements of the `ExecutionPlan` from its input.
77///
78/// [`ExecutionPlan`] can be displayed in a simplified form using the
79/// return value from [`displayable`] in addition to the (normally
80/// quite verbose) `Debug` output.
81///
82/// [`execute`]: ExecutionPlan::execute
83/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
84/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
85///
86/// # Examples
87///
88/// See [`datafusion-examples`] for examples, including
89/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
90/// `ExecutionPlan` with memory tracking and spilling support.
91///
92/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
93/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
94pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
    ///
    /// Implementation note: this method can just proxy to
    /// [`static_name`](ExecutionPlan::static_name) if no special action is
    /// needed. It doesn't provide a default implementation like that because
    /// this method doesn't require the `Sized` constraint, which allows a wider
    /// range of use cases (it can be called on `dyn ExecutionPlan`).
    fn name(&self) -> &str;
103
104    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
105    /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
106    fn static_name() -> &'static str
107    where
108        Self: Sized,
109    {
110        let full_name = std::any::type_name::<Self>();
111        let maybe_start_idx = full_name.rfind(':');
112        match maybe_start_idx {
113            Some(start_idx) => &full_name[start_idx + 1..],
114            None => "UNKNOWN",
115        }
116    }
117
    /// Returns the execution plan as [`Any`] so that it can be
    /// downcast to a specific implementation
    /// (e.g. via `plan.as_any().downcast_ref::<MyExec>()`).
    fn as_any(&self) -> &dyn Any;
121
122    /// Get the schema for this execution plan
123    fn schema(&self) -> SchemaRef {
124        Arc::clone(self.properties().schema())
125    }
126
    /// Return properties of the output of the `ExecutionPlan`, such as output
    /// ordering(s), partitioning information etc.
    ///
    /// This information is available via methods on [`ExecutionPlanProperties`]
    /// trait, which is implemented for all `ExecutionPlan`s.
    ///
    /// Implementations typically compute this once and cache it, since it is
    /// also the source of [`Self::schema`].
    fn properties(&self) -> &Arc<PlanProperties>;
133
    /// Returns an error if this individual node does not conform to its invariants.
    /// These invariants are typically only checked in debug mode.
    ///
    /// A default set of invariants is provided in the [check_default_invariants]
    /// function; the default implementation of `check_invariants` calls it.
    /// Extension nodes can provide their own invariants.
    ///
    /// `check` selects how strict the validation should be (see [`InvariantLevel`]).
    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        check_default_invariants(self, check)
    }
143
144    /// Specifies the data distribution requirements for all the
145    /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
146    fn required_input_distribution(&self) -> Vec<Distribution> {
147        vec![Distribution::UnspecifiedDistribution; self.children().len()]
148    }
149
150    /// Specifies the ordering required for all of the children of this
151    /// `ExecutionPlan`.
152    ///
153    /// For each child, it's the local ordering requirement within
154    /// each partition rather than the global ordering
155    ///
156    /// NOTE that checking `!is_empty()` does **not** check for a
157    /// required input ordering. Instead, the correct check is that at
158    /// least one entry must be `Some`
159    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
160        vec![None; self.children().len()]
161    }
162
163    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
164    /// rows within or between partitions.
165    ///
166    /// For example, Projection, Filter, and Limit maintain the order
167    /// of inputs -- they may transform values (Projection) or not
168    /// produce the same number of rows that went in (Filter and
169    /// Limit), but the rows that are produced go in the same way.
170    ///
171    /// DataFusion uses this metadata to apply certain optimizations
172    /// such as automatically repartitioning correctly.
173    ///
174    /// The default implementation returns `false`
175    ///
176    /// WARNING: if you override this default, you *MUST* ensure that
177    /// the `ExecutionPlan`'s maintains the ordering invariant or else
178    /// DataFusion may produce incorrect results.
179    fn maintains_input_order(&self) -> Vec<bool> {
180        vec![false; self.children().len()]
181    }
182
183    /// Specifies whether the `ExecutionPlan` benefits from increased
184    /// parallelization at its input for each child.
185    ///
186    /// If returns `true`, the `ExecutionPlan` would benefit from partitioning
187    /// its corresponding child (and thus from more parallelism). For
188    /// `ExecutionPlan` that do very little work the overhead of extra
189    /// parallelism may outweigh any benefits
190    ///
191    /// The default implementation returns `true` unless this `ExecutionPlan`
192    /// has signalled it requires a single child input partition.
193    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
194        // By default try to maximize parallelism with more CPUs if
195        // possible
196        self.required_input_distribution()
197            .into_iter()
198            .map(|dist| !matches!(dist, Distribution::SinglePartition))
199            .collect()
200    }
201
    /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
    /// The returned list will be empty for leaf nodes such as scans, will contain
    /// a single value for unary nodes, or two values for binary nodes (such as
    /// joins).
    ///
    /// The order of the returned children is the order expected by
    /// [`Self::with_new_children`].
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
207
    /// Returns a new `ExecutionPlan` where all existing children were replaced
    /// by the `children`, in order.
    ///
    /// The `children` are expected to correspond, in order, to the plans
    /// returned by [`Self::children`].
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
214
215    /// Reset any internal state within this [`ExecutionPlan`].
216    ///
217    /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
218    /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
219    /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
220    /// are reset to their initial state.
221    ///
222    /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
223    /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
224    /// necessarily resetting any internal state. Implementations that require resetting of some
225    /// internal state should override this method to provide the necessary logic.
226    ///
227    /// This method should *not* reset state recursively for children, as it is expected that
228    /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
229    /// or was already called on each child.
230    ///
231    /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
232    /// thus it is expected that any cached plan properties will remain valid after the reset.
233    ///
234    /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
235    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
236        let children = self.children().into_iter().cloned().collect();
237        self.with_new_children(children)
238    }
239
    /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
    /// produce `target_partitions` partitions.
    ///
    /// If the `ExecutionPlan` does not support changing its partitioning,
    /// returns `Ok(None)` (the default).
    ///
    /// If the `ExecutionPlan` can increase its partitioning, but not to the
    /// `target_partitions`, it may return an ExecutionPlan with fewer
    /// partitions. This might happen, for example, if each new partition would
    /// be too small to be efficiently processed individually.
    ///
    /// The DataFusion optimizer attempts to use as many threads as possible by
    /// repartitioning its inputs to match the target number of threads
    /// available (`target_partitions`). Some data sources, such as the built in
    /// CSV and Parquet readers, implement this method as they are able to read
    /// from their input files in parallel, regardless of how the source data is
    /// split amongst files.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
264
    /// Begin execution of `partition`, returning a [`Stream`] of
    /// [`RecordBatch`]es.
    ///
    /// # Notes
    ///
    /// The `execute` method itself is not `async` but it returns an `async`
    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
    /// Most `ExecutionPlan`s should not do any work before the first
    /// `RecordBatch` is requested from the stream.
    ///
    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
    /// [`Stream`] into a [`SendableRecordBatchStream`].
    ///
    /// Using `async` `Streams` allows for network I/O during execution and
    /// takes advantage of Rust's built in support for `async` continuations and
    /// the crate ecosystem.
    ///
    /// [`Stream`]: futures::stream::Stream
    /// [`StreamExt`]: futures::stream::StreamExt
    /// [`TryStreamExt`]: futures::stream::TryStreamExt
    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
    ///
    /// # Error handling
    ///
    /// Any error that occurs during execution is sent as an `Err` in the output
    /// stream.
    ///
    /// `ExecutionPlan` implementations in DataFusion cancel additional work
    /// immediately once an error occurs. The rationale is that if the overall
    /// query will return an error, any additional work such as continued
    /// polling of inputs will be wasted as it will be thrown away.
    ///
    /// # Cancellation / Aborting Execution
    ///
    /// The [`Stream`] that is returned must ensure that any allocated resources
    /// are freed when the stream itself is dropped. This is particularly
    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
    /// "abort" such tasks, they may continue to consume resources even after
    /// the plan is dropped, generating intermediate results that are never
    /// used.
    /// Thus, [`spawn`] is disallowed, and instead use [`SpawnedTask`].
    ///
    /// To enable timely cancellation, the [`Stream`] that is returned must not
    /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
    /// In a typical [`ExecutionPlan`], this automatically happens unless there are
    /// special circumstances; e.g. when the computational complexity of processing a
    /// batch is superlinear. See this [general guideline][async-guideline] for more context
    /// on this point, which explains why one should avoid spending a long time without
    /// reaching an `await`/yield point in asynchronous runtimes.
    /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
    /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
    /// [`tokio::task::yield_now()`] when appropriate.
    /// In special cases that warrant manual yielding, determination for "regularly" may be
    /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
    /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
    /// counting rows or batches.
    ///
    /// The [cancellation benchmark] tracks some cases of how quickly queries can
    /// be cancelled.
    ///
    /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
    /// for structures to help ensure all background tasks are cancelled.
    ///
    /// [`spawn`]: tokio::task::spawn
    /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
    /// [`JoinSet`]: datafusion_common_runtime::JoinSet
    /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
    /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
    /// [`Poll::Pending`]: std::task::Poll::Pending
    /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
    ///
    /// # Implementation Examples
    ///
    /// While `async` `Stream`s have a non trivial learning curve, the
    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
    /// which help simplify many common operations.
    ///
    /// Here are some common patterns:
    ///
    /// ## Return Precomputed `RecordBatch`
    ///
    /// We can return a precomputed `RecordBatch` as a `Stream`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::array::RecordBatch;
    /// # use arrow::datatypes::SchemaRef;
    /// # use datafusion_common::Result;
    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    /// # use datafusion_physical_plan::memory::MemoryStream;
    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    /// struct MyPlan {
    ///     batch: RecordBatch,
    /// }
    ///
    /// impl MyPlan {
    ///     fn execute(
    ///         &self,
    ///         partition: usize,
    ///         context: Arc<TaskContext>,
    ///     ) -> Result<SendableRecordBatchStream> {
    ///         // use functions from futures crate convert the batch into a stream
    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
    ///         let stream = futures::stream::once(fut);
    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
    ///             self.batch.schema(),
    ///             stream,
    ///         )))
    ///     }
    /// }
    /// ```
    ///
    /// ## Lazily (async) Compute `RecordBatch`
    ///
    /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::array::RecordBatch;
    /// # use arrow::datatypes::SchemaRef;
    /// # use datafusion_common::Result;
    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    /// # use datafusion_physical_plan::memory::MemoryStream;
    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    /// struct MyPlan {
    ///     schema: SchemaRef,
    /// }
    ///
    /// /// Returns a single batch when the returned stream is polled
    /// async fn get_batch() -> Result<RecordBatch> {
    ///     todo!()
    /// }
    ///
    /// impl MyPlan {
    ///     fn execute(
    ///         &self,
    ///         partition: usize,
    ///         context: Arc<TaskContext>,
    ///     ) -> Result<SendableRecordBatchStream> {
    ///         let fut = get_batch();
    ///         let stream = futures::stream::once(fut);
    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
    ///             self.schema.clone(),
    ///             stream,
    ///         )))
    ///     }
    /// }
    /// ```
    ///
    /// ## Lazily (async) create a Stream
    ///
    /// If you need to create the return `Stream` using an `async` function,
    /// you can do so by flattening the result:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::array::RecordBatch;
    /// # use arrow::datatypes::SchemaRef;
    /// # use futures::TryStreamExt;
    /// # use datafusion_common::Result;
    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    /// # use datafusion_physical_plan::memory::MemoryStream;
    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    /// struct MyPlan {
    ///     schema: SchemaRef,
    /// }
    ///
    /// /// async function that returns a stream
    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    ///     todo!()
    /// }
    ///
    /// impl MyPlan {
    ///     fn execute(
    ///         &self,
    ///         partition: usize,
    ///         context: Arc<TaskContext>,
    ///     ) -> Result<SendableRecordBatchStream> {
    ///         // A future that yields a stream
    ///         let fut = get_batch_stream();
    ///         // Use TryStreamExt::try_flatten to flatten the stream of streams
    ///         let stream = futures::stream::once(fut).try_flatten();
    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
    ///             self.schema.clone(),
    ///             stream,
    ///         )))
    ///     }
    /// }
    /// ```
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
460
    /// Return a snapshot of the set of [`Metric`]s for this
    /// [`ExecutionPlan`]. If no `Metric`s are available, return None
    /// (the default).
    ///
    /// While the values of the metrics in the returned
    /// [`MetricsSet`]s may change as execution progresses, the
    /// specific metrics will not.
    ///
    /// Once `self.execute()` has returned (technically the future is
    /// resolved) for all available partitions, the set of metrics
    /// should be complete. If this function is called prior to
    /// `execute()` new metrics may appear in subsequent calls.
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }
475
476    /// Returns statistics for a specific partition of this `ExecutionPlan` node.
477    /// If statistics are not available, should return [`Statistics::new_unknown`]
478    /// (the default), not an error.
479    /// If `partition` is `None`, it returns statistics for the entire plan.
480    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
481        if let Some(idx) = partition {
482            // Validate partition index
483            let partition_count = self.properties().partitioning.partition_count();
484            assert_or_internal_err!(
485                idx < partition_count,
486                "Invalid partition index: {}, the partition count is {}",
487                idx,
488                partition_count
489            );
490        }
491        Ok(Statistics::new_unknown(&self.schema()))
492    }
493
    /// Returns `true` if a limit can be safely pushed down through this
    /// `ExecutionPlan` node.
    ///
    /// If this method returns `true`, and the query plan contains a limit at
    /// the output of this node, DataFusion will push the limit to the input
    /// of this node.
    ///
    /// The default is `false` (no limit pushdown).
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
503
    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
    /// fetch limits. Returns `None` otherwise (the default).
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
509
    /// Gets the fetch count for the operator, `None` means there is no fetch.
    ///
    /// The default implementation returns `None`.
    fn fetch(&self) -> Option<usize> {
        None
    }
514
    /// Gets the effect on cardinality, if known.
    ///
    /// Defaults to [`CardinalityEffect::Unknown`].
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Unknown
    }
519
    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
    ///
    /// If the operator supports this optimization, the resulting plan will be:
    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
    /// Otherwise, it returns the current `ExecutionPlan` as-is.
    ///
    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
    /// or not possible, or `Err` on failure. The default implementation returns
    /// `Ok(None)` (pushdown not supported).
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
534
    /// Collect filters that this node can push down to its children.
    /// Filters that are being pushed down from parents are passed in,
    /// and the node may generate additional filters to push down.
    /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
    /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
    /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
    ///    filters so it only returns that `FilterExec` wants to push down its own predicate.
    /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
    ///    `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
    ///    but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
    /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
    ///    and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
    ///    since it has no children and no additional filters to push down.
    ///    It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
    ///    up the plan that `DataSourceExec` can actually bind the filters.
    ///
    /// The default implementation bars all parent filters from being pushed down and adds no new filters.
    /// This is the safest option, making filter pushdown opt-in on a per-node basis.
    ///
    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
    /// Depending on the phase the operator may or may not be allowed to modify the plan.
    /// See [`FilterPushdownPhase`] for more details.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        Ok(FilterDescription::all_unsupported(
            &parent_filters,
            &self.children(),
        ))
    }
568
569    /// Handle the result of a child pushdown.
570    ///
571    /// This method is called as we recurse back up the plan tree after pushing
572    /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
573    /// It allows the current node to process the results of filter pushdown from
574    /// its children, deciding whether to absorb filters, modify the plan, or pass
575    /// filters back up to its parent.
576    ///
577    /// **Purpose and Context:**
578    /// Filter pushdown is a critical optimization in DataFusion that aims to
579    /// reduce the amount of data processed by applying filters as early as
580    /// possible in the query plan. This method is part of the second phase of
581    /// filter pushdown, where results are propagated back up the tree after
582    /// being pushed down. Each node can inspect the pushdown results from its
583    /// children and decide how to handle any unapplied filters, potentially
584    /// optimizing the plan structure or filter application.
585    ///
586    /// **Behavior in Different Nodes:**
587    /// - For a `DataSourceExec`, this often means absorbing the filters to apply
588    ///   them during the scan phase (late materialization), reducing the data
589    ///   read from the source.
590    /// - A `FilterExec` may absorb any filters its children could not handle,
591    ///   combining them with its own predicate. If no filters remain (i.e., the
592    ///   predicate becomes trivially true), it may remove itself from the plan
593    ///   altogether. It typically marks parent filters as supported, indicating
594    ///   they have been handled.
595    /// - A `HashJoinExec` might ignore the pushdown result if filters need to
596    ///   be applied during the join operation. It passes the parent filters back
597    ///   up wrapped in [`FilterPushdownPropagation::if_any`], discarding
598    ///   any self-filters from children.
599    ///
600    /// **Example Walkthrough:**
601    /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
602    /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
603    ///    `FilterExec`, the filter `f1` is gathered and pushed down to
604    ///    `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
605    ///    the join or add its own filters (e.g., a min-max filter from the build side),
606    ///    then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
607    ///    has no children to push to, so it prepares to handle filters in the
608    ///    upward phase.
609    /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
610    ///    `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
611    ///    for late materialization during scanning, marking them as supported.
612    ///    `HashJoinExec` receives the result, decides whether to apply any
613    ///    remaining filters during the join, and passes unhandled filters back
614    ///    up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
615    ///    updates its predicate if necessary, or removes itself if the predicate
616    ///    becomes trivial (e.g., `lit(true)`), and marks filters as supported
617    ///    for its parent.
618    ///
619    /// The default implementation is a no-op that passes the result of pushdown
620    /// from the children to its parent transparently, ensuring no filters are
621    /// lost if a node does not override this behavior.
622    ///
623    /// **Notes for Implementation:**
624    /// When returning filters via [`FilterPushdownPropagation`], the order of
625    /// filters need not match the order they were passed in via
626    /// `child_pushdown_result`. However, preserving the order is recommended for
627    /// debugging and ease of reasoning about the resulting plans.
628    ///
629    /// **Helper Methods for Customization:**
630    /// There are various helper methods to simplify implementing this method:
631    /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
632    ///   supported as long as at least one child supports them.
633    /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
634    ///   supported as long as all children support them.
635    /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
636    ///   to the propagation result, indicating which filters are supported by
637    ///   the current node.
638    /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
639    ///   current node in the propagation result, used if the node
640    ///   has modified its plan based on the pushdown results.
641    ///
642    /// **Filter Pushdown Phases:**
643    /// There are two different phases in filter pushdown (`Pre` and others),
644    /// which some operators may handle differently. Depending on the phase, the
645    /// operator may or may not be allowed to modify the plan. See
646    /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
647    ///
648    /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Default: relay the children's pushdown outcome transparently to the
        // parent. `if_all` reports a filter as supported only when every child
        // supported it, so no filters are silently lost.
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }
657
658    /// Injects arbitrary run-time state into this execution plan, returning a new plan
659    /// instance that incorporates that state *if* it is relevant to the concrete
660    /// node implementation.
661    ///
662    /// This is a generic entry point: the `state` can be any type wrapped in
663    /// `Arc<dyn Any + Send + Sync>`.  A node that cares about the state should
664    /// down-cast it to the concrete type it expects and, if successful, return a
665    /// modified copy of itself that captures the provided value.  If the state is
666    /// not applicable, the default behaviour is to return `None` so that parent
667    /// nodes can continue propagating the attempt further down the plan tree.
668    ///
669    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
670    /// down-casts the supplied state to an `Arc<WorkTable>`
671    /// in order to wire up the working table used during recursive-CTE execution.
672    /// Similar patterns can be followed by custom nodes that need late-bound
673    /// dependencies or shared state.
    fn with_new_state(
        &self,
        _state: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        // Default: this node does not consume run-time state; `None` lets the
        // caller keep propagating the state elsewhere in the plan tree.
        None
    }
680
681    /// Try to push down sort ordering requirements to this node.
682    ///
683    /// This method is called during sort pushdown optimization to determine if this
684    /// node can optimize for a requested sort ordering. Implementations should:
685    ///
686    /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact
687    ///   ordering (allowing the Sort operator to be removed)
688    /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the
689    ///   ordering but cannot guarantee perfect sorting (Sort operator is kept)
690    /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize
691    ///   for the ordering
692    ///
693    /// For transparent nodes (that preserve ordering), implement this to delegate to
694    /// children and wrap the result with a new instance of this node.
695    ///
696    /// Default implementation returns `Unsupported`.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        // Default: this node cannot optimize for a requested output ordering.
        Ok(SortOrderPushdownResult::Unsupported)
    }
703
704    /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity.
705    ///
706    /// This is used to signal to data sources that the output ordering must be
707    /// preserved, even if it might be more efficient to ignore it (e.g. by
708    /// skipping some row groups in Parquet).
709    ///
    fn with_preserve_order(
        &self,
        _preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        // Default: no order-sensitivity-aware variant exists for this node.
        None
    }
716}
717
/// [`ExecutionPlan`] Invariant Level
///
/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
///
/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
// `Debug` + `PartialEq`/`Eq` are derived so the level can be logged and
// compared in assertions; the enum is a plain fieldless discriminant, so the
// extra derives are free and backward compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvariantLevel {
    /// Invariants that are always true for the [`ExecutionPlan`] node
    /// such as the number of expected children.
    Always,
    /// Invariants that must hold true for the [`ExecutionPlan`] node
    /// to be "executable", such as ordering and/or distribution requirements
    /// being fulfilled.
    Executable,
}
733
/// Extension trait provides an easy API to fetch various properties of
/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
///
/// Implemented below for both `Arc<dyn ExecutionPlan>` and
/// `&dyn ExecutionPlan`, so properties can be queried without cloning.
pub trait ExecutionPlanProperties {
    /// Specifies how the output of this `ExecutionPlan` is split into
    /// partitions.
    fn output_partitioning(&self) -> &Partitioning;

    /// If the output of this `ExecutionPlan` within each partition is sorted,
    /// returns `Some(keys)` describing the ordering. A `None` return value
    /// indicates no assumptions should be made on the output ordering.
    ///
    /// For example, `SortExec` (obviously) produces sorted output as does
    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
    /// output if its input is sorted as it does not reorder the input rows.
    fn output_ordering(&self) -> Option<&LexOrdering>;

    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
    /// For more details, see [`Boundedness`].
    fn boundedness(&self) -> Boundedness;

    /// Indicates how the stream of this `ExecutionPlan` emits its results.
    /// For more details, see [`EmissionType`].
    fn pipeline_behavior(&self) -> EmissionType;

    /// Get the [`EquivalenceProperties`] within the plan.
    ///
    /// Equivalence properties tell DataFusion what columns are known to be
    /// equal, during various optimization passes. By default, this returns "no
    /// known equivalences" which is always correct, but may cause DataFusion to
    /// unnecessarily resort data.
    ///
    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
    /// through it or how columns within each row relate to each other, it
    /// should return the equivalence properties of its input. For
    /// example, since [`FilterExec`] may remove rows from its input, but does not
    /// otherwise modify them, it preserves its input equivalence properties.
    /// However, since `ProjectionExec` may calculate derived expressions, it
    /// needs special handling.
    ///
    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
    /// for related concepts.
    ///
    /// [`FilterExec`]: crate::filter::FilterExec
    fn equivalence_properties(&self) -> &EquivalenceProperties;
}
779
impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
    // Every accessor simply delegates to the cached value returned by
    // `ExecutionPlan::properties`.
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}
801
impl ExecutionPlanProperties for &dyn ExecutionPlan {
    // Mirror of the `Arc<dyn ExecutionPlan>` impl above: plain delegation to
    // the `PlanProperties` cache.
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}
823
/// Represents whether a stream of data **generated** by an operator is bounded (finite)
/// or unbounded (infinite).
///
/// This is used to determine whether an execution plan will eventually complete
/// processing all its data (bounded) or could potentially run forever (unbounded).
///
/// For unbounded streams, it also tracks whether the operator requires finite memory
/// to process the stream or if memory usage could grow unbounded.
///
/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
/// the operator. For example, limit or topk with fetch operator can convert an unbounded stream to a bounded stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    /// The data stream is bounded (finite) and will eventually complete
    Bounded,
    /// The data stream is unbounded (infinite) and could run forever
    Unbounded {
        /// Whether this operator requires infinite memory to process the unbounded stream.
        /// If false, the operator can process an infinite stream with bounded memory.
        /// If true, memory usage may grow unbounded while processing the stream.
        ///
        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
        requires_infinite_memory: bool,
    },
}

impl Boundedness {
    /// Returns `true` if the stream is unbounded (infinite), regardless of its
    /// memory requirements.
    pub fn is_unbounded(&self) -> bool {
        match self {
            Boundedness::Bounded => false,
            Boundedness::Unbounded { .. } => true,
        }
    }
}
856
/// Represents how an operator emits its output records.
///
/// This is used to determine whether an operator emits records incrementally as they arrive,
/// only emits a final result at the end, or can do both. Note that even an
/// incrementally-emitting operator may still buffer data internally until it has
/// enough data to emit a record batch (with up to `batch_size` rows) or the
/// source is exhausted.
///
/// For example, in the following plan:
/// ```text
///   SortExec [EmissionType::Final]
///     |_ on: [col1 ASC]
///     FilterExec [EmissionType::Incremental]
///       |_ pred: col2 > 100
///       DataSourceExec [EmissionType::Incremental]
///         |_ file: "data.csv"
/// ```
/// - DataSourceExec emits records incrementally as it reads from the file
/// - FilterExec processes and emits filtered records incrementally as they arrive
/// - SortExec must wait for all input records before it can emit the sorted result,
///   since it needs to see all values to determine their final order
///
/// Left joins can emit both incrementally and finally:
/// - Incrementally emit matches as they are found
/// - Finally emit non-matches after all input is processed
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Records are emitted incrementally as they arrive and are processed
    Incremental,
    /// Records are only emitted once all input has been processed
    Final,
    /// Records can be emitted both incrementally and as a final result
    Both,
}
889
/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details.
///
/// Stored in [`PlanProperties`] and set via
/// [`PlanProperties::with_scheduling_type`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
    /// cooperative scheduling. This means the implementation of the `Stream` returned by
    /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as
    /// [`tokio::task::coop::consume_budget`].
    ///
    /// `NonCooperative` is the default value and is acceptable for most operators. Please refer to
    /// the [`coop`](crate::coop) module for details on when it may be useful to use
    /// `Cooperative` instead.
    NonCooperative,
    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
    /// cooperative scheduling by consuming task budget when it was able to produce a
    /// [`RecordBatch`].
    Cooperative,
}
908
/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
///
/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
///
/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
/// is known as data-driven or eager evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
    /// instances when it is demanded by invoking `Stream::poll_next`.
    /// Filter, projection, and join are examples of such lazy operators.
    ///
    /// Lazy operators are also known as demand-driven operators.
    Lazy,
    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
    /// `Stream::poll_next` is called.
    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
    ///
    /// Eager operators are also known as data-driven operators.
    Eager,
}
932
933/// Utility to determine an operator's boundedness based on its children's boundedness.
934///
935/// Assumes boundedness can be inferred from child operators:
936/// - Unbounded (requires_infinite_memory: true) takes precedence.
937/// - Unbounded (requires_infinite_memory: false) is considered next.
938/// - Otherwise, the operator is bounded.
939///
940/// **Note:** This is a general-purpose utility and may not apply to
941/// all multi-child operators. Ensure your operator's behavior aligns
942/// with these assumptions before using.
943pub(crate) fn boundedness_from_children<'a>(
944    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
945) -> Boundedness {
946    let mut unbounded_with_finite_mem = false;
947
948    for child in children {
949        match child.boundedness() {
950            Boundedness::Unbounded {
951                requires_infinite_memory: true,
952            } => {
953                return Boundedness::Unbounded {
954                    requires_infinite_memory: true,
955                };
956            }
957            Boundedness::Unbounded {
958                requires_infinite_memory: false,
959            } => {
960                unbounded_with_finite_mem = true;
961            }
962            Boundedness::Bounded => {}
963        }
964    }
965
966    if unbounded_with_finite_mem {
967        Boundedness::Unbounded {
968            requires_infinite_memory: false,
969        }
970    } else {
971        Boundedness::Bounded
972    }
973}
974
975/// Determines the emission type of an operator based on its children's pipeline behavior.
976///
977/// The precedence of emission types is:
978/// - `Final` has the highest precedence.
979/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
980/// - `Incremental` is the default if all children emit incremental results.
981///
982/// **Note:** This is a general-purpose utility and may not apply to
983/// all multi-child operators. Verify your operator's behavior aligns
984/// with these assumptions.
985pub(crate) fn emission_type_from_children<'a>(
986    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
987) -> EmissionType {
988    let mut inc_and_final = false;
989
990    for child in children {
991        match child.pipeline_behavior() {
992            EmissionType::Final => return EmissionType::Final,
993            EmissionType::Both => inc_and_final = true,
994            EmissionType::Incremental => continue,
995        }
996    }
997
998    if inc_and_final {
999        EmissionType::Both
1000    } else {
1001        EmissionType::Incremental
1002    }
1003}
1004
/// Stores certain, often expensive to compute, plan properties used in query
/// optimization.
///
/// These properties are stored in a single structure to permit this information to
/// be computed once and then those cached results used multiple times without
/// recomputation (aka a cache)
#[derive(Debug, Clone)]
pub struct PlanProperties {
    /// See [ExecutionPlanProperties::equivalence_properties]
    pub eq_properties: EquivalenceProperties,
    /// See [ExecutionPlanProperties::output_partitioning]
    pub partitioning: Partitioning,
    /// See [ExecutionPlanProperties::pipeline_behavior]
    pub emission_type: EmissionType,
    /// See [ExecutionPlanProperties::boundedness]
    pub boundedness: Boundedness,
    /// How the operator's stream produces batches (demand-driven vs
    /// data-driven). See [`EvaluationType`]; defaults to `Lazy` in [`Self::new`].
    pub evaluation_type: EvaluationType,
    /// Whether the operator's stream cooperates with the Tokio scheduler.
    /// See [`SchedulingType`]; defaults to `NonCooperative` in [`Self::new`].
    pub scheduling_type: SchedulingType,
    /// See [ExecutionPlanProperties::output_ordering]
    output_ordering: Option<LexOrdering>,
}
1026
impl PlanProperties {
    /// Construct a new `PlanProperties` from the given equivalence properties,
    /// partitioning, emission type and boundedness.
    ///
    /// The output ordering is derived from `eq_properties`; the evaluation and
    /// scheduling types start at their defaults ([`EvaluationType::Lazy`] and
    /// [`SchedulingType::NonCooperative`]) and can be changed with the
    /// corresponding `with_*` builder methods.
    pub fn new(
        eq_properties: EquivalenceProperties,
        partitioning: Partitioning,
        emission_type: EmissionType,
        boundedness: Boundedness,
    ) -> Self {
        // Output ordering can be derived from `eq_properties`.
        let output_ordering = eq_properties.output_ordering();
        Self {
            eq_properties,
            partitioning,
            emission_type,
            boundedness,
            evaluation_type: EvaluationType::Lazy,
            scheduling_type: SchedulingType::NonCooperative,
            output_ordering,
        }
    }

    /// Overwrite output partitioning with its new value.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.partitioning = partitioning;
        self
    }

    /// Set equivalence properties having mut reference.
    pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
        // Changing equivalence properties also changes output ordering, so
        // make sure to overwrite it:
        self.output_ordering = eq_properties.output_ordering();
        self.eq_properties = eq_properties;
    }

    /// Overwrite equivalence properties with its new value.
    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
        self.set_eq_properties(eq_properties);
        self
    }

    /// Overwrite boundedness with its new value.
    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
        self.boundedness = boundedness;
        self
    }

    /// Overwrite emission type with its new value.
    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
        self.emission_type = emission_type;
        self
    }

    /// Set the [`SchedulingType`].
    ///
    /// Defaults to [`SchedulingType::NonCooperative`]
    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
        self.scheduling_type = scheduling_type;
        self
    }

    /// Set the [`EvaluationType`].
    ///
    /// Defaults to [`EvaluationType::Lazy`]
    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
        self.evaluation_type = drive_type;
        self
    }

    /// Set constraints having mut reference.
    pub fn set_constraints(&mut self, constraints: Constraints) {
        self.eq_properties.set_constraints(constraints);
    }

    /// Overwrite constraints with its new value.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.set_constraints(constraints);
        self
    }

    /// Returns the [`EquivalenceProperties`] of the plan.
    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
        &self.eq_properties
    }

    /// Returns how the plan's output is partitioned.
    pub fn output_partitioning(&self) -> &Partitioning {
        &self.partitioning
    }

    /// Returns the output ordering derived from the equivalence properties,
    /// if any.
    pub fn output_ordering(&self) -> Option<&LexOrdering> {
        self.output_ordering.as_ref()
    }

    /// Get schema of the node.
    pub(crate) fn schema(&self) -> &SchemaRef {
        self.eq_properties.schema()
    }
}
1124
/// Asserts (returning an internal error, not panicking) that
/// `$target.$func_name()` returns a `Vec` of length `$expected_len`.
/// Used by [`check_default_invariants`] to verify per-child vectors.
macro_rules! check_len {
    ($target:expr, $func_name:ident, $expected_len:expr) => {
        let actual_len = $target.$func_name().len();
        assert_eq_or_internal_err!(
            actual_len,
            $expected_len,
            "{}::{} returned Vec with incorrect size: {} != {}",
            $target.name(),
            stringify!($func_name),
            actual_len,
            $expected_len
        );
    };
}
1139
/// Checks a set of invariants that apply to all ExecutionPlan implementations.
/// Returns an error if the given node does not conform.
///
/// Verifies that every per-child `Vec` exposed by the plan has exactly one
/// entry per child. The `_check` level is currently not consulted: the same
/// checks are applied at every [`InvariantLevel`].
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
    plan: &P,
    _check: InvariantLevel,
) -> Result<(), DataFusionError> {
    let children_len = plan.children().len();

    check_len!(plan, maintains_input_order, children_len);
    check_len!(plan, required_input_ordering, children_len);
    check_len!(plan, required_input_distribution, children_len);
    check_len!(plan, benefits_from_input_partitioning, children_len);

    Ok(())
}
1155
1156/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
1157/// especially for the distributed engine to judge whether need to deal with shuffling.
1158/// Currently, there are 3 kinds of execution plan which needs data exchange
1159///     1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
1160///     2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
1161///     3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
1162#[expect(clippy::needless_pass_by_value)]
1163pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
1164    plan.properties().evaluation_type == EvaluationType::Eager
1165}
1166
1167/// Returns a copy of this plan if we change any child according to the pointer comparison.
1168/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1169pub fn with_new_children_if_necessary(
1170    plan: Arc<dyn ExecutionPlan>,
1171    children: Vec<Arc<dyn ExecutionPlan>>,
1172) -> Result<Arc<dyn ExecutionPlan>> {
1173    let old_children = plan.children();
1174    assert_eq_or_internal_err!(
1175        children.len(),
1176        old_children.len(),
1177        "Wrong number of children"
1178    );
1179    if children.is_empty()
1180        || children
1181            .iter()
1182            .zip(old_children.iter())
1183            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
1184    {
1185        plan.with_new_children(children)
1186    } else {
1187        Ok(plan)
1188    }
1189}
1190
/// Return a [`DisplayableExecutionPlan`] wrapper around an
/// [`ExecutionPlan`] which can be displayed in various easier to
/// understand ways.
///
/// This is a thin convenience wrapper over [`DisplayableExecutionPlan::new`].
///
/// See examples on [`DisplayableExecutionPlan`]
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
    DisplayableExecutionPlan::new(plan)
}
1199
1200/// Execute the [ExecutionPlan] and collect the results in memory
1201pub async fn collect(
1202    plan: Arc<dyn ExecutionPlan>,
1203    context: Arc<TaskContext>,
1204) -> Result<Vec<RecordBatch>> {
1205    let stream = execute_stream(plan, context)?;
1206    crate::common::collect(stream).await
1207}
1208
1209/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
1210///
1211/// See [collect] to buffer the `RecordBatch`es in memory.
1212///
1213/// # Aborting Execution
1214///
1215/// Dropping the stream will abort the execution of the query, and free up
1216/// any allocated resources
1217#[expect(
1218    clippy::needless_pass_by_value,
1219    reason = "Public API that historically takes owned Arcs"
1220)]
1221pub fn execute_stream(
1222    plan: Arc<dyn ExecutionPlan>,
1223    context: Arc<TaskContext>,
1224) -> Result<SendableRecordBatchStream> {
1225    match plan.output_partitioning().partition_count() {
1226        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1227        1 => plan.execute(0, context),
1228        2.. => {
1229            // merge into a single partition
1230            let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1231            // CoalescePartitionsExec must produce a single partition
1232            assert_eq!(1, plan.properties().output_partitioning().partition_count());
1233            plan.execute(0, context)
1234        }
1235    }
1236}
1237
/// Execute the [ExecutionPlan] and collect the results in memory
///
/// Returns one `Vec<RecordBatch>` per output partition, in partition order.
pub async fn collect_partitioned(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<Vec<RecordBatch>>> {
    let streams = execute_stream_partitioned(plan, context)?;

    let mut join_set = JoinSet::new();
    // Execute the plan and collect the results into batches.
    // One task is spawned per partition so all partitions are drained
    // concurrently; each task carries its partition index because tasks
    // complete in arbitrary order.
    streams.into_iter().enumerate().for_each(|(idx, stream)| {
        join_set.spawn(async move {
            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
            (idx, result)
        });
    });

    let mut batches = vec![];
    // Note that currently this doesn't identify the thread that panicked
    //
    // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id
    // once it is stable
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok((idx, res)) => batches.push((idx, res?)),
            Err(e) => {
                // Re-raise panics from worker tasks. A non-panic `JoinError`
                // would mean the task was cancelled, which this function
                // never does — hence `unreachable!`.
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    // Restore partition order, which was lost by out-of-order completion.
    batches.sort_by_key(|(idx, _)| *idx);
    let batches = batches.into_iter().map(|(_, batch)| batch).collect();

    Ok(batches)
}
1277
1278/// Execute the [ExecutionPlan] and return a vec with one stream per output
1279/// partition
1280///
1281/// # Aborting Execution
1282///
1283/// Dropping the stream will abort the execution of the query, and free up
1284/// any allocated resources
1285#[expect(
1286    clippy::needless_pass_by_value,
1287    reason = "Public API that historically takes owned Arcs"
1288)]
1289pub fn execute_stream_partitioned(
1290    plan: Arc<dyn ExecutionPlan>,
1291    context: Arc<TaskContext>,
1292) -> Result<Vec<SendableRecordBatchStream>> {
1293    let num_partitions = plan.output_partitioning().partition_count();
1294    let mut streams = Vec::with_capacity(num_partitions);
1295    for i in 0..num_partitions {
1296        streams.push(plan.execute(i, Arc::clone(&context))?);
1297    }
1298    Ok(streams)
1299}
1300
1301/// Executes an input stream and ensures that the resulting stream adheres to
1302/// the `not null` constraints specified in the `sink_schema`.
1303///
1304/// # Arguments
1305///
1306/// * `input` - An execution plan
1307/// * `sink_schema` - The schema to be applied to the output stream
1308/// * `partition` - The partition index to be executed
1309/// * `context` - The task context
1310///
1311/// # Returns
1312///
1313/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1314///
1315/// This function first executes the given input plan for the specified partition
1316/// and context. It then checks if there are any columns in the input that might
1317/// violate the `not null` constraints specified in the `sink_schema`. If there are
1318/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1319/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
1320#[expect(
1321    clippy::needless_pass_by_value,
1322    reason = "Public API that historically takes owned Arcs"
1323)]
1324pub fn execute_input_stream(
1325    input: Arc<dyn ExecutionPlan>,
1326    sink_schema: SchemaRef,
1327    partition: usize,
1328    context: Arc<TaskContext>,
1329) -> Result<SendableRecordBatchStream> {
1330    let input_stream = input.execute(partition, context)?;
1331
1332    debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1333
1334    // Find input columns that may violate the not null constraint.
1335    let risky_columns: Vec<_> = sink_schema
1336        .fields()
1337        .iter()
1338        .zip(input.schema().fields().iter())
1339        .enumerate()
1340        .filter_map(|(idx, (sink_field, input_field))| {
1341            (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1342        })
1343        .collect();
1344
1345    if risky_columns.is_empty() {
1346        Ok(input_stream)
1347    } else {
1348        // Check not null constraint on the input stream
1349        Ok(Box::pin(RecordBatchStreamAdapter::new(
1350            sink_schema,
1351            input_stream
1352                .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1353        )))
1354    }
1355}
1356
1357/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1358///
1359/// # Arguments
1360///
1361/// * `batch` - The `RecordBatch` to be checked
1362/// * `column_indices` - A vector of column indices that should be checked for
1363///   `not null` constraints.
1364///
1365/// # Returns
1366///
1367/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1368///
1369/// This function iterates over the specified column indices and ensures that none
1370/// of the columns contain null values. If any column contains null values, an error
1371/// is returned.
1372pub fn check_not_null_constraints(
1373    batch: RecordBatch,
1374    column_indices: &Vec<usize>,
1375) -> Result<RecordBatch> {
1376    for &index in column_indices {
1377        if batch.num_columns() <= index {
1378            return exec_err!(
1379                "Invalid batch column count {} expected > {}",
1380                batch.num_columns(),
1381                index
1382            );
1383        }
1384
1385        if batch
1386            .column(index)
1387            .logical_nulls()
1388            .map(|nulls| nulls.null_count())
1389            .unwrap_or_default()
1390            > 0
1391        {
1392            return exec_err!(
1393                "Invalid batch column at '{}' has null but schema specifies non-nullable",
1394                index
1395            );
1396        }
1397    }
1398
1399    Ok(batch)
1400}
1401
/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
///
/// Some plans will change their internal states after execution, making them unable to be executed again.
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
///
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
/// However, if the data of the left table is derived from the work table, it will become outdated
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
///
/// # Limitations
///
/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
///
/// * uses dynamic filters,
/// * represents a recursive query.
///
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // `transform_up` rewrites the tree bottom-up (children before parents),
    // so every node's `reset_state` is invoked exactly once.
    plan.transform_up(|plan| {
        let new_plan = Arc::clone(&plan).reset_state()?;
        Ok(Transformed::yes(new_plan))
    })
    .data()
}
1425
1426/// Check if the `plan` children has the same properties as passed `children`.
1427/// In this case plan can avoid self properties re-computation when its children
1428/// replace is requested.
1429/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1430pub fn has_same_children_properties(
1431    plan: &impl ExecutionPlan,
1432    children: &[Arc<dyn ExecutionPlan>],
1433) -> Result<bool> {
1434    let old_children = plan.children();
1435    assert_eq_or_internal_err!(
1436        children.len(),
1437        old_children.len(),
1438        "Wrong number of children"
1439    );
1440    for (lhs, rhs) in old_children.iter().zip(children.iter()) {
1441        if !Arc::ptr_eq(lhs.properties(), rhs.properties()) {
1442            return Ok(false);
1443        }
1444    }
1445    Ok(true)
1446}
1447
/// Helper macro to avoid properties re-computation if passed children properties
/// the same as plan already has. Could be used to implement fast-path for method
/// [`ExecutionPlan::with_new_children`].
#[macro_export]
macro_rules! check_if_same_properties {
    ($plan: expr, $children: expr) => {
        // If every new child shares its `PlanProperties` allocation with the
        // corresponding existing child (pointer equality, checked by
        // `has_same_children_properties`), rebuilding this plan's own
        // properties would yield the same result — take the fast path.
        if $crate::execution_plan::has_same_children_properties(
            $plan.as_ref(),
            &$children,
        )? {
            // NOTE(review): the `?` and `return Ok(...)` expand into the
            // caller's body, so this macro presumably must be invoked inside a
            // method returning `Result<Arc<dyn ExecutionPlan>>` — confirm at
            // call sites.
            let plan = $plan.with_new_children_and_same_properties($children);
            return Ok(::std::sync::Arc::new(plan));
        }
    };
}
1463
1464/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1465pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1466    let formatted = displayable(plan.as_ref()).indent(true).to_string();
1467    let actual: Vec<&str> = formatted.trim().lines().collect();
1468    actual.iter().map(|elem| (*elem).to_string()).collect()
1469}
1470
/// Indicates the effect an execution plan operator will have on the cardinality
/// of its input stream
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CardinalityEffect {
    /// Unknown effect. This is the default
    Unknown,
    /// The operator is guaranteed to produce exactly one row for
    /// each input row
    Equal,
    /// The operator may produce fewer output rows than it receives input rows
    LowerEqual,
    /// The operator may produce more output rows than it receives input rows
    GreaterEqual,
}
1484
1485/// Can be used in contexts where properties have not yet been initialized properly.
1486pub(crate) fn stub_properties() -> Arc<PlanProperties> {
1487    static STUB_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
1488        Arc::new(PlanProperties::new(
1489            EquivalenceProperties::new(Arc::new(Schema::empty())),
1490            Partitioning::UnknownPartitioning(1),
1491            EmissionType::Final,
1492            Boundedness::Bounded,
1493        ))
1494    });
1495
1496    Arc::clone(&STUB_PROPERTIES)
1497}
1498
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::Arc;

    use super::*;
    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};

    use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::{Result, Statistics};
    use datafusion_execution::{SendableRecordBatchStream, TaskContext};

    /// Minimal `ExecutionPlan` stub that keeps the default `static_name()`
    /// (derived from the type name).
    #[derive(Debug)]
    pub struct EmptyExec;

    impl EmptyExec {
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
            unimplemented!()
        }
    }

    /// Stub that overrides `static_name()` to verify a custom name is reported
    /// both statically and through the instance `name()` method.
    #[derive(Debug)]
    pub struct RenamedEmptyExec;

    impl RenamedEmptyExec {
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }

    impl DisplayAs for RenamedEmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for RenamedEmptyExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn static_name() -> &'static str
        where
            Self: Sized,
        {
            "MyRenamedEmptyExec"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
            unimplemented!()
        }
    }

    #[test]
    fn test_execution_plan_name() {
        let schema1 = Arc::new(Schema::empty());
        let default_name_exec = EmptyExec::new(schema1);
        assert_eq!(default_name_exec.name(), "EmptyExec");

        let schema2 = Arc::new(Schema::empty());
        let renamed_exec = RenamedEmptyExec::new(schema2);
        assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
    }

    /// A compilation test to ensure that the `ExecutionPlan::name()` method can
    /// be called from a trait object.
    /// Related ticket: https://github.com/apache/datafusion/pull/11047
    #[expect(unused)]
    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
        let _ = plan.name();
    }

    /// Asserts that `result` is the not-null constraint violation produced by
    /// `check_not_null_constraints` for column index 0. Shared by the tests
    /// below that expect the same error, to avoid repeating the message.
    fn assert_not_null_violation(result: Result<RecordBatch>) {
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().strip_backtrace(),
            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
        );
    }

    #[test]
    fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
        check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
            )?,
            &vec![0],
        )?;
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_reject_null() -> Result<()> {
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
                vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
            )?,
            &vec![0],
        );
        assert_not_null_violation(result);
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
        // some null value inside REE array
        let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
        let run_end_array = RunArray::try_new(&run_ends, &values)?;
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "a",
                    run_end_array.data_type().to_owned(),
                    true,
                )])),
                vec![Arc::new(run_end_array)],
            )?,
            &vec![0],
        );
        assert_not_null_violation(result);
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
        let keys = Int32Array::from(vec![0, 1, 2, 3]);
        let dictionary = DictionaryArray::new(keys, values);
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "a",
                    dictionary.data_type().to_owned(),
                    true,
                )])),
                vec![Arc::new(dictionary)],
            )?,
            &vec![0],
        );
        assert_not_null_violation(result);
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
        // some null value marked out by dictionary array
        let values = Arc::new(Int32Array::from(vec![
            Some(1),
            None, // this null value is masked by dictionary keys
            Some(3),
            Some(4),
        ]));
        let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
        let dictionary = DictionaryArray::new(keys, values);
        check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "a",
                    dictionary.data_type().to_owned(),
                    true,
                )])),
                vec![Arc::new(dictionary)],
            )?,
            &vec![0],
        )?;
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_on_null_type() -> Result<()> {
        // null value of Null type
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
                vec![Arc::new(NullArray::new(3))],
            )?,
            &vec![0],
        );
        assert_not_null_violation(result);
        Ok(())
    }
}