datafusion_physical_plan/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19use crate::filter_pushdown::{
20    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21    FilterPushdownPropagation,
22};
23pub use crate::metrics::Metric;
24pub use crate::ordering::InputOrderMode;
25pub use crate::stream::EmptyRecordBatchStream;
26
27pub use datafusion_common::hash_utils;
28pub use datafusion_common::utils::project_schema;
29pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
30pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
31pub use datafusion_expr::{Accumulator, ColumnarValue};
32pub use datafusion_physical_expr::window::WindowExpr;
33pub use datafusion_physical_expr::{
34    expressions, Distribution, Partitioning, PhysicalExpr,
35};
36
37use std::any::Any;
38use std::fmt::Debug;
39use std::sync::Arc;
40
41use crate::coalesce_partitions::CoalescePartitionsExec;
42use crate::display::DisplayableExecutionPlan;
43use crate::metrics::MetricsSet;
44use crate::projection::ProjectionExec;
45use crate::stream::RecordBatchStreamAdapter;
46
47use arrow::array::{Array, RecordBatch};
48use arrow::datatypes::SchemaRef;
49use datafusion_common::config::ConfigOptions;
50use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
51use datafusion_common_runtime::JoinSet;
52use datafusion_execution::TaskContext;
53use datafusion_physical_expr::EquivalenceProperties;
54use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
55
56use futures::stream::{StreamExt, TryStreamExt};
57
58/// Represent nodes in the DataFusion Physical Plan.
59///
60/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
61/// [`RecordBatch`] that incrementally computes a partition of the
62/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
63/// details on partitioning.
64///
65/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
66/// properties of the output to the DataFusion optimizer, and methods such as
67/// [`required_input_distribution`] and [`required_input_ordering`] express
68/// requirements of the `ExecutionPlan` from its input.
69///
70/// [`ExecutionPlan`] can be displayed in a simplified form using the
71/// return value from [`displayable`] in addition to the (normally
72/// quite verbose) `Debug` output.
73///
74/// [`execute`]: ExecutionPlan::execute
75/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
76/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
77///
78/// # Examples
79///
80/// See [`datafusion-examples`] for examples, including
81/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
82/// `ExecutionPlan` with memory tracking and spilling support.
83///
84/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
85/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_execution_plan.rs
86pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
87    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
88    ///
89    /// Implementation note: this method can just proxy to
90    /// [`static_name`](ExecutionPlan::static_name) if no special action is
91    /// needed. It doesn't provide a default implementation like that because
92    /// this method doesn't require the `Sized` constrain to allow a wilder
93    /// range of use cases.
94    fn name(&self) -> &str;
95
96    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
97    /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
98    fn static_name() -> &'static str
99    where
100        Self: Sized,
101    {
102        let full_name = std::any::type_name::<Self>();
103        let maybe_start_idx = full_name.rfind(':');
104        match maybe_start_idx {
105            Some(start_idx) => &full_name[start_idx + 1..],
106            None => "UNKNOWN",
107        }
108    }
109
110    /// Returns the execution plan as [`Any`] so that it can be
111    /// downcast to a specific implementation.
112    fn as_any(&self) -> &dyn Any;
113
114    /// Get the schema for this execution plan
115    fn schema(&self) -> SchemaRef {
116        Arc::clone(self.properties().schema())
117    }
118
119    /// Return properties of the output of the `ExecutionPlan`, such as output
120    /// ordering(s), partitioning information etc.
121    ///
122    /// This information is available via methods on [`ExecutionPlanProperties`]
123    /// trait, which is implemented for all `ExecutionPlan`s.
124    fn properties(&self) -> &PlanProperties;
125
126    /// Returns an error if this individual node does not conform to its invariants.
127    /// These invariants are typically only checked in debug mode.
128    ///
129    /// A default set of invariants is provided in the [check_default_invariants] function.
130    /// The default implementation of `check_invariants` calls this function.
131    /// Extension nodes can provide their own invariants.
132    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
133        check_default_invariants(self, check)
134    }
135
136    /// Specifies the data distribution requirements for all the
137    /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
138    fn required_input_distribution(&self) -> Vec<Distribution> {
139        vec![Distribution::UnspecifiedDistribution; self.children().len()]
140    }
141
142    /// Specifies the ordering required for all of the children of this
143    /// `ExecutionPlan`.
144    ///
145    /// For each child, it's the local ordering requirement within
146    /// each partition rather than the global ordering
147    ///
148    /// NOTE that checking `!is_empty()` does **not** check for a
149    /// required input ordering. Instead, the correct check is that at
150    /// least one entry must be `Some`
151    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
152        vec![None; self.children().len()]
153    }
154
155    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
156    /// rows within or between partitions.
157    ///
158    /// For example, Projection, Filter, and Limit maintain the order
159    /// of inputs -- they may transform values (Projection) or not
160    /// produce the same number of rows that went in (Filter and
161    /// Limit), but the rows that are produced go in the same way.
162    ///
163    /// DataFusion uses this metadata to apply certain optimizations
164    /// such as automatically repartitioning correctly.
165    ///
166    /// The default implementation returns `false`
167    ///
168    /// WARNING: if you override this default, you *MUST* ensure that
169    /// the `ExecutionPlan`'s maintains the ordering invariant or else
170    /// DataFusion may produce incorrect results.
171    fn maintains_input_order(&self) -> Vec<bool> {
172        vec![false; self.children().len()]
173    }
174
175    /// Specifies whether the `ExecutionPlan` benefits from increased
176    /// parallelization at its input for each child.
177    ///
178    /// If returns `true`, the `ExecutionPlan` would benefit from partitioning
179    /// its corresponding child (and thus from more parallelism). For
180    /// `ExecutionPlan` that do very little work the overhead of extra
181    /// parallelism may outweigh any benefits
182    ///
183    /// The default implementation returns `true` unless this `ExecutionPlan`
184    /// has signalled it requires a single child input partition.
185    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
186        // By default try to maximize parallelism with more CPUs if
187        // possible
188        self.required_input_distribution()
189            .into_iter()
190            .map(|dist| !matches!(dist, Distribution::SinglePartition))
191            .collect()
192    }
193
194    /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
195    /// The returned list will be empty for leaf nodes such as scans, will contain
196    /// a single value for unary nodes, or two values for binary nodes (such as
197    /// joins).
198    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
199
200    /// Returns a new `ExecutionPlan` where all existing children were replaced
201    /// by the `children`, in order
202    fn with_new_children(
203        self: Arc<Self>,
204        children: Vec<Arc<dyn ExecutionPlan>>,
205    ) -> Result<Arc<dyn ExecutionPlan>>;
206
207    /// Reset any internal state within this [`ExecutionPlan`].
208    ///
209    /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
210    /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
211    /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
212    /// are reset to their initial state.
213    ///
214    /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
215    /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
216    /// necessarily resetting any internal state. Implementations that require resetting of some
217    /// internal state should override this method to provide the necessary logic.
218    ///
219    /// This method should *not* reset state recursively for children, as it is expected that
220    /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
221    /// or was already called on each child.
222    ///
223    /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
224    /// thus it is expected that any cached plan properties will remain valid after the reset.
225    ///
226    /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
227    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
228        let children = self.children().into_iter().cloned().collect();
229        self.with_new_children(children)
230    }
231
232    /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
233    /// produce `target_partitions` partitions.
234    ///
235    /// If the `ExecutionPlan` does not support changing its partitioning,
236    /// returns `Ok(None)` (the default).
237    ///
238    /// It is the `ExecutionPlan` can increase its partitioning, but not to the
239    /// `target_partitions`, it may return an ExecutionPlan with fewer
240    /// partitions. This might happen, for example, if each new partition would
241    /// be too small to be efficiently processed individually.
242    ///
243    /// The DataFusion optimizer attempts to use as many threads as possible by
244    /// repartitioning its inputs to match the target number of threads
245    /// available (`target_partitions`). Some data sources, such as the built in
246    /// CSV and Parquet readers, implement this method as they are able to read
247    /// from their input files in parallel, regardless of how the source data is
248    /// split amongst files.
249    fn repartitioned(
250        &self,
251        _target_partitions: usize,
252        _config: &ConfigOptions,
253    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
254        Ok(None)
255    }
256
257    /// Begin execution of `partition`, returning a [`Stream`] of
258    /// [`RecordBatch`]es.
259    ///
260    /// # Notes
261    ///
262    /// The `execute` method itself is not `async` but it returns an `async`
263    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
264    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
265    /// Most `ExecutionPlan`s should not do any work before the first
266    /// `RecordBatch` is requested from the stream.
267    ///
268    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
269    /// [`Stream`] into a [`SendableRecordBatchStream`].
270    ///
271    /// Using `async` `Streams` allows for network I/O during execution and
272    /// takes advantage of Rust's built in support for `async` continuations and
273    /// crate ecosystem.
274    ///
275    /// [`Stream`]: futures::stream::Stream
276    /// [`StreamExt`]: futures::stream::StreamExt
277    /// [`TryStreamExt`]: futures::stream::TryStreamExt
278    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
279    ///
280    /// # Error handling
281    ///
282    /// Any error that occurs during execution is sent as an `Err` in the output
283    /// stream.
284    ///
285    /// `ExecutionPlan` implementations in DataFusion cancel additional work
286    /// immediately once an error occurs. The rationale is that if the overall
287    /// query will return an error,  any additional work such as continued
288    /// polling of inputs will be wasted as it will be thrown away.
289    ///
290    /// # Cancellation / Aborting Execution
291    ///
292    /// The [`Stream`] that is returned must ensure that any allocated resources
293    /// are freed when the stream itself is dropped. This is particularly
294    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
295    /// "abort" such tasks, they may continue to consume resources even after
296    /// the plan is dropped, generating intermediate results that are never
297    /// used.
298    /// Thus, [`spawn`] is disallowed, and instead use [`SpawnedTask`].
299    ///
300    /// To enable timely cancellation, the [`Stream`] that is returned must not
301    /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
302    /// In a typical [`ExecutionPlan`], this automatically happens unless there are
303    /// special circumstances; e.g. when the computational complexity of processing a
304    /// batch is superlinear. See this [general guideline][async-guideline] for more context
305    /// on this point, which explains why one should avoid spending a long time without
306    /// reaching an `await`/yield point in asynchronous runtimes.
307    /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
308    /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
309    /// [`tokio::task::yield_now()`] when appropriate.
310    /// In special cases that warrant manual yielding, determination for "regularly" may be
311    /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
312    /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
313    /// counting rows or batches.
314    ///
315    /// The [cancellation benchmark] tracks some cases of how quickly queries can
316    /// be cancelled.
317    ///
318    /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
319    /// for structures to help ensure all background tasks are cancelled.
320    ///
321    /// [`spawn`]: tokio::task::spawn
322    /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
323    /// [`JoinSet`]: datafusion_common_runtime::JoinSet
324    /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
325    /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
326    /// [`Poll::Pending`]: std::task::Poll::Pending
327    /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
328    ///
329    /// # Implementation Examples
330    ///
331    /// While `async` `Stream`s have a non trivial learning curve, the
332    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
333    /// which help simplify many common operations.
334    ///
335    /// Here are some common patterns:
336    ///
337    /// ## Return Precomputed `RecordBatch`
338    ///
339    /// We can return a precomputed `RecordBatch` as a `Stream`:
340    ///
341    /// ```
342    /// # use std::sync::Arc;
343    /// # use arrow::array::RecordBatch;
344    /// # use arrow::datatypes::SchemaRef;
345    /// # use datafusion_common::Result;
346    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
347    /// # use datafusion_physical_plan::memory::MemoryStream;
348    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
349    /// struct MyPlan {
350    ///     batch: RecordBatch,
351    /// }
352    ///
353    /// impl MyPlan {
354    ///     fn execute(
355    ///         &self,
356    ///         partition: usize,
357    ///         context: Arc<TaskContext>,
358    ///     ) -> Result<SendableRecordBatchStream> {
359    ///         // use functions from futures crate convert the batch into a stream
360    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
361    ///         let stream = futures::stream::once(fut);
362    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
363    ///             self.batch.schema(),
364    ///             stream,
365    ///         )))
366    ///     }
367    /// }
368    /// ```
369    ///
370    /// ## Lazily (async) Compute `RecordBatch`
371    ///
372    /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
373    ///
374    /// ```
375    /// # use std::sync::Arc;
376    /// # use arrow::array::RecordBatch;
377    /// # use arrow::datatypes::SchemaRef;
378    /// # use datafusion_common::Result;
379    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
380    /// # use datafusion_physical_plan::memory::MemoryStream;
381    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
382    /// struct MyPlan {
383    ///     schema: SchemaRef,
384    /// }
385    ///
386    /// /// Returns a single batch when the returned stream is polled
387    /// async fn get_batch() -> Result<RecordBatch> {
388    ///     todo!()
389    /// }
390    ///
391    /// impl MyPlan {
392    ///     fn execute(
393    ///         &self,
394    ///         partition: usize,
395    ///         context: Arc<TaskContext>,
396    ///     ) -> Result<SendableRecordBatchStream> {
397    ///         let fut = get_batch();
398    ///         let stream = futures::stream::once(fut);
399    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
400    ///             self.schema.clone(),
401    ///             stream,
402    ///         )))
403    ///     }
404    /// }
405    /// ```
406    ///
407    /// ## Lazily (async) create a Stream
408    ///
409    /// If you need to create the return `Stream` using an `async` function,
410    /// you can do so by flattening the result:
411    ///
412    /// ```
413    /// # use std::sync::Arc;
414    /// # use arrow::array::RecordBatch;
415    /// # use arrow::datatypes::SchemaRef;
416    /// # use futures::TryStreamExt;
417    /// # use datafusion_common::Result;
418    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
419    /// # use datafusion_physical_plan::memory::MemoryStream;
420    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
421    /// struct MyPlan {
422    ///     schema: SchemaRef,
423    /// }
424    ///
425    /// /// async function that returns a stream
426    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
427    ///     todo!()
428    /// }
429    ///
430    /// impl MyPlan {
431    ///     fn execute(
432    ///         &self,
433    ///         partition: usize,
434    ///         context: Arc<TaskContext>,
435    ///     ) -> Result<SendableRecordBatchStream> {
436    ///         // A future that yields a stream
437    ///         let fut = get_batch_stream();
438    ///         // Use TryStreamExt::try_flatten to flatten the stream of streams
439    ///         let stream = futures::stream::once(fut).try_flatten();
440    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
441    ///             self.schema.clone(),
442    ///             stream,
443    ///         )))
444    ///     }
445    /// }
446    /// ```
447    fn execute(
448        &self,
449        partition: usize,
450        context: Arc<TaskContext>,
451    ) -> Result<SendableRecordBatchStream>;
452
453    /// Return a snapshot of the set of [`Metric`]s for this
454    /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
455    ///
456    /// While the values of the metrics in the returned
457    /// [`MetricsSet`]s may change as execution progresses, the
458    /// specific metrics will not.
459    ///
460    /// Once `self.execute()` has returned (technically the future is
461    /// resolved) for all available partitions, the set of metrics
462    /// should be complete. If this function is called prior to
463    /// `execute()` new metrics may appear in subsequent calls.
464    fn metrics(&self) -> Option<MetricsSet> {
465        None
466    }
467
468    /// Returns statistics for this `ExecutionPlan` node. If statistics are not
469    /// available, should return [`Statistics::new_unknown`] (the default), not
470    /// an error.
471    ///
472    /// For TableScan executors, which supports filter pushdown, special attention
473    /// needs to be paid to whether the stats returned by this method are exact or not
474    #[deprecated(since = "48.0.0", note = "Use `partition_statistics` method instead")]
475    fn statistics(&self) -> Result<Statistics> {
476        Ok(Statistics::new_unknown(&self.schema()))
477    }
478
479    /// Returns statistics for a specific partition of this `ExecutionPlan` node.
480    /// If statistics are not available, should return [`Statistics::new_unknown`]
481    /// (the default), not an error.
482    /// If `partition` is `None`, it returns statistics for the entire plan.
483    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
484        if let Some(idx) = partition {
485            // Validate partition index
486            let partition_count = self.properties().partitioning.partition_count();
487            if idx >= partition_count {
488                return internal_err!(
489                    "Invalid partition index: {}, the partition count is {}",
490                    idx,
491                    partition_count
492                );
493            }
494        }
495        Ok(Statistics::new_unknown(&self.schema()))
496    }
497
498    /// Returns `true` if a limit can be safely pushed down through this
499    /// `ExecutionPlan` node.
500    ///
501    /// If this method returns `true`, and the query plan contains a limit at
502    /// the output of this node, DataFusion will push the limit to the input
503    /// of this node.
504    fn supports_limit_pushdown(&self) -> bool {
505        false
506    }
507
508    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
509    /// fetch limits. Returns `None` otherwise.
510    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
511        None
512    }
513
514    /// Gets the fetch count for the operator, `None` means there is no fetch.
515    fn fetch(&self) -> Option<usize> {
516        None
517    }
518
519    /// Gets the effect on cardinality, if known
520    fn cardinality_effect(&self) -> CardinalityEffect {
521        CardinalityEffect::Unknown
522    }
523
524    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
525    ///
526    /// If the operator supports this optimization, the resulting plan will be:
527    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
528    /// Otherwise, it returns the current `ExecutionPlan` as-is.
529    ///
530    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
531    /// or not possible, or `Err` on failure.
532    fn try_swapping_with_projection(
533        &self,
534        _projection: &ProjectionExec,
535    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
536        Ok(None)
537    }
538
539    /// Collect filters that this node can push down to its children.
540    /// Filters that are being pushed down from parents are passed in,
541    /// and the node may generate additional filters to push down.
542    /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
543    /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
544    /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
545    ///    filters so it only returns that `FilterExec` wants to push down its own predicate.
546    /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
547    ///    `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
548    ///    but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
549    /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
550    ///    and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
551    ///    since it has no children and no additional filters to push down.
552    ///    It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
553    ///    up the plan that `DataSourceExec` can actually bind the filters.
554    ///
555    /// The default implementation bars all parent filters from being pushed down and adds no new filters.
556    /// This is the safest option, making filter pushdown opt-in on a per-node pasis.
557    ///
558    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
559    /// Depending on the phase the operator may or may not be allowed to modify the plan.
560    /// See [`FilterPushdownPhase`] for more details.
561    fn gather_filters_for_pushdown(
562        &self,
563        _phase: FilterPushdownPhase,
564        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
565        _config: &ConfigOptions,
566    ) -> Result<FilterDescription> {
567        Ok(FilterDescription::all_unsupported(
568            &parent_filters,
569            &self.children(),
570        ))
571    }
572
573    /// Handle the result of a child pushdown.
574    /// This method is called as we recurse back up the plan tree after pushing
575    /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
576    /// It allows the current node to process the results of filter pushdown from
577    /// its children, deciding whether to absorb filters, modify the plan, or pass
578    /// filters back up to its parent.
579    ///
580    /// **Purpose and Context:**
581    /// Filter pushdown is a critical optimization in DataFusion that aims to
582    /// reduce the amount of data processed by applying filters as early as
583    /// possible in the query plan. This method is part of the second phase of
584    /// filter pushdown, where results are propagated back up the tree after
585    /// being pushed down. Each node can inspect the pushdown results from its
586    /// children and decide how to handle any unapplied filters, potentially
587    /// optimizing the plan structure or filter application.
588    ///
589    /// **Behavior in Different Nodes:**
590    /// - For a `DataSourceExec`, this often means absorbing the filters to apply
591    ///   them during the scan phase (late materialization), reducing the data
592    ///   read from the source.
593    /// - A `FilterExec` may absorb any filters its children could not handle,
594    ///   combining them with its own predicate. If no filters remain (i.e., the
595    ///   predicate becomes trivially true), it may remove itself from the plan
596    ///   altogether. It typically marks parent filters as supported, indicating
597    ///   they have been handled.
598    /// - A `HashJoinExec` might ignore the pushdown result if filters need to
599    ///   be applied during the join operation. It passes the parent filters back
600    ///   up wrapped in [`FilterPushdownPropagation::if_any`], discarding
601    ///   any self-filters from children.
602    ///
603    /// **Example Walkthrough:**
604    /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
605    /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
606    ///    `FilterExec`, the filter `f1` is gathered and pushed down to
607    ///    `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
608    ///    the join or add its own filters (e.g., a min-max filter from the build side),
609    ///    then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
610    ///    has no children to push to, so it prepares to handle filters in the
611    ///    upward phase.
612    /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
613    ///    `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
614    ///    for late materialization during scanning, marking them as supported.
615    ///    `HashJoinExec` receives the result, decides whether to apply any
616    ///    remaining filters during the join, and passes unhandled filters back
617    ///    up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
618    ///    updates its predicate if necessary, or removes itself if the predicate
619    ///    becomes trivial (e.g., `lit(true)`), and marks filters as supported
620    ///    for its parent.
621    ///
622    /// The default implementation is a no-op that passes the result of pushdown
623    /// from the children to its parent transparently, ensuring no filters are
624    /// lost if a node does not override this behavior.
625    ///
626    /// **Notes for Implementation:**
627    /// When returning filters via [`FilterPushdownPropagation`], the order of
628    /// filters need not match the order they were passed in via
629    /// `child_pushdown_result`. However, preserving the order is recommended for
630    /// debugging and ease of reasoning about the resulting plans.
631    ///
632    /// **Helper Methods for Customization:**
633    /// There are various helper methods to simplify implementing this method:
634    /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
635    ///   supported as long as at least one child supports them.
636    /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
637    ///   supported as long as all children support them.
638    /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
639    ///   to the propagation result, indicating which filters are supported by
640    ///   the current node.
641    /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
642    ///   current node in the propagation result, used if the node
643    ///   has modified its plan based on the pushdown results.
644    ///
645    /// **Filter Pushdown Phases:**
646    /// There are two different phases in filter pushdown (`Pre` and others),
647    /// which some operators may handle differently. Depending on the phase, the
648    /// operator may or may not be allowed to modify the plan. See
649    /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
650    ///
651    /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
652    fn handle_child_pushdown_result(
653        &self,
654        _phase: FilterPushdownPhase,
655        child_pushdown_result: ChildPushdownResult,
656        _config: &ConfigOptions,
657    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
658        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
659    }
660
661    /// Injects arbitrary run-time state into this execution plan, returning a new plan
662    /// instance that incorporates that state *if* it is relevant to the concrete
663    /// node implementation.
664    ///
665    /// This is a generic entry point: the `state` can be any type wrapped in
666    /// `Arc<dyn Any + Send + Sync>`.  A node that cares about the state should
667    /// down-cast it to the concrete type it expects and, if successful, return a
668    /// modified copy of itself that captures the provided value.  If the state is
669    /// not applicable, the default behaviour is to return `None` so that parent
670    /// nodes can continue propagating the attempt further down the plan tree.
671    ///
672    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
673    /// down-casts the supplied state to an `Arc<WorkTable>`
674    /// in order to wire up the working table used during recursive-CTE execution.
675    /// Similar patterns can be followed by custom nodes that need late-bound
676    /// dependencies or shared state.
677    fn with_new_state(
678        &self,
679        _state: Arc<dyn Any + Send + Sync>,
680    ) -> Option<Arc<dyn ExecutionPlan>> {
681        None
682    }
683}
684
685/// [`ExecutionPlan`] Invariant Level
686///
687/// What set of assertions ([Invariant]s)  holds for a particular `ExecutionPlan`
688///
689/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
690#[derive(Clone, Copy)]
691pub enum InvariantLevel {
692    /// Invariants that are always true for the [`ExecutionPlan`] node
693    /// such as the number of expected children.
694    Always,
695    /// Invariants that must hold true for the [`ExecutionPlan`] node
696    /// to be "executable", such as ordering and/or distribution requirements
697    /// being fulfilled.
698    Executable,
699}
700
701/// Extension trait provides an easy API to fetch various properties of
702/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
703pub trait ExecutionPlanProperties {
704    /// Specifies how the output of this `ExecutionPlan` is split into
705    /// partitions.
706    fn output_partitioning(&self) -> &Partitioning;
707
708    /// If the output of this `ExecutionPlan` within each partition is sorted,
709    /// returns `Some(keys)` describing the ordering. A `None` return value
710    /// indicates no assumptions should be made on the output ordering.
711    ///
712    /// For example, `SortExec` (obviously) produces sorted output as does
713    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
714    /// output if its input is sorted as it does not reorder the input rows.
715    fn output_ordering(&self) -> Option<&LexOrdering>;
716
717    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
718    /// For more details, see [`Boundedness`].
719    fn boundedness(&self) -> Boundedness;
720
721    /// Indicates how the stream of this `ExecutionPlan` emits its results.
722    /// For more details, see [`EmissionType`].
723    fn pipeline_behavior(&self) -> EmissionType;
724
725    /// Get the [`EquivalenceProperties`] within the plan.
726    ///
727    /// Equivalence properties tell DataFusion what columns are known to be
728    /// equal, during various optimization passes. By default, this returns "no
729    /// known equivalences" which is always correct, but may cause DataFusion to
730    /// unnecessarily resort data.
731    ///
732    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
733    /// through it or how columns within each row relate to each other, it
734    /// should return the equivalence properties of its input. For
735    /// example, since [`FilterExec`] may remove rows from its input, but does not
736    /// otherwise modify them, it preserves its input equivalence properties.
737    /// However, since `ProjectionExec` may calculate derived expressions, it
738    /// needs special handling.
739    ///
740    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
741    /// for related concepts.
742    ///
743    /// [`FilterExec`]: crate::filter::FilterExec
744    fn equivalence_properties(&self) -> &EquivalenceProperties;
745}
746
747impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
748    fn output_partitioning(&self) -> &Partitioning {
749        self.properties().output_partitioning()
750    }
751
752    fn output_ordering(&self) -> Option<&LexOrdering> {
753        self.properties().output_ordering()
754    }
755
756    fn boundedness(&self) -> Boundedness {
757        self.properties().boundedness
758    }
759
760    fn pipeline_behavior(&self) -> EmissionType {
761        self.properties().emission_type
762    }
763
764    fn equivalence_properties(&self) -> &EquivalenceProperties {
765        self.properties().equivalence_properties()
766    }
767}
768
769impl ExecutionPlanProperties for &dyn ExecutionPlan {
770    fn output_partitioning(&self) -> &Partitioning {
771        self.properties().output_partitioning()
772    }
773
774    fn output_ordering(&self) -> Option<&LexOrdering> {
775        self.properties().output_ordering()
776    }
777
778    fn boundedness(&self) -> Boundedness {
779        self.properties().boundedness
780    }
781
782    fn pipeline_behavior(&self) -> EmissionType {
783        self.properties().emission_type
784    }
785
786    fn equivalence_properties(&self) -> &EquivalenceProperties {
787        self.properties().equivalence_properties()
788    }
789}
790
791/// Represents whether a stream of data **generated** by an operator is bounded (finite)
792/// or unbounded (infinite).
793///
794/// This is used to determine whether an execution plan will eventually complete
795/// processing all its data (bounded) or could potentially run forever (unbounded).
796///
797/// For unbounded streams, it also tracks whether the operator requires finite memory
798/// to process the stream or if memory usage could grow unbounded.
799///
800/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
801/// the operator. For example, limit or topk with fetch operator can convert an unbounded stream to a bounded stream.
802#[derive(Debug, Clone, Copy, PartialEq, Eq)]
803pub enum Boundedness {
804    /// The data stream is bounded (finite) and will eventually complete
805    Bounded,
806    /// The data stream is unbounded (infinite) and could run forever
807    Unbounded {
808        /// Whether this operator requires infinite memory to process the unbounded stream.
809        /// If false, the operator can process an infinite stream with bounded memory.
810        /// If true, memory usage may grow unbounded while processing the stream.
811        ///
812        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
813        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
814        requires_infinite_memory: bool,
815    },
816}
817
818impl Boundedness {
819    pub fn is_unbounded(&self) -> bool {
820        matches!(self, Boundedness::Unbounded { .. })
821    }
822}
823
824/// Represents how an operator emits its output records.
825///
826/// This is used to determine whether an operator emits records incrementally as they arrive,
827/// only emits a final result at the end, or can do both. Note that it generates the output -- record batch with `batch_size` rows
828/// but it may still buffer data internally until it has enough data to emit a record batch or the source is exhausted.
829///
830/// For example, in the following plan:
831/// ```text
832///   SortExec [EmissionType::Final]
833///     |_ on: [col1 ASC]
834///     FilterExec [EmissionType::Incremental]
835///       |_ pred: col2 > 100
836///       DataSourceExec [EmissionType::Incremental]
837///         |_ file: "data.csv"
838/// ```
839/// - DataSourceExec emits records incrementally as it reads from the file
840/// - FilterExec processes and emits filtered records incrementally as they arrive
841/// - SortExec must wait for all input records before it can emit the sorted result,
842///   since it needs to see all values to determine their final order
843///
844/// Left joins can emit both incrementally and finally:
845/// - Incrementally emit matches as they are found
846/// - Finally emit non-matches after all input is processed
847#[derive(Debug, Clone, Copy, PartialEq, Eq)]
848pub enum EmissionType {
849    /// Records are emitted incrementally as they arrive and are processed
850    Incremental,
851    /// Records are only emitted once all input has been processed
852    Final,
853    /// Records can be emitted both incrementally and as a final result
854    Both,
855}
856
857/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
858/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details.
859#[derive(Debug, Clone, Copy, PartialEq, Eq)]
860pub enum SchedulingType {
861    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
862    /// cooperative scheduling. This means the implementation of the `Stream` returned by
863    /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as
864    /// [`tokio::task::coop::consume_budget`].
865    ///
866    /// `NonCooperative` is the default value and is acceptable for most operators. Please refer to
867    /// the [`coop`](crate::coop) module for details on when it may be useful to use
868    /// `Cooperative` instead.
869    NonCooperative,
870    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
871    /// cooperative scheduling by consuming task budget when it was able to produce a
872    /// [`RecordBatch`].
873    Cooperative,
874}
875
876/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
877///
878/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
879/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
880///
881/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
882/// is known as data-driven or eager evaluation.
883#[derive(Debug, Clone, Copy, PartialEq, Eq)]
884pub enum EvaluationType {
885    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
886    /// instances when it is demanded by invoking `Stream::poll_next`.
887    /// Filter, projection, and join are examples of such lazy operators.
888    ///
889    /// Lazy operators are also known as demand-driven operators.
890    Lazy,
891    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
892    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
893    /// `Stream::poll_next` is called.
894    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
895    ///
896    /// Eager operators are also known as a data-driven operators.
897    Eager,
898}
899
900/// Utility to determine an operator's boundedness based on its children's boundedness.
901///
902/// Assumes boundedness can be inferred from child operators:
903/// - Unbounded (requires_infinite_memory: true) takes precedence.
904/// - Unbounded (requires_infinite_memory: false) is considered next.
905/// - Otherwise, the operator is bounded.
906///
907/// **Note:** This is a general-purpose utility and may not apply to
908/// all multi-child operators. Ensure your operator's behavior aligns
909/// with these assumptions before using.
910pub(crate) fn boundedness_from_children<'a>(
911    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
912) -> Boundedness {
913    let mut unbounded_with_finite_mem = false;
914
915    for child in children {
916        match child.boundedness() {
917            Boundedness::Unbounded {
918                requires_infinite_memory: true,
919            } => {
920                return Boundedness::Unbounded {
921                    requires_infinite_memory: true,
922                }
923            }
924            Boundedness::Unbounded {
925                requires_infinite_memory: false,
926            } => {
927                unbounded_with_finite_mem = true;
928            }
929            Boundedness::Bounded => {}
930        }
931    }
932
933    if unbounded_with_finite_mem {
934        Boundedness::Unbounded {
935            requires_infinite_memory: false,
936        }
937    } else {
938        Boundedness::Bounded
939    }
940}
941
942/// Determines the emission type of an operator based on its children's pipeline behavior.
943///
944/// The precedence of emission types is:
945/// - `Final` has the highest precedence.
946/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
947/// - `Incremental` is the default if all children emit incremental results.
948///
949/// **Note:** This is a general-purpose utility and may not apply to
950/// all multi-child operators. Verify your operator's behavior aligns
951/// with these assumptions.
952pub(crate) fn emission_type_from_children<'a>(
953    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
954) -> EmissionType {
955    let mut inc_and_final = false;
956
957    for child in children {
958        match child.pipeline_behavior() {
959            EmissionType::Final => return EmissionType::Final,
960            EmissionType::Both => inc_and_final = true,
961            EmissionType::Incremental => continue,
962        }
963    }
964
965    if inc_and_final {
966        EmissionType::Both
967    } else {
968        EmissionType::Incremental
969    }
970}
971
972/// Stores certain, often expensive to compute, plan properties used in query
973/// optimization.
974///
975/// These properties are stored a single structure to permit this information to
976/// be computed once and then those cached results used multiple times without
977/// recomputation (aka a cache)
978#[derive(Debug, Clone)]
979pub struct PlanProperties {
980    /// See [ExecutionPlanProperties::equivalence_properties]
981    pub eq_properties: EquivalenceProperties,
982    /// See [ExecutionPlanProperties::output_partitioning]
983    pub partitioning: Partitioning,
984    /// See [ExecutionPlanProperties::pipeline_behavior]
985    pub emission_type: EmissionType,
986    /// See [ExecutionPlanProperties::boundedness]
987    pub boundedness: Boundedness,
988    pub evaluation_type: EvaluationType,
989    pub scheduling_type: SchedulingType,
990    /// See [ExecutionPlanProperties::output_ordering]
991    output_ordering: Option<LexOrdering>,
992}
993
994impl PlanProperties {
995    /// Construct a new `PlanPropertiesCache` from the
996    pub fn new(
997        eq_properties: EquivalenceProperties,
998        partitioning: Partitioning,
999        emission_type: EmissionType,
1000        boundedness: Boundedness,
1001    ) -> Self {
1002        // Output ordering can be derived from `eq_properties`.
1003        let output_ordering = eq_properties.output_ordering();
1004        Self {
1005            eq_properties,
1006            partitioning,
1007            emission_type,
1008            boundedness,
1009            evaluation_type: EvaluationType::Lazy,
1010            scheduling_type: SchedulingType::NonCooperative,
1011            output_ordering,
1012        }
1013    }
1014
1015    /// Overwrite output partitioning with its new value.
1016    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
1017        self.partitioning = partitioning;
1018        self
1019    }
1020
1021    /// Overwrite equivalence properties with its new value.
1022    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
1023        // Changing equivalence properties also changes output ordering, so
1024        // make sure to overwrite it:
1025        self.output_ordering = eq_properties.output_ordering();
1026        self.eq_properties = eq_properties;
1027        self
1028    }
1029
1030    /// Overwrite boundedness with its new value.
1031    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
1032        self.boundedness = boundedness;
1033        self
1034    }
1035
1036    /// Overwrite emission type with its new value.
1037    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
1038        self.emission_type = emission_type;
1039        self
1040    }
1041
1042    /// Set the [`SchedulingType`].
1043    ///
1044    /// Defaults to [`SchedulingType::NonCooperative`]
1045    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
1046        self.scheduling_type = scheduling_type;
1047        self
1048    }
1049
1050    /// Set the [`EvaluationType`].
1051    ///
1052    /// Defaults to [`EvaluationType::Lazy`]
1053    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
1054        self.evaluation_type = drive_type;
1055        self
1056    }
1057
1058    /// Overwrite constraints with its new value.
1059    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1060        self.eq_properties = self.eq_properties.with_constraints(constraints);
1061        self
1062    }
1063
1064    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
1065        &self.eq_properties
1066    }
1067
1068    pub fn output_partitioning(&self) -> &Partitioning {
1069        &self.partitioning
1070    }
1071
1072    pub fn output_ordering(&self) -> Option<&LexOrdering> {
1073        self.output_ordering.as_ref()
1074    }
1075
1076    /// Get schema of the node.
1077    pub(crate) fn schema(&self) -> &SchemaRef {
1078        self.eq_properties.schema()
1079    }
1080}
1081
1082macro_rules! check_len {
1083    ($target:expr, $func_name:ident, $expected_len:expr) => {
1084        let actual_len = $target.$func_name().len();
1085        if actual_len != $expected_len {
1086            return internal_err!(
1087                "{}::{} returned Vec with incorrect size: {} != {}",
1088                $target.name(),
1089                stringify!($func_name),
1090                actual_len,
1091                $expected_len
1092            );
1093        }
1094    };
1095}
1096
1097/// Checks a set of invariants that apply to all ExecutionPlan implementations.
1098/// Returns an error if the given node does not conform.
1099pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
1100    plan: &P,
1101    _check: InvariantLevel,
1102) -> Result<(), DataFusionError> {
1103    let children_len = plan.children().len();
1104
1105    check_len!(plan, maintains_input_order, children_len);
1106    check_len!(plan, required_input_ordering, children_len);
1107    check_len!(plan, required_input_distribution, children_len);
1108    check_len!(plan, benefits_from_input_partitioning, children_len);
1109
1110    Ok(())
1111}
1112
1113/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
1114/// especially for the distributed engine to judge whether need to deal with shuffling.
1115/// Currently, there are 3 kinds of execution plan which needs data exchange
1116///     1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
1117///     2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
1118///     3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
1119pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
1120    plan.properties().evaluation_type == EvaluationType::Eager
1121}
1122
1123/// Returns a copy of this plan if we change any child according to the pointer comparison.
1124/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1125pub fn with_new_children_if_necessary(
1126    plan: Arc<dyn ExecutionPlan>,
1127    children: Vec<Arc<dyn ExecutionPlan>>,
1128) -> Result<Arc<dyn ExecutionPlan>> {
1129    let old_children = plan.children();
1130    if children.len() != old_children.len() {
1131        internal_err!("Wrong number of children")
1132    } else if children.is_empty()
1133        || children
1134            .iter()
1135            .zip(old_children.iter())
1136            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
1137    {
1138        plan.with_new_children(children)
1139    } else {
1140        Ok(plan)
1141    }
1142}
1143
1144/// Return a [`DisplayableExecutionPlan`] wrapper around an
1145/// [`ExecutionPlan`] which can be displayed in various easier to
1146/// understand ways.
1147///
1148/// See examples on [`DisplayableExecutionPlan`]
1149pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
1150    DisplayableExecutionPlan::new(plan)
1151}
1152
1153/// Execute the [ExecutionPlan] and collect the results in memory
1154pub async fn collect(
1155    plan: Arc<dyn ExecutionPlan>,
1156    context: Arc<TaskContext>,
1157) -> Result<Vec<RecordBatch>> {
1158    let stream = execute_stream(plan, context)?;
1159    crate::common::collect(stream).await
1160}
1161
1162/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
1163///
1164/// See [collect] to buffer the `RecordBatch`es in memory.
1165///
1166/// # Aborting Execution
1167///
1168/// Dropping the stream will abort the execution of the query, and free up
1169/// any allocated resources
1170pub fn execute_stream(
1171    plan: Arc<dyn ExecutionPlan>,
1172    context: Arc<TaskContext>,
1173) -> Result<SendableRecordBatchStream> {
1174    match plan.output_partitioning().partition_count() {
1175        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1176        1 => plan.execute(0, context),
1177        2.. => {
1178            // merge into a single partition
1179            let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1180            // CoalescePartitionsExec must produce a single partition
1181            assert_eq!(1, plan.properties().output_partitioning().partition_count());
1182            plan.execute(0, context)
1183        }
1184    }
1185}
1186
1187/// Execute the [ExecutionPlan] and collect the results in memory
1188pub async fn collect_partitioned(
1189    plan: Arc<dyn ExecutionPlan>,
1190    context: Arc<TaskContext>,
1191) -> Result<Vec<Vec<RecordBatch>>> {
1192    let streams = execute_stream_partitioned(plan, context)?;
1193
1194    let mut join_set = JoinSet::new();
1195    // Execute the plan and collect the results into batches.
1196    streams.into_iter().enumerate().for_each(|(idx, stream)| {
1197        join_set.spawn(async move {
1198            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
1199            (idx, result)
1200        });
1201    });
1202
1203    let mut batches = vec![];
1204    // Note that currently this doesn't identify the thread that panicked
1205    //
1206    // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id
1207    // once it is stable
1208    while let Some(result) = join_set.join_next().await {
1209        match result {
1210            Ok((idx, res)) => batches.push((idx, res?)),
1211            Err(e) => {
1212                if e.is_panic() {
1213                    std::panic::resume_unwind(e.into_panic());
1214                } else {
1215                    unreachable!();
1216                }
1217            }
1218        }
1219    }
1220
1221    batches.sort_by_key(|(idx, _)| *idx);
1222    let batches = batches.into_iter().map(|(_, batch)| batch).collect();
1223
1224    Ok(batches)
1225}
1226
1227/// Execute the [ExecutionPlan] and return a vec with one stream per output
1228/// partition
1229///
1230/// # Aborting Execution
1231///
1232/// Dropping the stream will abort the execution of the query, and free up
1233/// any allocated resources
1234pub fn execute_stream_partitioned(
1235    plan: Arc<dyn ExecutionPlan>,
1236    context: Arc<TaskContext>,
1237) -> Result<Vec<SendableRecordBatchStream>> {
1238    let num_partitions = plan.output_partitioning().partition_count();
1239    let mut streams = Vec::with_capacity(num_partitions);
1240    for i in 0..num_partitions {
1241        streams.push(plan.execute(i, Arc::clone(&context))?);
1242    }
1243    Ok(streams)
1244}
1245
1246/// Executes an input stream and ensures that the resulting stream adheres to
1247/// the `not null` constraints specified in the `sink_schema`.
1248///
1249/// # Arguments
1250///
1251/// * `input` - An execution plan
1252/// * `sink_schema` - The schema to be applied to the output stream
1253/// * `partition` - The partition index to be executed
1254/// * `context` - The task context
1255///
1256/// # Returns
1257///
1258/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1259///
1260/// This function first executes the given input plan for the specified partition
1261/// and context. It then checks if there are any columns in the input that might
1262/// violate the `not null` constraints specified in the `sink_schema`. If there are
1263/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1264/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
1265pub fn execute_input_stream(
1266    input: Arc<dyn ExecutionPlan>,
1267    sink_schema: SchemaRef,
1268    partition: usize,
1269    context: Arc<TaskContext>,
1270) -> Result<SendableRecordBatchStream> {
1271    let input_stream = input.execute(partition, context)?;
1272
1273    debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1274
1275    // Find input columns that may violate the not null constraint.
1276    let risky_columns: Vec<_> = sink_schema
1277        .fields()
1278        .iter()
1279        .zip(input.schema().fields().iter())
1280        .enumerate()
1281        .filter_map(|(idx, (sink_field, input_field))| {
1282            (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1283        })
1284        .collect();
1285
1286    if risky_columns.is_empty() {
1287        Ok(input_stream)
1288    } else {
1289        // Check not null constraint on the input stream
1290        Ok(Box::pin(RecordBatchStreamAdapter::new(
1291            sink_schema,
1292            input_stream
1293                .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1294        )))
1295    }
1296}
1297
1298/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1299///
1300/// # Arguments
1301///
1302/// * `batch` - The `RecordBatch` to be checked
1303/// * `column_indices` - A vector of column indices that should be checked for
1304///   `not null` constraints.
1305///
1306/// # Returns
1307///
1308/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1309///
1310/// This function iterates over the specified column indices and ensures that none
1311/// of the columns contain null values. If any column contains null values, an error
1312/// is returned.
1313pub fn check_not_null_constraints(
1314    batch: RecordBatch,
1315    column_indices: &Vec<usize>,
1316) -> Result<RecordBatch> {
1317    for &index in column_indices {
1318        if batch.num_columns() <= index {
1319            return exec_err!(
1320                "Invalid batch column count {} expected > {}",
1321                batch.num_columns(),
1322                index
1323            );
1324        }
1325
1326        if batch
1327            .column(index)
1328            .logical_nulls()
1329            .map(|nulls| nulls.null_count())
1330            .unwrap_or_default()
1331            > 0
1332        {
1333            return exec_err!(
1334                "Invalid batch column at '{}' has null but schema specifies non-nullable",
1335                index
1336            );
1337        }
1338    }
1339
1340    Ok(batch)
1341}
1342
1343/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1344pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1345    let formatted = displayable(plan.as_ref()).indent(true).to_string();
1346    let actual: Vec<&str> = formatted.trim().lines().collect();
1347    actual.iter().map(|elem| (*elem).to_string()).collect()
1348}
1349
1350/// Indicates the effect an execution plan operator will have on the cardinality
1351/// of its input stream
1352pub enum CardinalityEffect {
1353    /// Unknown effect. This is the default
1354    Unknown,
1355    /// The operator is guaranteed to produce exactly one row for
1356    /// each input row
1357    Equal,
1358    /// The operator may produce fewer output rows than it receives input rows
1359    LowerEqual,
1360    /// The operator may produce more output rows than it receives input rows
1361    GreaterEqual,
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366    use std::any::Any;
1367    use std::sync::Arc;
1368
1369    use super::*;
1370    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1371
1372    use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1373    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1374    use datafusion_common::{Result, Statistics};
1375    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
1376
1377    #[derive(Debug)]
1378    pub struct EmptyExec;
1379
1380    impl EmptyExec {
1381        pub fn new(_schema: SchemaRef) -> Self {
1382            Self
1383        }
1384    }
1385
1386    impl DisplayAs for EmptyExec {
1387        fn fmt_as(
1388            &self,
1389            _t: DisplayFormatType,
1390            _f: &mut std::fmt::Formatter,
1391        ) -> std::fmt::Result {
1392            unimplemented!()
1393        }
1394    }
1395
1396    impl ExecutionPlan for EmptyExec {
1397        fn name(&self) -> &'static str {
1398            Self::static_name()
1399        }
1400
1401        fn as_any(&self) -> &dyn Any {
1402            self
1403        }
1404
1405        fn properties(&self) -> &PlanProperties {
1406            unimplemented!()
1407        }
1408
1409        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1410            vec![]
1411        }
1412
1413        fn with_new_children(
1414            self: Arc<Self>,
1415            _: Vec<Arc<dyn ExecutionPlan>>,
1416        ) -> Result<Arc<dyn ExecutionPlan>> {
1417            unimplemented!()
1418        }
1419
1420        fn execute(
1421            &self,
1422            _partition: usize,
1423            _context: Arc<TaskContext>,
1424        ) -> Result<SendableRecordBatchStream> {
1425            unimplemented!()
1426        }
1427
1428        fn statistics(&self) -> Result<Statistics> {
1429            unimplemented!()
1430        }
1431
1432        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1433            unimplemented!()
1434        }
1435    }
1436
1437    #[derive(Debug)]
1438    pub struct RenamedEmptyExec;
1439
1440    impl RenamedEmptyExec {
1441        pub fn new(_schema: SchemaRef) -> Self {
1442            Self
1443        }
1444    }
1445
1446    impl DisplayAs for RenamedEmptyExec {
1447        fn fmt_as(
1448            &self,
1449            _t: DisplayFormatType,
1450            _f: &mut std::fmt::Formatter,
1451        ) -> std::fmt::Result {
1452            unimplemented!()
1453        }
1454    }
1455
1456    impl ExecutionPlan for RenamedEmptyExec {
1457        fn name(&self) -> &'static str {
1458            Self::static_name()
1459        }
1460
1461        fn static_name() -> &'static str
1462        where
1463            Self: Sized,
1464        {
1465            "MyRenamedEmptyExec"
1466        }
1467
1468        fn as_any(&self) -> &dyn Any {
1469            self
1470        }
1471
1472        fn properties(&self) -> &PlanProperties {
1473            unimplemented!()
1474        }
1475
1476        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1477            vec![]
1478        }
1479
1480        fn with_new_children(
1481            self: Arc<Self>,
1482            _: Vec<Arc<dyn ExecutionPlan>>,
1483        ) -> Result<Arc<dyn ExecutionPlan>> {
1484            unimplemented!()
1485        }
1486
1487        fn execute(
1488            &self,
1489            _partition: usize,
1490            _context: Arc<TaskContext>,
1491        ) -> Result<SendableRecordBatchStream> {
1492            unimplemented!()
1493        }
1494
1495        fn statistics(&self) -> Result<Statistics> {
1496            unimplemented!()
1497        }
1498
1499        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1500            unimplemented!()
1501        }
1502    }
1503
1504    #[test]
1505    fn test_execution_plan_name() {
1506        let schema1 = Arc::new(Schema::empty());
1507        let default_name_exec = EmptyExec::new(schema1);
1508        assert_eq!(default_name_exec.name(), "EmptyExec");
1509
1510        let schema2 = Arc::new(Schema::empty());
1511        let renamed_exec = RenamedEmptyExec::new(schema2);
1512        assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1513        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1514    }
1515
1516    /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1517    /// be called from a trait object.
1518    /// Related ticket: https://github.com/apache/datafusion/pull/11047
1519    #[allow(dead_code)]
1520    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1521        let _ = plan.name();
1522    }
1523
1524    #[test]
1525    fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1526        check_not_null_constraints(
1527            RecordBatch::try_new(
1528                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1529                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1530            )?,
1531            &vec![0],
1532        )?;
1533        Ok(())
1534    }
1535
1536    #[test]
1537    fn test_check_not_null_constraints_reject_null() -> Result<()> {
1538        let result = check_not_null_constraints(
1539            RecordBatch::try_new(
1540                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1541                vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1542            )?,
1543            &vec![0],
1544        );
1545        assert!(result.is_err());
1546        assert_eq!(
1547            result.err().unwrap().strip_backtrace(),
1548            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1549        );
1550        Ok(())
1551    }
1552
1553    #[test]
1554    fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
1555        // some null value inside REE array
1556        let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1557        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1558        let run_end_array = RunArray::try_new(&run_ends, &values)?;
1559        let result = check_not_null_constraints(
1560            RecordBatch::try_new(
1561                Arc::new(Schema::new(vec![Field::new(
1562                    "a",
1563                    run_end_array.data_type().to_owned(),
1564                    true,
1565                )])),
1566                vec![Arc::new(run_end_array)],
1567            )?,
1568            &vec![0],
1569        );
1570        assert!(result.is_err());
1571        assert_eq!(
1572            result.err().unwrap().strip_backtrace(),
1573            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1574        );
1575        Ok(())
1576    }
1577
1578    #[test]
1579    fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1580        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1581        let keys = Int32Array::from(vec![0, 1, 2, 3]);
1582        let dictionary = DictionaryArray::new(keys, values);
1583        let result = check_not_null_constraints(
1584            RecordBatch::try_new(
1585                Arc::new(Schema::new(vec![Field::new(
1586                    "a",
1587                    dictionary.data_type().to_owned(),
1588                    true,
1589                )])),
1590                vec![Arc::new(dictionary)],
1591            )?,
1592            &vec![0],
1593        );
1594        assert!(result.is_err());
1595        assert_eq!(
1596            result.err().unwrap().strip_backtrace(),
1597            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1598        );
1599        Ok(())
1600    }
1601
1602    #[test]
1603    fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
1604        // some null value marked out by dictionary array
1605        let values = Arc::new(Int32Array::from(vec![
1606            Some(1),
1607            None, // this null value is masked by dictionary keys
1608            Some(3),
1609            Some(4),
1610        ]));
1611        let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1612        let dictionary = DictionaryArray::new(keys, values);
1613        check_not_null_constraints(
1614            RecordBatch::try_new(
1615                Arc::new(Schema::new(vec![Field::new(
1616                    "a",
1617                    dictionary.data_type().to_owned(),
1618                    true,
1619                )])),
1620                vec![Arc::new(dictionary)],
1621            )?,
1622            &vec![0],
1623        )?;
1624        Ok(())
1625    }
1626
1627    #[test]
1628    fn test_check_not_null_constraints_on_null_type() -> Result<()> {
1629        // null value of Null type
1630        let result = check_not_null_constraints(
1631            RecordBatch::try_new(
1632                Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1633                vec![Arc::new(NullArray::new(3))],
1634            )?,
1635            &vec![0],
1636        );
1637        assert!(result.is_err());
1638        assert_eq!(
1639            result.err().unwrap().strip_backtrace(),
1640            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1641        );
1642        Ok(())
1643    }
1644}