Skip to main content

datafusion_physical_plan/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19use crate::filter_pushdown::{
20    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21    FilterPushdownPropagation,
22};
23pub use crate::metrics::Metric;
24pub use crate::ordering::InputOrderMode;
25use crate::sort_pushdown::SortOrderPushdownResult;
26pub use crate::stream::EmptyRecordBatchStream;
27
28use arrow_schema::Schema;
29pub use datafusion_common::hash_utils;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31pub use datafusion_common::utils::project_schema;
32pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
33pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
34pub use datafusion_expr::{Accumulator, ColumnarValue};
35pub use datafusion_physical_expr::window::WindowExpr;
36pub use datafusion_physical_expr::{
37    Distribution, Partitioning, PhysicalExpr, expressions,
38};
39
40use std::any::Any;
41use std::fmt::Debug;
42use std::sync::{Arc, LazyLock};
43
44use crate::coalesce_partitions::CoalescePartitionsExec;
45use crate::display::DisplayableExecutionPlan;
46use crate::metrics::MetricsSet;
47use crate::projection::ProjectionExec;
48use crate::stream::RecordBatchStreamAdapter;
49
50use arrow::array::{Array, RecordBatch};
51use arrow::datatypes::SchemaRef;
52use datafusion_common::config::ConfigOptions;
53use datafusion_common::{
54    Constraints, DataFusionError, Result, assert_eq_or_internal_err,
55    assert_or_internal_err, exec_err,
56};
57use datafusion_common_runtime::JoinSet;
58use datafusion_execution::TaskContext;
59use datafusion_physical_expr::EquivalenceProperties;
60use datafusion_physical_expr_common::sort_expr::{
61    LexOrdering, OrderingRequirements, PhysicalSortExpr,
62};
63
64use futures::stream::{StreamExt, TryStreamExt};
65
66/// Represent nodes in the DataFusion Physical Plan.
67///
68/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
69/// [`RecordBatch`] that incrementally computes a partition of the
70/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
71/// details on partitioning.
72///
73/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
74/// properties of the output to the DataFusion optimizer, and methods such as
75/// [`required_input_distribution`] and [`required_input_ordering`] express
76/// requirements of the `ExecutionPlan` from its input.
77///
78/// [`ExecutionPlan`] can be displayed in a simplified form using the
79/// return value from [`displayable`] in addition to the (normally
80/// quite verbose) `Debug` output.
81///
82/// [`execute`]: ExecutionPlan::execute
83/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
84/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
85///
86/// # Examples
87///
88/// See [`datafusion-examples`] for examples, including
89/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
90/// `ExecutionPlan` with memory tracking and spilling support.
91///
92/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
93/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
94pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
95    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
96    ///
97    /// Implementation note: this method can just proxy to
98    /// [`static_name`](ExecutionPlan::static_name) if no special action is
99    /// needed. It doesn't provide a default implementation like that because
100    /// this method doesn't require the `Sized` constraint, to allow a wider
101    /// range of use cases.
102    fn name(&self) -> &str;
103
104    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
105    /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
106    fn static_name() -> &'static str
107    where
108        Self: Sized,
109    {
110        let full_name = std::any::type_name::<Self>();
111        let maybe_start_idx = full_name.rfind(':');
112        match maybe_start_idx {
113            Some(start_idx) => &full_name[start_idx + 1..],
114            None => "UNKNOWN",
115        }
116    }
117
118    /// Returns the plan that provides this plan's public
119    /// [`ExecutionPlan`] downcast identity.
120    ///
121    /// This hook is for wrapper nodes that delegate their public downcast
122    /// identity to another plan while adding cross-cutting behavior such as
123    /// instrumentation. The default implementation returns `None`, meaning this
124    /// plan's concrete type is used for type introspection.
125    ///
126    /// Most `ExecutionPlan` implementations should use the default `None`;
127    /// override this only for wrapper plans that intentionally delegate their
128    /// public downcast identity to another plan.
129    ///
130    /// The `is` and `downcast_ref` helpers follow the returned delegate instead
131    /// of checking the current concrete type, making intermediate delegating
132    /// wrappers invisible to normal downcast-based inspection.
133    ///
134    /// Implementations that opt in should return the delegate plan, not `self`.
135    ///
136    /// This is independent from [`Self::children`] and should not be used for
137    /// plan traversal or optimizer rewrites.
138    fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
139        None // default: no delegate, so this plan's own concrete type is used
140    }
141
142    /// Get the schema for this execution plan
143    fn schema(&self) -> SchemaRef {
144        Arc::clone(self.properties().schema())
145    }
146
147    /// Return properties of the output of the `ExecutionPlan`, such as output
148    /// ordering(s), partitioning information etc.
149    ///
150    /// This information is available via methods on [`ExecutionPlanProperties`]
151    /// trait, which is implemented for all `ExecutionPlan`s.
152    fn properties(&self) -> &Arc<PlanProperties>;
153
154    /// Returns an error if this individual node does not conform to its invariants.
155    /// These invariants are typically only checked in debug mode.
156    ///
157    /// A default set of invariants is provided in the [check_default_invariants] function.
158    /// The default implementation of `check_invariants` calls this function.
159    /// Extension nodes can provide their own invariants.
160    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
161        check_default_invariants(self, check) // default: only the shared invariant set is checked
162    }
163
164    /// Specifies the data distribution requirements for all the
165    /// children for this `ExecutionPlan`. By default it's [`Distribution::UnspecifiedDistribution`] for each child.
166    fn required_input_distribution(&self) -> Vec<Distribution> {
167        vec![Distribution::UnspecifiedDistribution; self.children().len()]
168    }
169
170    /// Specifies the ordering required for all of the children of this
171    /// `ExecutionPlan`.
172    ///
173    /// For each child, it's the local ordering requirement within
174    /// each partition rather than the global ordering
175    ///
176    /// NOTE that checking `!is_empty()` does **not** check for a
177    /// required input ordering. Instead, the correct check is that at
178    /// least one entry must be `Some`
179    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
180        vec![None; self.children().len()]
181    }
182
183    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
184    /// rows within or between partitions.
185    ///
186    /// For example, Projection, Filter, and Limit maintain the order
187    /// of inputs -- they may transform values (Projection) or not
188    /// produce the same number of rows that went in (Filter and
189    /// Limit), but the rows that are produced go in the same way.
190    ///
191    /// DataFusion uses this metadata to apply certain optimizations
192    /// such as automatically repartitioning correctly.
193    ///
194    /// The default implementation returns `false`
195    ///
196    /// WARNING: if you override this default, you *MUST* ensure that
197    /// the `ExecutionPlan` maintains the ordering invariant or else
198    /// DataFusion may produce incorrect results.
199    fn maintains_input_order(&self) -> Vec<bool> {
200        vec![false; self.children().len()]
201    }
202
203    /// Specifies whether the `ExecutionPlan` benefits from increased
204    /// parallelization at its input for each child.
205    ///
206    /// If returns `true`, the `ExecutionPlan` would benefit from partitioning
207    /// its corresponding child (and thus from more parallelism). For
208    /// `ExecutionPlan` that do very little work the overhead of extra
209    /// parallelism may outweigh any benefits
210    ///
211    /// The default implementation returns `true` unless this `ExecutionPlan`
212    /// has signalled it requires a single child input partition.
213    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
214        // By default try to maximize parallelism with more CPUs if
215        // possible
216        self.required_input_distribution()
217            .into_iter()
218            .map(|dist| !matches!(dist, Distribution::SinglePartition))
219            .collect()
220    }
221
222    /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
223    /// The returned list will be empty for leaf nodes such as scans, will contain
224    /// a single value for unary nodes, or two values for binary nodes (such as
225    /// joins).
226    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
227
228    /// Returns a new `ExecutionPlan` where all existing children were replaced
229    /// by the `children`, in order
230    fn with_new_children(
231        self: Arc<Self>,
232        children: Vec<Arc<dyn ExecutionPlan>>,
233    ) -> Result<Arc<dyn ExecutionPlan>>;
234
235    /// Reset any internal state within this [`ExecutionPlan`].
236    ///
237    /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
238    /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
239    /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
240    /// are reset to their initial state.
241    ///
242    /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
243    /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
244    /// necessarily resetting any internal state. Implementations that require resetting of some
245    /// internal state should override this method to provide the necessary logic.
246    ///
247    /// This method should *not* reset state recursively for children, as it is expected that
248    /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
249    /// or was already called on each child.
250    ///
251    /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
252    /// thus it is expected that any cached plan properties will remain valid after the reset.
253    ///
254    /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
255    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
256        let children = self.children().into_iter().cloned().collect();
257        self.with_new_children(children)
258    }
259
260    /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
261    /// produce `target_partitions` partitions.
262    ///
263    /// If the `ExecutionPlan` does not support changing its partitioning,
264    /// returns `Ok(None)` (the default).
265    ///
266    /// If the `ExecutionPlan` can increase its partitioning, but not to the
267    /// `target_partitions`, it may return an ExecutionPlan with fewer
268    /// partitions. This might happen, for example, if each new partition would
269    /// be too small to be efficiently processed individually.
270    ///
271    /// The DataFusion optimizer attempts to use as many threads as possible by
272    /// repartitioning its inputs to match the target number of threads
273    /// available (`target_partitions`). Some data sources, such as the built in
274    /// CSV and Parquet readers, implement this method as they are able to read
275    /// from their input files in parallel, regardless of how the source data is
276    /// split amongst files.
277    fn repartitioned(
278        &self,
279        _target_partitions: usize,
280        _config: &ConfigOptions,
281    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
282        Ok(None) // default: this plan does not support changing its partitioning
283    }
284
285    /// Begin execution of `partition`, returning a [`Stream`] of
286    /// [`RecordBatch`]es.
287    ///
288    /// # Notes
289    ///
290    /// The `execute` method itself is not `async` but it returns an `async`
291    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
292    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
293    /// Most `ExecutionPlan`s should not do any work before the first
294    /// `RecordBatch` is requested from the stream.
295    ///
296    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
297    /// [`Stream`] into a [`SendableRecordBatchStream`].
298    ///
299    /// Using `async` `Streams` allows for network I/O during execution and
300    /// takes advantage of Rust's built in support for `async` continuations and
301    /// crate ecosystem.
302    ///
303    /// [`Stream`]: futures::stream::Stream
304    /// [`StreamExt`]: futures::stream::StreamExt
305    /// [`TryStreamExt`]: futures::stream::TryStreamExt
306    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
307    ///
308    /// # Error handling
309    ///
310    /// Any error that occurs during execution is sent as an `Err` in the output
311    /// stream.
312    ///
313    /// `ExecutionPlan` implementations in DataFusion cancel additional work
314    /// immediately once an error occurs. The rationale is that if the overall
315    /// query will return an error,  any additional work such as continued
316    /// polling of inputs will be wasted as it will be thrown away.
317    ///
318    /// # Cancellation / Aborting Execution
319    ///
320    /// The [`Stream`] that is returned must ensure that any allocated resources
321    /// are freed when the stream itself is dropped. This is particularly
322    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
323    /// "abort" such tasks, they may continue to consume resources even after
324    /// the plan is dropped, generating intermediate results that are never
325    /// used.
326    /// Thus, [`spawn`] is disallowed, and instead use [`SpawnedTask`].
327    ///
328    /// To enable timely cancellation, the [`Stream`] that is returned must not
329    /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
330    /// In a typical [`ExecutionPlan`], this automatically happens unless there are
331    /// special circumstances; e.g. when the computational complexity of processing a
332    /// batch is superlinear. See this [general guideline][async-guideline] for more context
333    /// on this point, which explains why one should avoid spending a long time without
334    /// reaching an `await`/yield point in asynchronous runtimes.
335    /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
336    /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
337    /// [`tokio::task::yield_now()`] when appropriate.
338    /// In special cases that warrant manual yielding, determination for "regularly" may be
339    /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
340    /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
341    /// counting rows or batches.
342    ///
343    /// The [cancellation benchmark] tracks some cases of how quickly queries can
344    /// be cancelled.
345    ///
346    /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
347    /// for structures to help ensure all background tasks are cancelled.
348    ///
349    /// [`spawn`]: tokio::task::spawn
350    /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
351    /// [`JoinSet`]: datafusion_common_runtime::JoinSet
352    /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
353    /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
354    /// [`Poll::Pending`]: std::task::Poll::Pending
355    /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
356    ///
357    /// # Implementation Examples
358    ///
359    /// While `async` `Stream`s have a non trivial learning curve, the
360    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
361    /// which help simplify many common operations.
362    ///
363    /// Here are some common patterns:
364    ///
365    /// ## Return Precomputed `RecordBatch`
366    ///
367    /// We can return a precomputed `RecordBatch` as a `Stream`:
368    ///
369    /// ```
370    /// # use std::sync::Arc;
371    /// # use arrow::array::RecordBatch;
372    /// # use arrow::datatypes::SchemaRef;
373    /// # use datafusion_common::Result;
374    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
375    /// # use datafusion_physical_plan::memory::MemoryStream;
376    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
377    /// struct MyPlan {
378    ///     batch: RecordBatch,
379    /// }
380    ///
381    /// impl MyPlan {
382    ///     fn execute(
383    ///         &self,
384    ///         partition: usize,
385    ///         context: Arc<TaskContext>,
386    ///     ) -> Result<SendableRecordBatchStream> {
387    ///         // use functions from the futures crate to convert the batch into a stream
388    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
389    ///         let stream = futures::stream::once(fut);
390    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
391    ///             self.batch.schema(),
392    ///             stream,
393    ///         )))
394    ///     }
395    /// }
396    /// ```
397    ///
398    /// ## Lazily (async) Compute `RecordBatch`
399    ///
400    /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
401    ///
402    /// ```
403    /// # use std::sync::Arc;
404    /// # use arrow::array::RecordBatch;
405    /// # use arrow::datatypes::SchemaRef;
406    /// # use datafusion_common::Result;
407    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
408    /// # use datafusion_physical_plan::memory::MemoryStream;
409    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
410    /// struct MyPlan {
411    ///     schema: SchemaRef,
412    /// }
413    ///
414    /// /// Returns a single batch when the returned stream is polled
415    /// async fn get_batch() -> Result<RecordBatch> {
416    ///     todo!()
417    /// }
418    ///
419    /// impl MyPlan {
420    ///     fn execute(
421    ///         &self,
422    ///         partition: usize,
423    ///         context: Arc<TaskContext>,
424    ///     ) -> Result<SendableRecordBatchStream> {
425    ///         let fut = get_batch();
426    ///         let stream = futures::stream::once(fut);
427    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
428    ///             self.schema.clone(),
429    ///             stream,
430    ///         )))
431    ///     }
432    /// }
433    /// ```
434    ///
435    /// ## Lazily (async) create a Stream
436    ///
437    /// If you need to create the return `Stream` using an `async` function,
438    /// you can do so by flattening the result:
439    ///
440    /// ```
441    /// # use std::sync::Arc;
442    /// # use arrow::array::RecordBatch;
443    /// # use arrow::datatypes::SchemaRef;
444    /// # use futures::TryStreamExt;
445    /// # use datafusion_common::Result;
446    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
447    /// # use datafusion_physical_plan::memory::MemoryStream;
448    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
449    /// struct MyPlan {
450    ///     schema: SchemaRef,
451    /// }
452    ///
453    /// /// async function that returns a stream
454    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
455    ///     todo!()
456    /// }
457    ///
458    /// impl MyPlan {
459    ///     fn execute(
460    ///         &self,
461    ///         partition: usize,
462    ///         context: Arc<TaskContext>,
463    ///     ) -> Result<SendableRecordBatchStream> {
464    ///         // A future that yields a stream
465    ///         let fut = get_batch_stream();
466    ///         // Use TryStreamExt::try_flatten to flatten the stream of streams
467    ///         let stream = futures::stream::once(fut).try_flatten();
468    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(
469    ///             self.schema.clone(),
470    ///             stream,
471    ///         )))
472    ///     }
473    /// }
474    /// ```
475    fn execute(
476        &self,
477        partition: usize,
478        context: Arc<TaskContext>,
479    ) -> Result<SendableRecordBatchStream>;
480
481    /// Return a snapshot of the set of [`Metric`]s for this
482    /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
483    ///
484    /// While the values of the metrics in the returned
485    /// [`MetricsSet`]s may change as execution progresses, the
486    /// specific metrics will not.
487    ///
488    /// Once `self.execute()` has returned (technically the future is
489    /// resolved) for all available partitions, the set of metrics
490    /// should be complete. If this function is called prior to
491    /// `execute()` new metrics may appear in subsequent calls.
492    fn metrics(&self) -> Option<MetricsSet> {
493        None // default: no metrics are available for this plan
494    }
495
496    /// Returns statistics for a specific partition of this `ExecutionPlan` node.
497    /// If statistics are not available, should return [`Statistics::new_unknown`]
498    /// (the default), not an error.
499    /// If `partition` is `None`, it returns statistics for the entire plan.
500    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
501        if let Some(idx) = partition {
502            // Validate partition index
503            let partition_count = self.properties().partitioning.partition_count();
504            assert_or_internal_err!(
505                idx < partition_count,
506                "Invalid partition index: {}, the partition count is {}",
507                idx,
508                partition_count
509            );
510        }
511        Ok(Arc::new(Statistics::new_unknown(&self.schema())))
512    }
513
514    /// Returns `true` if a limit can be safely pushed down through this
515    /// `ExecutionPlan` node.
516    ///
517    /// If this method returns `true`, and the query plan contains a limit at
518    /// the output of this node, DataFusion will push the limit to the input
519    /// of this node.
520    fn supports_limit_pushdown(&self) -> bool {
521        false // default: limit pushdown through this node is opt-in
522    }
523
524    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
525    /// fetch limits. Returns `None` otherwise.
526    ///
527    /// See physical optimizer rule [`limit_pushdown`] for details.
528    ///
529    /// [`limit_pushdown`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/limit_pushdown/index.html
530    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
531        None // default: no fetching variant exists; `_limit` is ignored
532    }
533
534    /// Gets the fetch count for the operator, `None` means there is no fetch.
535    fn fetch(&self) -> Option<usize> {
536        None // default: the operator has no fetch
537    }
538
539    /// Gets the effect on cardinality, if known
540    fn cardinality_effect(&self) -> CardinalityEffect {
541        CardinalityEffect::Unknown // default: the effect on cardinality is not known
542    }
543
544    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
545    ///
546    /// If the operator supports this optimization, the resulting plan will be:
547    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
548    /// Otherwise, it returns the current `ExecutionPlan` as-is.
549    ///
550    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
551    /// or not possible, or `Err` on failure.
552    fn try_swapping_with_projection(
553        &self,
554        _projection: &ProjectionExec,
555    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
556        Ok(None) // default: swapping with a projection is not supported
557    }
558
559    /// Collect filters that this node can push down to its children.
560    /// Filters that are being pushed down from parents are passed in,
561    /// and the node may generate additional filters to push down.
562    /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
563    /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
564    /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
565    ///    filters so it only returns that `FilterExec` wants to push down its own predicate.
566    /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
567    ///    `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
568    ///    but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
569    /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
570    ///    and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
571    ///    since it has no children and no additional filters to push down.
572    ///    It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
573    ///    up the plan that `DataSourceExec` can actually bind the filters.
574    ///
575    /// The default implementation bars all parent filters from being pushed down and adds no new filters.
576    /// This is the safest option, making filter pushdown opt-in on a per-node basis.
577    ///
578    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
579    /// Depending on the phase the operator may or may not be allowed to modify the plan.
580    /// See [`FilterPushdownPhase`] for more details.
581    fn gather_filters_for_pushdown(
582        &self,
583        _phase: FilterPushdownPhase,
584        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
585        _config: &ConfigOptions,
586    ) -> Result<FilterDescription> {
587        Ok(FilterDescription::all_unsupported(
588            &parent_filters,
589            &self.children(),
590        ))
591    }
592
593    /// Handle the result of a child pushdown.
594    ///
595    /// This method is called as we recurse back up the plan tree after pushing
596    /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
597    /// It allows the current node to process the results of filter pushdown from
598    /// its children, deciding whether to absorb filters, modify the plan, or pass
599    /// filters back up to its parent.
600    ///
601    /// **Purpose and Context:**
602    /// Filter pushdown is a critical optimization in DataFusion that aims to
603    /// reduce the amount of data processed by applying filters as early as
604    /// possible in the query plan. This method is part of the second phase of
605    /// filter pushdown, where results are propagated back up the tree after
606    /// being pushed down. Each node can inspect the pushdown results from its
607    /// children and decide how to handle any unapplied filters, potentially
608    /// optimizing the plan structure or filter application.
609    ///
610    /// **Behavior in Different Nodes:**
611    /// - For a `DataSourceExec`, this often means absorbing the filters to apply
612    ///   them during the scan phase (late materialization), reducing the data
613    ///   read from the source.
614    /// - A `FilterExec` may absorb any filters its children could not handle,
615    ///   combining them with its own predicate. If no filters remain (i.e., the
616    ///   predicate becomes trivially true), it may remove itself from the plan
617    ///   altogether. It typically marks parent filters as supported, indicating
618    ///   they have been handled.
619    /// - A `HashJoinExec` might ignore the pushdown result if filters need to
620    ///   be applied during the join operation. It passes the parent filters back
621    ///   up wrapped in [`FilterPushdownPropagation::if_any`], discarding
622    ///   any self-filters from children.
623    ///
624    /// **Example Walkthrough:**
625    /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
626    /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
627    ///    `FilterExec`, the filter `f1` is gathered and pushed down to
628    ///    `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
629    ///    the join or add its own filters (e.g., a min-max filter from the build side),
630    ///    then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
631    ///    has no children to push to, so it prepares to handle filters in the
632    ///    upward phase.
633    /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
634    ///    `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
635    ///    for late materialization during scanning, marking them as supported.
636    ///    `HashJoinExec` receives the result, decides whether to apply any
637    ///    remaining filters during the join, and passes unhandled filters back
638    ///    up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
639    ///    updates its predicate if necessary, or removes itself if the predicate
640    ///    becomes trivial (e.g., `lit(true)`), and marks filters as supported
641    ///    for its parent.
642    ///
643    /// The default implementation is a no-op that passes the result of pushdown
644    /// from the children to its parent transparently, ensuring no filters are
645    /// lost if a node does not override this behavior.
646    ///
647    /// **Notes for Implementation:**
648    /// When returning filters via [`FilterPushdownPropagation`], the order of
649    /// filters need not match the order they were passed in via
650    /// `child_pushdown_result`. However, preserving the order is recommended for
651    /// debugging and ease of reasoning about the resulting plans.
652    ///
653    /// **Helper Methods for Customization:**
654    /// There are various helper methods to simplify implementing this method:
655    /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
656    ///   supported as long as at least one child supports them.
657    /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
658    ///   supported as long as all children support them.
659    /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
660    ///   to the propagation result, indicating which filters are supported by
661    ///   the current node.
662    /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
663    ///   current node in the propagation result, used if the node
664    ///   has modified its plan based on the pushdown results.
665    ///
666    /// **Filter Pushdown Phases:**
    /// There are two different phases in filter pushdown (`Pre` and `Post`),
668    /// which some operators may handle differently. Depending on the phase, the
669    /// operator may or may not be allowed to modify the plan. See
670    /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
671    ///
672    /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
673    fn handle_child_pushdown_result(
674        &self,
675        _phase: FilterPushdownPhase,
676        child_pushdown_result: ChildPushdownResult,
677        _config: &ConfigOptions,
678    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
679        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
680    }
681
682    /// Injects arbitrary run-time state into this execution plan, returning a new plan
683    /// instance that incorporates that state *if* it is relevant to the concrete
684    /// node implementation.
685    ///
686    /// This is a generic entry point: the `state` can be any type wrapped in
687    /// `Arc<dyn Any + Send + Sync>`.  A node that cares about the state should
688    /// down-cast it to the concrete type it expects and, if successful, return a
689    /// modified copy of itself that captures the provided value.  If the state is
690    /// not applicable, the default behaviour is to return `None` so that parent
691    /// nodes can continue propagating the attempt further down the plan tree.
692    ///
693    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
694    /// down-casts the supplied state to an `Arc<WorkTable>`
695    /// in order to wire up the working table used during recursive-CTE execution.
696    /// Similar patterns can be followed by custom nodes that need late-bound
697    /// dependencies or shared state.
698    fn with_new_state(
699        &self,
700        _state: Arc<dyn Any + Send + Sync>,
701    ) -> Option<Arc<dyn ExecutionPlan>> {
702        None
703    }
704
705    /// Try to push down sort ordering requirements to this node.
706    ///
707    /// This method is called during sort pushdown optimization to determine if this
708    /// node can optimize for a requested sort ordering. Implementations should:
709    ///
710    /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact
711    ///   ordering (allowing the Sort operator to be removed)
712    /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the
713    ///   ordering but cannot guarantee perfect sorting (Sort operator is kept)
714    /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize
715    ///   for the ordering
716    ///
717    /// For transparent nodes (that preserve ordering), implement this to delegate to
718    /// children and wrap the result with a new instance of this node.
719    ///
720    /// Default implementation returns `Unsupported`.
721    fn try_pushdown_sort(
722        &self,
723        _order: &[PhysicalSortExpr],
724    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
725        Ok(SortOrderPushdownResult::Unsupported)
726    }
727
728    /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity.
729    ///
730    /// This is used to signal to data sources that the output ordering must be
731    /// preserved, even if it might be more efficient to ignore it (e.g. by
732    /// skipping some row groups in Parquet).
733    ///
734    fn with_preserve_order(
735        &self,
736        _preserve_order: bool,
737    ) -> Option<Arc<dyn ExecutionPlan>> {
738        None
739    }
740}
741
742impl dyn ExecutionPlan {
743    /// Returns `true` if the plan is of type `T`.
744    ///
745    /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
746    /// to it.
747    ///
748    /// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
749    /// called on `Arc<dyn ExecutionPlan>` via auto-deref.
750    pub fn is<T: ExecutionPlan>(&self) -> bool {
751        match self.downcast_delegate() {
752            Some(delegate) => delegate.is::<T>(),
753            None => (self as &dyn Any).is::<T>(),
754        }
755    }
756
757    /// Attempts to downcast this plan to a concrete type `T`, returning `None`
758    /// if the plan is not of that type.
759    ///
760    /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
761    /// to it.
762    ///
763    /// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
764    /// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
765    /// downcast the `Arc` itself.
766    pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
767        match self.downcast_delegate() {
768            Some(delegate) => delegate.downcast_ref::<T>(),
769            None => (self as &dyn Any).downcast_ref(),
770        }
771    }
772}
773
/// [`ExecutionPlan`] Invariant Level
///
/// Selects which set of assertions ([Invariant]s) is expected to hold for a
/// particular `ExecutionPlan`.
///
/// `Debug`, `PartialEq` and `Eq` are derived so that a level can be logged and
/// compared, matching the other plain enums in this module.
///
/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvariantLevel {
    /// Invariants that are always true for the [`ExecutionPlan`] node
    /// such as the number of expected children.
    Always,
    /// Invariants that must hold true for the [`ExecutionPlan`] node
    /// to be "executable", such as ordering and/or distribution requirements
    /// being fulfilled.
    Executable,
}
789
790/// Extension trait provides an easy API to fetch various properties of
791/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
792pub trait ExecutionPlanProperties {
793    /// Specifies how the output of this `ExecutionPlan` is split into
794    /// partitions.
795    fn output_partitioning(&self) -> &Partitioning;
796
797    /// If the output of this `ExecutionPlan` within each partition is sorted,
798    /// returns `Some(keys)` describing the ordering. A `None` return value
799    /// indicates no assumptions should be made on the output ordering.
800    ///
801    /// For example, `SortExec` (obviously) produces sorted output as does
802    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
803    /// output if its input is sorted as it does not reorder the input rows.
804    fn output_ordering(&self) -> Option<&LexOrdering>;
805
806    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
807    /// For more details, see [`Boundedness`].
808    fn boundedness(&self) -> Boundedness;
809
810    /// Indicates how the stream of this `ExecutionPlan` emits its results.
811    /// For more details, see [`EmissionType`].
812    fn pipeline_behavior(&self) -> EmissionType;
813
814    /// Get the [`EquivalenceProperties`] within the plan.
815    ///
816    /// Equivalence properties tell DataFusion what columns are known to be
817    /// equal, during various optimization passes. By default, this returns "no
818    /// known equivalences" which is always correct, but may cause DataFusion to
819    /// unnecessarily resort data.
820    ///
821    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
822    /// through it or how columns within each row relate to each other, it
823    /// should return the equivalence properties of its input. For
824    /// example, since [`FilterExec`] may remove rows from its input, but does not
825    /// otherwise modify them, it preserves its input equivalence properties.
826    /// However, since `ProjectionExec` may calculate derived expressions, it
827    /// needs special handling.
828    ///
829    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
830    /// for related concepts.
831    ///
832    /// [`FilterExec`]: crate::filter::FilterExec
833    fn equivalence_properties(&self) -> &EquivalenceProperties;
834}
835
836impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
837    fn output_partitioning(&self) -> &Partitioning {
838        self.properties().output_partitioning()
839    }
840
841    fn output_ordering(&self) -> Option<&LexOrdering> {
842        self.properties().output_ordering()
843    }
844
845    fn boundedness(&self) -> Boundedness {
846        self.properties().boundedness
847    }
848
849    fn pipeline_behavior(&self) -> EmissionType {
850        self.properties().emission_type
851    }
852
853    fn equivalence_properties(&self) -> &EquivalenceProperties {
854        self.properties().equivalence_properties()
855    }
856}
857
858impl ExecutionPlanProperties for &dyn ExecutionPlan {
859    fn output_partitioning(&self) -> &Partitioning {
860        self.properties().output_partitioning()
861    }
862
863    fn output_ordering(&self) -> Option<&LexOrdering> {
864        self.properties().output_ordering()
865    }
866
867    fn boundedness(&self) -> Boundedness {
868        self.properties().boundedness
869    }
870
871    fn pipeline_behavior(&self) -> EmissionType {
872        self.properties().emission_type
873    }
874
875    fn equivalence_properties(&self) -> &EquivalenceProperties {
876        self.properties().equivalence_properties()
877    }
878}
879
/// Describes whether the stream of data **generated** by an operator is
/// bounded (finite) or unbounded (infinite).
///
/// This is used to decide whether an execution plan will eventually finish
/// processing all of its data (bounded) or could potentially run forever
/// (unbounded).
///
/// For unbounded streams it additionally records whether the operator needs
/// finite memory to process the stream or whether memory usage could grow
/// without bound.
///
/// The boundedness of the output stream depends on the boundedness of the
/// input stream and on the nature of the operator. For example, a limit or a
/// topk with fetch can turn an unbounded stream into a bounded one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    /// The data stream is finite and will eventually complete.
    Bounded,
    /// The data stream is infinite and could run forever.
    Unbounded {
        /// Whether this operator requires infinite memory to process the
        /// unbounded stream.
        ///
        /// - `false`: the operator can process an infinite stream with
        ///   bounded memory.
        /// - `true`: memory usage may grow without bound while the stream is
        ///   processed.
        ///
        /// For example, `Median` requires infinite memory to compute the
        /// median of an unbounded stream. `Min/Max` requires infinite memory
        /// if the stream is unordered, but can be computed with bounded
        /// memory if the stream is ordered.
        requires_infinite_memory: bool,
    },
}

impl Boundedness {
    /// Returns `true` for any [`Boundedness::Unbounded`] value, regardless of
    /// its `requires_infinite_memory` flag, and `false` for
    /// [`Boundedness::Bounded`].
    pub fn is_unbounded(&self) -> bool {
        match self {
            Self::Unbounded { .. } => true,
            Self::Bounded => false,
        }
    }
}
912
/// Describes how an operator emits its output records.
///
/// This distinguishes operators that emit records incrementally as input
/// arrives, operators that only emit a final result at the end, and operators
/// that can do both. Note that output is produced as record batches with
/// `batch_size` rows, so an operator may still buffer data internally until
/// it has enough rows for a batch or the source is exhausted.
///
/// For example, in the following plan:
/// ```text
///   SortExec [EmissionType::Final]
///     |_ on: [col1 ASC]
///     FilterExec [EmissionType::Incremental]
///       |_ pred: col2 > 100
///       DataSourceExec [EmissionType::Incremental]
///         |_ file: "data.csv"
/// ```
/// - DataSourceExec emits records incrementally as it reads from the file
/// - FilterExec processes and emits filtered records incrementally as they
///   arrive
/// - SortExec must wait for all input records before it can emit the sorted
///   result, since it needs to see all values to determine their final order
///
/// Left joins can emit both incrementally and finally:
/// - Matches are emitted incrementally as they are found
/// - Non-matches are emitted finally, after all input is processed
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Records are emitted step by step, as input arrives and is processed.
    Incremental,
    /// Records are emitted only after all input has been processed.
    Final,
    /// Records can be emitted both incrementally and as a final result.
    Both,
}
945
/// Describes whether an operator's `Stream` has been implemented to actively
/// cooperate with the Tokio scheduler. See the [`coop`](crate::coop) module
/// for more details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingType {
    /// The stream returned by [`execute`](ExecutionPlan::execute) does not
    /// actively take part in cooperative scheduling: its `Stream`
    /// implementation contains no explicit task budget consumption such as
    /// [`tokio::task::coop::consume_budget`].
    ///
    /// `NonCooperative` is the default value and is acceptable for most
    /// operators. See the [`coop`](crate::coop) module for details on when
    /// `Cooperative` may be the better choice.
    NonCooperative,
    /// The stream returned by [`execute`](ExecutionPlan::execute) actively
    /// takes part in cooperative scheduling by consuming task budget when it
    /// was able to produce a [`RecordBatch`].
    Cooperative,
}
964
/// Describes how an operator's `Stream` implementation generates
/// `RecordBatch`es.
///
/// Most operators in DataFusion generate a `RecordBatch` only when asked to
/// by a call to `Stream::poll_next`. This is known as demand-driven or lazy
/// evaluation.
///
/// Some operators, such as `Repartition`, need to drive `RecordBatch`
/// generation themselves. This is known as data-driven or eager evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
    /// The stream returned by [`execute`](ExecutionPlan::execute) generates
    /// `RecordBatch` instances only when they are demanded through
    /// `Stream::poll_next`. Filter, projection, and join are examples of such
    /// lazy operators.
    ///
    /// Lazy operators are also known as demand-driven operators.
    Lazy,
    /// The stream returned by [`execute`](ExecutionPlan::execute) eagerly
    /// generates `RecordBatch` instances in one or more spawned Tokio tasks.
    /// Eager evaluation only starts the first time `Stream::poll_next` is
    /// called. Examples of eager operators are repartition, coalesce
    /// partitions, and sort preserving merge.
    ///
    /// Eager operators are also known as data-driven operators.
    Eager,
}
988
989/// Utility to determine an operator's boundedness based on its children's boundedness.
990///
991/// Assumes boundedness can be inferred from child operators:
992/// - Unbounded (requires_infinite_memory: true) takes precedence.
993/// - Unbounded (requires_infinite_memory: false) is considered next.
994/// - Otherwise, the operator is bounded.
995///
996/// **Note:** This is a general-purpose utility and may not apply to
997/// all multi-child operators. Ensure your operator's behavior aligns
998/// with these assumptions before using.
999pub(crate) fn boundedness_from_children<'a>(
1000    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
1001) -> Boundedness {
1002    let mut unbounded_with_finite_mem = false;
1003
1004    for child in children {
1005        match child.boundedness() {
1006            Boundedness::Unbounded {
1007                requires_infinite_memory: true,
1008            } => {
1009                return Boundedness::Unbounded {
1010                    requires_infinite_memory: true,
1011                };
1012            }
1013            Boundedness::Unbounded {
1014                requires_infinite_memory: false,
1015            } => {
1016                unbounded_with_finite_mem = true;
1017            }
1018            Boundedness::Bounded => {}
1019        }
1020    }
1021
1022    if unbounded_with_finite_mem {
1023        Boundedness::Unbounded {
1024            requires_infinite_memory: false,
1025        }
1026    } else {
1027        Boundedness::Bounded
1028    }
1029}
1030
1031/// Determines the emission type of an operator based on its children's pipeline behavior.
1032///
1033/// The precedence of emission types is:
1034/// - `Final` has the highest precedence.
1035/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
1036/// - `Incremental` is the default if all children emit incremental results.
1037///
1038/// **Note:** This is a general-purpose utility and may not apply to
1039/// all multi-child operators. Verify your operator's behavior aligns
1040/// with these assumptions.
1041pub(crate) fn emission_type_from_children<'a>(
1042    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
1043) -> EmissionType {
1044    let mut inc_and_final = false;
1045
1046    for child in children {
1047        match child.pipeline_behavior() {
1048            EmissionType::Final => return EmissionType::Final,
1049            EmissionType::Both => inc_and_final = true,
1050            EmissionType::Incremental => continue,
1051        }
1052    }
1053
1054    if inc_and_final {
1055        EmissionType::Both
1056    } else {
1057        EmissionType::Incremental
1058    }
1059}
1060
1061/// Stores certain, often expensive to compute, plan properties used in query
1062/// optimization.
1063///
1064/// These properties are stored a single structure to permit this information to
1065/// be computed once and then those cached results used multiple times without
1066/// recomputation (aka a cache)
1067#[derive(Debug, Clone)]
1068pub struct PlanProperties {
1069    /// See [ExecutionPlanProperties::equivalence_properties]
1070    pub eq_properties: EquivalenceProperties,
1071    /// See [ExecutionPlanProperties::output_partitioning]
1072    pub partitioning: Partitioning,
1073    /// See [ExecutionPlanProperties::pipeline_behavior]
1074    pub emission_type: EmissionType,
1075    /// See [ExecutionPlanProperties::boundedness]
1076    pub boundedness: Boundedness,
1077    pub evaluation_type: EvaluationType,
1078    pub scheduling_type: SchedulingType,
1079    /// See [ExecutionPlanProperties::output_ordering]
1080    output_ordering: Option<LexOrdering>,
1081}
1082
1083impl PlanProperties {
1084    /// Construct a new `PlanPropertiesCache` from the
1085    pub fn new(
1086        eq_properties: EquivalenceProperties,
1087        partitioning: Partitioning,
1088        emission_type: EmissionType,
1089        boundedness: Boundedness,
1090    ) -> Self {
1091        // Output ordering can be derived from `eq_properties`.
1092        let output_ordering = eq_properties.output_ordering();
1093        Self {
1094            eq_properties,
1095            partitioning,
1096            emission_type,
1097            boundedness,
1098            evaluation_type: EvaluationType::Lazy,
1099            scheduling_type: SchedulingType::NonCooperative,
1100            output_ordering,
1101        }
1102    }
1103
1104    /// Overwrite output partitioning with its new value.
1105    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
1106        self.partitioning = partitioning;
1107        self
1108    }
1109
1110    /// Set equivalence properties having mut reference.
1111    pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
1112        // Changing equivalence properties also changes output ordering, so
1113        // make sure to overwrite it:
1114        self.output_ordering = eq_properties.output_ordering();
1115        self.eq_properties = eq_properties;
1116    }
1117
1118    /// Overwrite equivalence properties with its new value.
1119    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
1120        self.set_eq_properties(eq_properties);
1121        self
1122    }
1123
1124    /// Overwrite boundedness with its new value.
1125    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
1126        self.boundedness = boundedness;
1127        self
1128    }
1129
1130    /// Overwrite emission type with its new value.
1131    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
1132        self.emission_type = emission_type;
1133        self
1134    }
1135
1136    /// Set the [`SchedulingType`].
1137    ///
1138    /// Defaults to [`SchedulingType::NonCooperative`]
1139    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
1140        self.scheduling_type = scheduling_type;
1141        self
1142    }
1143
1144    /// Set the [`EvaluationType`].
1145    ///
1146    /// Defaults to [`EvaluationType::Lazy`]
1147    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
1148        self.evaluation_type = drive_type;
1149        self
1150    }
1151
1152    /// Set constraints having mut reference.
1153    pub fn set_constraints(&mut self, constraints: Constraints) {
1154        self.eq_properties.set_constraints(constraints);
1155    }
1156
1157    /// Overwrite constraints with its new value.
1158    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1159        self.set_constraints(constraints);
1160        self
1161    }
1162
1163    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
1164        &self.eq_properties
1165    }
1166
1167    pub fn output_partitioning(&self) -> &Partitioning {
1168        &self.partitioning
1169    }
1170
1171    pub fn output_ordering(&self) -> Option<&LexOrdering> {
1172        self.output_ordering.as_ref()
1173    }
1174
1175    /// Get schema of the node.
1176    pub(crate) fn schema(&self) -> &SchemaRef {
1177        self.eq_properties.schema()
1178    }
1179}
1180
// Verifies that `$target.$func_name()` has exactly `$expected_len` elements.
// On a mismatch `assert_eq_or_internal_err!` is invoked with a message naming
// the plan, the method and both lengths.
macro_rules! check_len {
    ($target:expr, $func_name:ident, $expected_len:expr) => {
        let observed_len = $target.$func_name().len();
        assert_eq_or_internal_err!(
            observed_len,
            $expected_len,
            "{}::{} returned Vec with incorrect size: {} != {}",
            $target.name(),
            stringify!($func_name),
            observed_len,
            $expected_len
        );
    };
}
1195
1196/// Checks a set of invariants that apply to all ExecutionPlan implementations.
1197/// Returns an error if the given node does not conform.
1198pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
1199    plan: &P,
1200    _check: InvariantLevel,
1201) -> Result<(), DataFusionError> {
1202    let children_len = plan.children().len();
1203
1204    check_len!(plan, maintains_input_order, children_len);
1205    check_len!(plan, required_input_ordering, children_len);
1206    check_len!(plan, required_input_distribution, children_len);
1207    check_len!(plan, benefits_from_input_partitioning, children_len);
1208
1209    Ok(())
1210}
1211
1212/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
1213/// especially for the distributed engine to judge whether need to deal with shuffling.
1214/// Currently, there are 3 kinds of execution plan which needs data exchange
1215///     1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
1216///     2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
1217///     3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
1218#[expect(clippy::needless_pass_by_value)]
1219pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
1220    plan.properties().evaluation_type == EvaluationType::Eager
1221}
1222
1223/// Returns a copy of this plan if we change any child according to the pointer comparison.
1224/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1225pub fn with_new_children_if_necessary(
1226    plan: Arc<dyn ExecutionPlan>,
1227    children: Vec<Arc<dyn ExecutionPlan>>,
1228) -> Result<Arc<dyn ExecutionPlan>> {
1229    let old_children = plan.children();
1230    assert_eq_or_internal_err!(
1231        children.len(),
1232        old_children.len(),
1233        "Wrong number of children"
1234    );
1235    if children.is_empty()
1236        || children
1237            .iter()
1238            .zip(old_children.iter())
1239            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
1240    {
1241        plan.with_new_children(children)
1242    } else {
1243        Ok(plan)
1244    }
1245}
1246
1247/// Return a [`DisplayableExecutionPlan`] wrapper around an
1248/// [`ExecutionPlan`] which can be displayed in various easier to
1249/// understand ways.
1250///
1251/// See examples on [`DisplayableExecutionPlan`]
1252pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
1253    DisplayableExecutionPlan::new(plan)
1254}
1255
1256/// Execute the [ExecutionPlan] and collect the results in memory
1257pub async fn collect(
1258    plan: Arc<dyn ExecutionPlan>,
1259    context: Arc<TaskContext>,
1260) -> Result<Vec<RecordBatch>> {
1261    let stream = execute_stream(plan, context)?;
1262    crate::common::collect(stream).await
1263}
1264
1265/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
1266///
1267/// See [collect] to buffer the `RecordBatch`es in memory.
1268///
1269/// # Aborting Execution
1270///
1271/// Dropping the stream will abort the execution of the query, and free up
1272/// any allocated resources
1273#[expect(
1274    clippy::needless_pass_by_value,
1275    reason = "Public API that historically takes owned Arcs"
1276)]
1277pub fn execute_stream(
1278    plan: Arc<dyn ExecutionPlan>,
1279    context: Arc<TaskContext>,
1280) -> Result<SendableRecordBatchStream> {
1281    match plan.output_partitioning().partition_count() {
1282        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1283        1 => plan.execute(0, context),
1284        2.. => {
1285            // merge into a single partition
1286            let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1287            // CoalescePartitionsExec must produce a single partition
1288            assert_eq!(1, plan.properties().output_partitioning().partition_count());
1289            plan.execute(0, context)
1290        }
1291    }
1292}
1293
1294/// Execute the [ExecutionPlan] and collect the results in memory
1295pub async fn collect_partitioned(
1296    plan: Arc<dyn ExecutionPlan>,
1297    context: Arc<TaskContext>,
1298) -> Result<Vec<Vec<RecordBatch>>> {
1299    let streams = execute_stream_partitioned(plan, context)?;
1300
1301    let mut join_set = JoinSet::new();
1302    // Execute the plan and collect the results into batches.
1303    streams.into_iter().enumerate().for_each(|(idx, stream)| {
1304        join_set.spawn(async move {
1305            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
1306            (idx, result)
1307        });
1308    });
1309
1310    let mut batches = vec![];
1311    // Note that currently this doesn't identify the thread that panicked
1312    //
1313    // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id
1314    // once it is stable
1315    while let Some(result) = join_set.join_next().await {
1316        match result {
1317            Ok((idx, res)) => batches.push((idx, res?)),
1318            Err(e) => {
1319                if e.is_panic() {
1320                    std::panic::resume_unwind(e.into_panic());
1321                } else {
1322                    unreachable!();
1323                }
1324            }
1325        }
1326    }
1327
1328    batches.sort_by_key(|(idx, _)| *idx);
1329    let batches = batches.into_iter().map(|(_, batch)| batch).collect();
1330
1331    Ok(batches)
1332}
1333
1334/// Execute the [ExecutionPlan] and return a vec with one stream per output
1335/// partition
1336///
1337/// # Aborting Execution
1338///
1339/// Dropping the stream will abort the execution of the query, and free up
1340/// any allocated resources
1341#[expect(
1342    clippy::needless_pass_by_value,
1343    reason = "Public API that historically takes owned Arcs"
1344)]
1345pub fn execute_stream_partitioned(
1346    plan: Arc<dyn ExecutionPlan>,
1347    context: Arc<TaskContext>,
1348) -> Result<Vec<SendableRecordBatchStream>> {
1349    let num_partitions = plan.output_partitioning().partition_count();
1350    let mut streams = Vec::with_capacity(num_partitions);
1351    for i in 0..num_partitions {
1352        streams.push(plan.execute(i, Arc::clone(&context))?);
1353    }
1354    Ok(streams)
1355}
1356
1357/// Executes an input stream and ensures that the resulting stream adheres to
1358/// the `not null` constraints specified in the `sink_schema`.
1359///
1360/// # Arguments
1361///
1362/// * `input` - An execution plan
1363/// * `sink_schema` - The schema to be applied to the output stream
1364/// * `partition` - The partition index to be executed
1365/// * `context` - The task context
1366///
1367/// # Returns
1368///
1369/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1370///
1371/// This function first executes the given input plan for the specified partition
1372/// and context. It then checks if there are any columns in the input that might
1373/// violate the `not null` constraints specified in the `sink_schema`. If there are
1374/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1375/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
1376#[expect(
1377    clippy::needless_pass_by_value,
1378    reason = "Public API that historically takes owned Arcs"
1379)]
1380pub fn execute_input_stream(
1381    input: Arc<dyn ExecutionPlan>,
1382    sink_schema: SchemaRef,
1383    partition: usize,
1384    context: Arc<TaskContext>,
1385) -> Result<SendableRecordBatchStream> {
1386    let input_stream = input.execute(partition, context)?;
1387
1388    debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1389
1390    // Find input columns that may violate the not null constraint.
1391    let risky_columns: Vec<_> = sink_schema
1392        .fields()
1393        .iter()
1394        .zip(input.schema().fields().iter())
1395        .enumerate()
1396        .filter_map(|(idx, (sink_field, input_field))| {
1397            (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1398        })
1399        .collect();
1400
1401    if risky_columns.is_empty() {
1402        Ok(input_stream)
1403    } else {
1404        // Check not null constraint on the input stream
1405        Ok(Box::pin(RecordBatchStreamAdapter::new(
1406            sink_schema,
1407            input_stream
1408                .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1409        )))
1410    }
1411}
1412
1413/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1414///
1415/// # Arguments
1416///
1417/// * `batch` - The `RecordBatch` to be checked
1418/// * `column_indices` - A vector of column indices that should be checked for
1419///   `not null` constraints.
1420///
1421/// # Returns
1422///
1423/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1424///
1425/// This function iterates over the specified column indices and ensures that none
1426/// of the columns contain null values. If any column contains null values, an error
1427/// is returned.
1428pub fn check_not_null_constraints(
1429    batch: RecordBatch,
1430    column_indices: &Vec<usize>,
1431) -> Result<RecordBatch> {
1432    for &index in column_indices {
1433        if batch.num_columns() <= index {
1434            return exec_err!(
1435                "Invalid batch column count {} expected > {}",
1436                batch.num_columns(),
1437                index
1438            );
1439        }
1440
1441        if batch
1442            .column(index)
1443            .logical_nulls()
1444            .map(|nulls| nulls.null_count())
1445            .unwrap_or_default()
1446            > 0
1447        {
1448            return exec_err!(
1449                "Invalid batch column at '{}' has null but schema specifies non-nullable",
1450                index
1451            );
1452        }
1453    }
1454
1455    Ok(batch)
1456}
1457
1458/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
1459///
1460/// Some plans will change their internal states after execution, making them unable to be executed again.
1461/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
1462///
1463/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
1464/// However, if the data of the left table is derived from the work table, it will become outdated
1465/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
1466///
1467/// # Limitations
1468///
1469/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
1470///
1471/// * uses dynamic filters,
1472/// * represents a recursive query.
1473///
1474pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1475    plan.transform_up(|plan| {
1476        let new_plan = Arc::clone(&plan).reset_state()?;
1477        Ok(Transformed::yes(new_plan))
1478    })
1479    .data()
1480}
1481
1482/// Check if the `plan` children has the same properties as passed `children`.
1483/// In this case plan can avoid self properties re-computation when its children
1484/// replace is requested.
1485/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1486pub fn has_same_children_properties(
1487    plan: &impl ExecutionPlan,
1488    children: &[Arc<dyn ExecutionPlan>],
1489) -> Result<bool> {
1490    let old_children = plan.children();
1491    assert_eq_or_internal_err!(
1492        children.len(),
1493        old_children.len(),
1494        "Wrong number of children"
1495    );
1496    for (lhs, rhs) in old_children.iter().zip(children.iter()) {
1497        if !Arc::ptr_eq(lhs.properties(), rhs.properties()) {
1498            return Ok(false);
1499        }
1500    }
1501    Ok(true)
1502}
1503
/// Fast-path helper for implementations of
/// [`ExecutionPlan::with_new_children`].
///
/// Expands to a check with `has_same_children_properties`. When the supplied
/// children carry the same properties as the plan's current children, the
/// macro returns early from the enclosing function with
/// `$plan.with_new_children_and_same_properties($children)` wrapped in an
/// `Arc`, skipping properties re-computation. Otherwise execution falls
/// through to the code following the macro call.
///
/// The expansion uses `?` and `return`, so it must be invoked inside a
/// function that returns a `Result`.
#[macro_export]
macro_rules! check_if_same_properties {
    ($plan:expr, $children:expr) => {
        if $crate::execution_plan::has_same_children_properties(
            $plan.as_ref(),
            &$children,
        )? {
            return Ok(::std::sync::Arc::new(
                $plan.with_new_children_and_same_properties($children),
            ));
        }
    };
}
1519
1520/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1521pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1522    let formatted = displayable(plan.as_ref()).indent(true).to_string();
1523    let actual: Vec<&str> = formatted.trim().lines().collect();
1524    actual.iter().map(|elem| (*elem).to_string()).collect()
1525}
1526
/// Indicates the effect an execution plan operator will have on the cardinality
/// of its input stream.
///
/// The type is a plain field-less enum, so it derives the common value traits:
/// it can be debug-printed, copied, and compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CardinalityEffect {
    /// Unknown effect. This is the default
    Unknown,
    /// The operator is guaranteed to produce exactly one row for
    /// each input row
    Equal,
    /// The operator may produce fewer output rows than it receives input rows
    LowerEqual,
    /// The operator may produce more output rows than it receives input rows
    GreaterEqual,
}
1540
1541/// Can be used in contexts where properties have not yet been initialized properly.
1542pub(crate) fn stub_properties() -> Arc<PlanProperties> {
1543    static STUB_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
1544        Arc::new(PlanProperties::new(
1545            EquivalenceProperties::new(Arc::new(Schema::empty())),
1546            Partitioning::UnknownPartitioning(1),
1547            EmissionType::Final,
1548            Boundedness::Bounded,
1549        ))
1550    });
1551
1552    Arc::clone(&STUB_PROPERTIES)
1553}
1554
#[cfg(test)]
mod tests {

    use super::*;
    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};

    use arrow::array::{Array, DictionaryArray, Int32Array, NullArray, RunArray};
    use arrow::datatypes::{Field, Schema};

    /// Message reported by [`check_not_null_constraints`] when column `0`
    /// contains a null.
    const NULL_IN_COLUMN_0: &str = "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable";

    /// Implements [`DisplayAs`] for test plans that are never rendered; the
    /// generated `fmt_as` is left unimplemented.
    macro_rules! never_displayed {
        ($($plan:ty),+ $(,)?) => {
            $(
                impl DisplayAs for $plan {
                    fn fmt_as(
                        &self,
                        _format: DisplayFormatType,
                        _formatter: &mut std::fmt::Formatter,
                    ) -> std::fmt::Result {
                        unimplemented!()
                    }
                }
            )+
        };
    }

    /// Leaf plan that keeps the default `static_name` implementation.
    #[derive(Debug)]
    pub struct EmptyExec;

    impl EmptyExec {
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }

    /// Leaf plan that overrides `static_name`.
    #[derive(Debug)]
    pub struct RenamedEmptyExec;

    impl RenamedEmptyExec {
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }

    /// Wrapper plan that exposes the wrapped plan through `downcast_delegate`.
    #[derive(Debug)]
    struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);

    never_displayed!(EmptyExec, RenamedEmptyExec, DowncastDelegatingExec);

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            Vec::new()
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn partition_statistics(
            &self,
            _partition: Option<usize>,
        ) -> Result<Arc<Statistics>> {
            unimplemented!()
        }
    }

    impl ExecutionPlan for RenamedEmptyExec {
        fn static_name() -> &'static str
        where
            Self: Sized,
        {
            "MyRenamedEmptyExec"
        }

        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            Vec::new()
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn partition_statistics(
            &self,
            _partition: Option<usize>,
        ) -> Result<Arc<Statistics>> {
            unimplemented!()
        }
    }

    impl ExecutionPlan for DowncastDelegatingExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
            Some(self.0.as_ref())
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            Vec::new()
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn partition_statistics(
            &self,
            _partition: Option<usize>,
        ) -> Result<Arc<Statistics>> {
            unimplemented!()
        }
    }

    /// Puts `column` into a single-column batch (nullable field `a`) and runs
    /// [`check_not_null_constraints`] on column `0`.
    fn check_single_column(column: Arc<dyn Array>) -> Result<RecordBatch> {
        let field = Field::new("a", column.data_type().clone(), true);
        let batch =
            RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column])?;
        check_not_null_constraints(batch, &vec![0])
    }

    /// Asserts that `outcome` is the not-null violation for column `0`.
    fn assert_rejected_for_null(outcome: Result<RecordBatch>) {
        match outcome {
            Ok(_) => panic!("expected a not-null violation, but the batch was accepted"),
            Err(error) => assert_eq!(error.strip_backtrace(), NULL_IN_COLUMN_0),
        }
    }

    #[test]
    fn test_execution_plan_name() {
        let default_named = EmptyExec::new(Arc::new(Schema::empty()));
        assert_eq!(default_named.name(), "EmptyExec");

        let renamed = RenamedEmptyExec::new(Arc::new(Schema::empty()));
        assert_eq!(renamed.name(), "MyRenamedEmptyExec");
        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
    }

    #[test]
    fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
        let inner: Arc<dyn ExecutionPlan> =
            Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
        let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
        let nested: Arc<dyn ExecutionPlan> =
            Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));

        // One and two levels of wrapping must both resolve to the inner plan.
        for plan in [&*wrapped, &*nested] {
            // The wrapper itself is never a downcast target.
            assert!(!plan.is::<DowncastDelegatingExec>());
            assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
            // The innermost plan is.
            assert!(plan.is::<EmptyExec>());
            assert!(plan.downcast_ref::<EmptyExec>().is_some());
            // Unrelated plan types still do not match.
            assert!(!plan.is::<RenamedEmptyExec>());
            assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
        }
    }

    /// Compile-only check that `ExecutionPlan::name()` is callable through a
    /// trait object.
    /// Related ticket: https://github.com/apache/datafusion/pull/11047
    #[expect(unused)]
    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
        let _ = plan.name();
    }

    #[test]
    fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
        let column = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        check_single_column(Arc::new(column))?;
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_reject_null() -> Result<()> {
        let column = Int32Array::from(vec![Some(1), None, Some(3)]);
        assert_rejected_for_null(check_single_column(Arc::new(column)));
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
        // The values of the run-end encoded array contain nulls.
        let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
        let run_end_encoded = RunArray::try_new(&run_ends, &values)?;
        assert_rejected_for_null(check_single_column(Arc::new(run_end_encoded)));
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
        // Key 1 points at the null dictionary value.
        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
        let keys = Int32Array::from(vec![0, 1, 2, 3]);
        let dictionary = DictionaryArray::new(keys, values);
        assert_rejected_for_null(check_single_column(Arc::new(dictionary)));
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
        // The dictionary holds a null value, but no key refers to it.
        let values = Arc::new(Int32Array::from(vec![
            Some(1),
            None, // never referenced by the keys below
            Some(3),
            Some(4),
        ]));
        let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
        let dictionary = DictionaryArray::new(keys, values);
        check_single_column(Arc::new(dictionary))?;
        Ok(())
    }

    #[test]
    fn test_check_not_null_constraints_on_null_type() -> Result<()> {
        // Every value of a `Null`-typed column is null.
        assert_rejected_for_null(check_single_column(Arc::new(NullArray::new(3))));
        Ok(())
    }
}