datafusion_physical_plan/execution_plan.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19use crate::filter_pushdown::{
20 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21 FilterPushdownPropagation,
22};
23pub use crate::metrics::Metric;
24pub use crate::ordering::InputOrderMode;
25use crate::sort_pushdown::SortOrderPushdownResult;
26pub use crate::stream::EmptyRecordBatchStream;
27
28use arrow_schema::Schema;
29pub use datafusion_common::hash_utils;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31pub use datafusion_common::utils::project_schema;
32pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
33pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
34pub use datafusion_expr::{Accumulator, ColumnarValue};
35pub use datafusion_physical_expr::window::WindowExpr;
36pub use datafusion_physical_expr::{
37 Distribution, Partitioning, PhysicalExpr, expressions,
38};
39
40use std::any::Any;
41use std::fmt::Debug;
42use std::sync::{Arc, LazyLock};
43
44use crate::coalesce_partitions::CoalescePartitionsExec;
45use crate::display::DisplayableExecutionPlan;
46use crate::metrics::MetricsSet;
47use crate::projection::ProjectionExec;
48use crate::stream::RecordBatchStreamAdapter;
49
50use arrow::array::{Array, RecordBatch};
51use arrow::datatypes::SchemaRef;
52use datafusion_common::config::ConfigOptions;
53use datafusion_common::{
54 Constraints, DataFusionError, Result, assert_eq_or_internal_err,
55 assert_or_internal_err, exec_err,
56};
57use datafusion_common_runtime::JoinSet;
58use datafusion_execution::TaskContext;
59use datafusion_physical_expr::EquivalenceProperties;
60use datafusion_physical_expr_common::sort_expr::{
61 LexOrdering, OrderingRequirements, PhysicalSortExpr,
62};
63
64use futures::stream::{StreamExt, TryStreamExt};
65
66/// Represent nodes in the DataFusion Physical Plan.
67///
68/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
69/// [`RecordBatch`] that incrementally computes a partition of the
70/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
71/// details on partitioning.
72///
73/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
74/// properties of the output to the DataFusion optimizer, and methods such as
75/// [`required_input_distribution`] and [`required_input_ordering`] express
76/// requirements of the `ExecutionPlan` from its input.
77///
78/// [`ExecutionPlan`] can be displayed in a simplified form using the
79/// return value from [`displayable`] in addition to the (normally
80/// quite verbose) `Debug` output.
81///
82/// [`execute`]: ExecutionPlan::execute
83/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
84/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
85///
86/// # Examples
87///
88/// See [`datafusion-examples`] for examples, including
89/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
90/// `ExecutionPlan` with memory tracking and spilling support.
91///
92/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
93/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
94pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
    ///
    /// Implementation note: this method can just proxy to
    /// [`static_name`](ExecutionPlan::static_name) if no special action is
    /// needed. It doesn't provide a default implementation like that because
    /// this method doesn't require the `Sized` constraint, to allow a wider
    /// range of use cases.
    fn name(&self) -> &str;
103
104 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
105 /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
106 fn static_name() -> &'static str
107 where
108 Self: Sized,
109 {
110 let full_name = std::any::type_name::<Self>();
111 let maybe_start_idx = full_name.rfind(':');
112 match maybe_start_idx {
113 Some(start_idx) => &full_name[start_idx + 1..],
114 None => "UNKNOWN",
115 }
116 }
117
118 /// Returns the plan that provides this plan's public
119 /// [`ExecutionPlan`] downcast identity.
120 ///
121 /// This hook is for wrapper nodes that delegate their public downcast
122 /// identity to another plan while adding cross-cutting behavior such as
123 /// instrumentation. The default implementation returns `None`, meaning this
124 /// plan's concrete type is used for type introspection.
125 ///
126 /// Most `ExecutionPlan` implementations should use the default `None`;
127 /// override this only for wrapper plans that intentionally delegate their
128 /// public downcast identity to another plan.
129 ///
130 /// The `is` and `downcast_ref` helpers follow the returned delegate instead
131 /// of checking the current concrete type, making intermediate delegating
132 /// wrappers invisible to normal downcast-based inspection.
133 ///
134 /// Implementations that opt in should return the delegate plan, not `self`.
135 ///
136 /// This is independent from [`Self::children`] and should not be used for
137 /// plan traversal or optimizer rewrites.
    fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
        // Default: no delegation. Only wrapper plans that hand their downcast
        // identity to another plan are expected to override this.
        None
    }
141
142 /// Get the schema for this execution plan
143 fn schema(&self) -> SchemaRef {
144 Arc::clone(self.properties().schema())
145 }
146
    /// Return properties of the output of the `ExecutionPlan`, such as output
    /// ordering(s), partitioning information etc.
    ///
    /// This information is available via methods on the
    /// [`ExecutionPlanProperties`] trait, which is implemented for all
    /// `ExecutionPlan`s.
    ///
    /// This is a required method. The default [`Self::schema`] implementation
    /// reads the schema from the returned properties.
    fn properties(&self) -> &Arc<PlanProperties>;
153
    /// Returns an error if this individual node does not conform to its invariants.
    /// These invariants are typically only checked in debug mode.
    ///
    /// A default set of invariants is provided in the [`check_default_invariants`] function.
    /// The default implementation of `check_invariants` calls this function.
    /// Extension nodes can provide their own invariants.
    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        // Forward the requested level unchanged to the shared default checks.
        check_default_invariants(self, check)
    }
163
164 /// Specifies the data distribution requirements for all the
165 /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
166 fn required_input_distribution(&self) -> Vec<Distribution> {
167 vec![Distribution::UnspecifiedDistribution; self.children().len()]
168 }
169
170 /// Specifies the ordering required for all of the children of this
171 /// `ExecutionPlan`.
172 ///
173 /// For each child, it's the local ordering requirement within
174 /// each partition rather than the global ordering
175 ///
176 /// NOTE that checking `!is_empty()` does **not** check for a
177 /// required input ordering. Instead, the correct check is that at
178 /// least one entry must be `Some`
179 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
180 vec![None; self.children().len()]
181 }
182
183 /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
184 /// rows within or between partitions.
185 ///
186 /// For example, Projection, Filter, and Limit maintain the order
187 /// of inputs -- they may transform values (Projection) or not
188 /// produce the same number of rows that went in (Filter and
189 /// Limit), but the rows that are produced go in the same way.
190 ///
191 /// DataFusion uses this metadata to apply certain optimizations
192 /// such as automatically repartitioning correctly.
193 ///
194 /// The default implementation returns `false`
195 ///
196 /// WARNING: if you override this default, you *MUST* ensure that
197 /// the `ExecutionPlan`'s maintains the ordering invariant or else
198 /// DataFusion may produce incorrect results.
199 fn maintains_input_order(&self) -> Vec<bool> {
200 vec![false; self.children().len()]
201 }
202
203 /// Specifies whether the `ExecutionPlan` benefits from increased
204 /// parallelization at its input for each child.
205 ///
206 /// If returns `true`, the `ExecutionPlan` would benefit from partitioning
207 /// its corresponding child (and thus from more parallelism). For
208 /// `ExecutionPlan` that do very little work the overhead of extra
209 /// parallelism may outweigh any benefits
210 ///
211 /// The default implementation returns `true` unless this `ExecutionPlan`
212 /// has signalled it requires a single child input partition.
213 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
214 // By default try to maximize parallelism with more CPUs if
215 // possible
216 self.required_input_distribution()
217 .into_iter()
218 .map(|dist| !matches!(dist, Distribution::SinglePartition))
219 .collect()
220 }
221
    /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
    /// The returned list will be empty for leaf nodes such as scans, will contain
    /// a single value for unary nodes, or two values for binary nodes (such as
    /// joins).
    ///
    /// Several default methods of this trait (for example
    /// [`Self::required_input_ordering`]) return one entry per element of
    /// this list.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
227
    /// Returns a new `ExecutionPlan` where all existing children were replaced
    /// by the `children`, in order.
    ///
    /// The default [`Self::reset_state`] implementation calls this method with
    /// the plan's existing children.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
234
235 /// Reset any internal state within this [`ExecutionPlan`].
236 ///
237 /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
238 /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
239 /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
240 /// are reset to their initial state.
241 ///
242 /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
243 /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
244 /// necessarily resetting any internal state. Implementations that require resetting of some
245 /// internal state should override this method to provide the necessary logic.
246 ///
247 /// This method should *not* reset state recursively for children, as it is expected that
248 /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
249 /// or was already called on each child.
250 ///
251 /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
252 /// thus it is expected that any cached plan properties will remain valid after the reset.
253 ///
254 /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
255 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
256 let children = self.children().into_iter().cloned().collect();
257 self.with_new_children(children)
258 }
259
260 /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
261 /// produce `target_partitions` partitions.
262 ///
263 /// If the `ExecutionPlan` does not support changing its partitioning,
264 /// returns `Ok(None)` (the default).
265 ///
    /// If the `ExecutionPlan` can increase its partitioning, but not to the
    /// `target_partitions`, it may return an `ExecutionPlan` with fewer
268 /// partitions. This might happen, for example, if each new partition would
269 /// be too small to be efficiently processed individually.
270 ///
271 /// The DataFusion optimizer attempts to use as many threads as possible by
272 /// repartitioning its inputs to match the target number of threads
273 /// available (`target_partitions`). Some data sources, such as the built in
274 /// CSV and Parquet readers, implement this method as they are able to read
275 /// from their input files in parallel, regardless of how the source data is
276 /// split amongst files.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Default: both arguments are ignored and no repartitioned plan is offered.
        Ok(None)
    }
284
285 /// Begin execution of `partition`, returning a [`Stream`] of
286 /// [`RecordBatch`]es.
287 ///
288 /// # Notes
289 ///
290 /// The `execute` method itself is not `async` but it returns an `async`
291 /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
292 /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
293 /// Most `ExecutionPlan`s should not do any work before the first
294 /// `RecordBatch` is requested from the stream.
295 ///
296 /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
297 /// [`Stream`] into a [`SendableRecordBatchStream`].
298 ///
299 /// Using `async` `Streams` allows for network I/O during execution and
300 /// takes advantage of Rust's built in support for `async` continuations and
301 /// crate ecosystem.
302 ///
303 /// [`Stream`]: futures::stream::Stream
304 /// [`StreamExt`]: futures::stream::StreamExt
305 /// [`TryStreamExt`]: futures::stream::TryStreamExt
306 /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
307 ///
308 /// # Error handling
309 ///
310 /// Any error that occurs during execution is sent as an `Err` in the output
311 /// stream.
312 ///
313 /// `ExecutionPlan` implementations in DataFusion cancel additional work
314 /// immediately once an error occurs. The rationale is that if the overall
315 /// query will return an error, any additional work such as continued
316 /// polling of inputs will be wasted as it will be thrown away.
317 ///
318 /// # Cancellation / Aborting Execution
319 ///
320 /// The [`Stream`] that is returned must ensure that any allocated resources
321 /// are freed when the stream itself is dropped. This is particularly
322 /// important for [`spawn`]ed tasks or threads. Unless care is taken to
323 /// "abort" such tasks, they may continue to consume resources even after
324 /// the plan is dropped, generating intermediate results that are never
325 /// used.
    /// Thus, [`spawn`] is disallowed; use [`SpawnedTask`] instead.
327 ///
328 /// To enable timely cancellation, the [`Stream`] that is returned must not
329 /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
330 /// In a typical [`ExecutionPlan`], this automatically happens unless there are
331 /// special circumstances; e.g. when the computational complexity of processing a
332 /// batch is superlinear. See this [general guideline][async-guideline] for more context
333 /// on this point, which explains why one should avoid spending a long time without
334 /// reaching an `await`/yield point in asynchronous runtimes.
335 /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
336 /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
337 /// [`tokio::task::yield_now()`] when appropriate.
338 /// In special cases that warrant manual yielding, determination for "regularly" may be
339 /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
340 /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
341 /// counting rows or batches.
342 ///
343 /// The [cancellation benchmark] tracks some cases of how quickly queries can
344 /// be cancelled.
345 ///
346 /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
347 /// for structures to help ensure all background tasks are cancelled.
348 ///
349 /// [`spawn`]: tokio::task::spawn
350 /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
351 /// [`JoinSet`]: datafusion_common_runtime::JoinSet
352 /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
353 /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
354 /// [`Poll::Pending`]: std::task::Poll::Pending
355 /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
356 ///
357 /// # Implementation Examples
358 ///
359 /// While `async` `Stream`s have a non trivial learning curve, the
360 /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
361 /// which help simplify many common operations.
362 ///
363 /// Here are some common patterns:
364 ///
365 /// ## Return Precomputed `RecordBatch`
366 ///
367 /// We can return a precomputed `RecordBatch` as a `Stream`:
368 ///
369 /// ```
370 /// # use std::sync::Arc;
371 /// # use arrow::array::RecordBatch;
372 /// # use arrow::datatypes::SchemaRef;
373 /// # use datafusion_common::Result;
374 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
375 /// # use datafusion_physical_plan::memory::MemoryStream;
376 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
377 /// struct MyPlan {
378 /// batch: RecordBatch,
379 /// }
380 ///
381 /// impl MyPlan {
382 /// fn execute(
383 /// &self,
384 /// partition: usize,
385 /// context: Arc<TaskContext>,
386 /// ) -> Result<SendableRecordBatchStream> {
387 /// // use functions from futures crate convert the batch into a stream
388 /// let fut = futures::future::ready(Ok(self.batch.clone()));
389 /// let stream = futures::stream::once(fut);
390 /// Ok(Box::pin(RecordBatchStreamAdapter::new(
391 /// self.batch.schema(),
392 /// stream,
393 /// )))
394 /// }
395 /// }
396 /// ```
397 ///
398 /// ## Lazily (async) Compute `RecordBatch`
399 ///
400 /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
401 ///
402 /// ```
403 /// # use std::sync::Arc;
404 /// # use arrow::array::RecordBatch;
405 /// # use arrow::datatypes::SchemaRef;
406 /// # use datafusion_common::Result;
407 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
408 /// # use datafusion_physical_plan::memory::MemoryStream;
409 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
410 /// struct MyPlan {
411 /// schema: SchemaRef,
412 /// }
413 ///
414 /// /// Returns a single batch when the returned stream is polled
415 /// async fn get_batch() -> Result<RecordBatch> {
416 /// todo!()
417 /// }
418 ///
419 /// impl MyPlan {
420 /// fn execute(
421 /// &self,
422 /// partition: usize,
423 /// context: Arc<TaskContext>,
424 /// ) -> Result<SendableRecordBatchStream> {
425 /// let fut = get_batch();
426 /// let stream = futures::stream::once(fut);
427 /// Ok(Box::pin(RecordBatchStreamAdapter::new(
428 /// self.schema.clone(),
429 /// stream,
430 /// )))
431 /// }
432 /// }
433 /// ```
434 ///
435 /// ## Lazily (async) create a Stream
436 ///
437 /// If you need to create the return `Stream` using an `async` function,
438 /// you can do so by flattening the result:
439 ///
440 /// ```
441 /// # use std::sync::Arc;
442 /// # use arrow::array::RecordBatch;
443 /// # use arrow::datatypes::SchemaRef;
444 /// # use futures::TryStreamExt;
445 /// # use datafusion_common::Result;
446 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
447 /// # use datafusion_physical_plan::memory::MemoryStream;
448 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
449 /// struct MyPlan {
450 /// schema: SchemaRef,
451 /// }
452 ///
453 /// /// async function that returns a stream
454 /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
455 /// todo!()
456 /// }
457 ///
458 /// impl MyPlan {
459 /// fn execute(
460 /// &self,
461 /// partition: usize,
462 /// context: Arc<TaskContext>,
463 /// ) -> Result<SendableRecordBatchStream> {
464 /// // A future that yields a stream
465 /// let fut = get_batch_stream();
466 /// // Use TryStreamExt::try_flatten to flatten the stream of streams
467 /// let stream = futures::stream::once(fut).try_flatten();
468 /// Ok(Box::pin(RecordBatchStreamAdapter::new(
469 /// self.schema.clone(),
470 /// stream,
471 /// )))
472 /// }
473 /// }
474 /// ```
    fn execute(
        &self,
        // Index of the output partition whose execution is being started.
        partition: usize,
        // Task context handed to the plan for this execution.
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
480
481 /// Return a snapshot of the set of [`Metric`]s for this
482 /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
483 ///
484 /// While the values of the metrics in the returned
485 /// [`MetricsSet`]s may change as execution progresses, the
486 /// specific metrics will not.
487 ///
488 /// Once `self.execute()` has returned (technically the future is
489 /// resolved) for all available partitions, the set of metrics
490 /// should be complete. If this function is called prior to
491 /// `execute()` new metrics may appear in subsequent calls.
    fn metrics(&self) -> Option<MetricsSet> {
        // Default: this plan exposes no metrics.
        None
    }
495
496 /// Returns statistics for a specific partition of this `ExecutionPlan` node.
497 /// If statistics are not available, should return [`Statistics::new_unknown`]
498 /// (the default), not an error.
499 /// If `partition` is `None`, it returns statistics for the entire plan.
500 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
501 if let Some(idx) = partition {
502 // Validate partition index
503 let partition_count = self.properties().partitioning.partition_count();
504 assert_or_internal_err!(
505 idx < partition_count,
506 "Invalid partition index: {}, the partition count is {}",
507 idx,
508 partition_count
509 );
510 }
511 Ok(Arc::new(Statistics::new_unknown(&self.schema())))
512 }
513
    /// Returns `true` if a limit can be safely pushed down through this
    /// `ExecutionPlan` node.
    ///
    /// If this method returns `true`, and the query plan contains a limit at
    /// the output of this node, DataFusion will push the limit to the input
    /// of this node.
    ///
    /// The default implementation returns `false`.
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
523
    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
    /// fetch limits. Returns `None` otherwise.
    ///
    /// The default implementation ignores `_limit` and returns `None`.
    ///
    /// See physical optimizer rule [`limit_pushdown`] for details.
    ///
    /// [`limit_pushdown`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/limit_pushdown/index.html
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
533
    /// Gets the fetch count for the operator, `None` means there is no fetch.
    ///
    /// The default implementation returns `None`.
    fn fetch(&self) -> Option<usize> {
        None
    }
538
    /// Gets the effect on cardinality, if known.
    ///
    /// The default implementation returns [`CardinalityEffect::Unknown`].
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Unknown
    }
543
    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
    ///
    /// If the operator supports this optimization, the resulting plan will be:
    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
    /// Otherwise, the plan is left unchanged and `Ok(None)` is returned.
    ///
    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
    /// or not possible, or `Err` on failure.
    ///
    /// The default implementation does not attempt the swap and returns `Ok(None)`.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
558
559 /// Collect filters that this node can push down to its children.
560 /// Filters that are being pushed down from parents are passed in,
561 /// and the node may generate additional filters to push down.
562 /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
563 /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
564 /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
565 /// filters so it only returns that `FilterExec` wants to push down its own predicate.
566 /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
567 /// `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
568 /// but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
569 /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
570 /// and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
571 /// since it has no children and no additional filters to push down.
572 /// It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
573 /// up the plan that `DataSourceExec` can actually bind the filters.
574 ///
575 /// The default implementation bars all parent filters from being pushed down and adds no new filters.
    /// This is the safest option, making filter pushdown opt-in on a per-node basis.
577 ///
578 /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
579 /// Depending on the phase the operator may or may not be allowed to modify the plan.
580 /// See [`FilterPushdownPhase`] for more details.
581 fn gather_filters_for_pushdown(
582 &self,
583 _phase: FilterPushdownPhase,
584 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
585 _config: &ConfigOptions,
586 ) -> Result<FilterDescription> {
587 Ok(FilterDescription::all_unsupported(
588 &parent_filters,
589 &self.children(),
590 ))
591 }
592
593 /// Handle the result of a child pushdown.
594 ///
595 /// This method is called as we recurse back up the plan tree after pushing
596 /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
597 /// It allows the current node to process the results of filter pushdown from
598 /// its children, deciding whether to absorb filters, modify the plan, or pass
599 /// filters back up to its parent.
600 ///
601 /// **Purpose and Context:**
602 /// Filter pushdown is a critical optimization in DataFusion that aims to
603 /// reduce the amount of data processed by applying filters as early as
604 /// possible in the query plan. This method is part of the second phase of
605 /// filter pushdown, where results are propagated back up the tree after
606 /// being pushed down. Each node can inspect the pushdown results from its
607 /// children and decide how to handle any unapplied filters, potentially
608 /// optimizing the plan structure or filter application.
609 ///
610 /// **Behavior in Different Nodes:**
611 /// - For a `DataSourceExec`, this often means absorbing the filters to apply
612 /// them during the scan phase (late materialization), reducing the data
613 /// read from the source.
614 /// - A `FilterExec` may absorb any filters its children could not handle,
615 /// combining them with its own predicate. If no filters remain (i.e., the
616 /// predicate becomes trivially true), it may remove itself from the plan
617 /// altogether. It typically marks parent filters as supported, indicating
618 /// they have been handled.
619 /// - A `HashJoinExec` might ignore the pushdown result if filters need to
620 /// be applied during the join operation. It passes the parent filters back
621 /// up wrapped in [`FilterPushdownPropagation::if_any`], discarding
622 /// any self-filters from children.
623 ///
624 /// **Example Walkthrough:**
625 /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
626 /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
627 /// `FilterExec`, the filter `f1` is gathered and pushed down to
628 /// `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
629 /// the join or add its own filters (e.g., a min-max filter from the build side),
630 /// then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
631 /// has no children to push to, so it prepares to handle filters in the
632 /// upward phase.
633 /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
634 /// `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
635 /// for late materialization during scanning, marking them as supported.
636 /// `HashJoinExec` receives the result, decides whether to apply any
637 /// remaining filters during the join, and passes unhandled filters back
638 /// up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
639 /// updates its predicate if necessary, or removes itself if the predicate
640 /// becomes trivial (e.g., `lit(true)`), and marks filters as supported
641 /// for its parent.
642 ///
643 /// The default implementation is a no-op that passes the result of pushdown
644 /// from the children to its parent transparently, ensuring no filters are
645 /// lost if a node does not override this behavior.
646 ///
647 /// **Notes for Implementation:**
648 /// When returning filters via [`FilterPushdownPropagation`], the order of
649 /// filters need not match the order they were passed in via
650 /// `child_pushdown_result`. However, preserving the order is recommended for
651 /// debugging and ease of reasoning about the resulting plans.
652 ///
653 /// **Helper Methods for Customization:**
654 /// There are various helper methods to simplify implementing this method:
655 /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
656 /// supported as long as at least one child supports them.
657 /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
658 /// supported as long as all children support them.
659 /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
660 /// to the propagation result, indicating which filters are supported by
661 /// the current node.
662 /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
663 /// current node in the propagation result, used if the node
664 /// has modified its plan based on the pushdown results.
665 ///
666 /// **Filter Pushdown Phases:**
667 /// There are two different phases in filter pushdown (`Pre` and others),
668 /// which some operators may handle differently. Depending on the phase, the
669 /// operator may or may not be allowed to modify the plan. See
670 /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
671 ///
672 /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
673 fn handle_child_pushdown_result(
674 &self,
675 _phase: FilterPushdownPhase,
676 child_pushdown_result: ChildPushdownResult,
677 _config: &ConfigOptions,
678 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
679 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
680 }
681
    /// Injects arbitrary run-time state into this execution plan, returning a new plan
    /// instance that incorporates that state *if* it is relevant to the concrete
    /// node implementation.
    ///
    /// This is a generic entry point: the `state` can be any type wrapped in
    /// `Arc<dyn Any + Send + Sync>`. A node that cares about the state should
    /// downcast it to the concrete type it expects and, if successful, return a
    /// modified copy of itself that captures the provided value. If the state is
    /// not applicable, the node should return `None` so that parent
    /// nodes can continue propagating the attempt further down the plan tree.
    ///
    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
    /// downcasts the supplied state to an `Arc<WorkTable>`
    /// in order to wire up the working table used during recursive-CTE execution.
    /// Similar patterns can be followed by custom nodes that need late-bound
    /// dependencies or shared state.
    ///
    /// The default implementation ignores `_state` and returns `None`.
    fn with_new_state(
        &self,
        _state: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
704
    /// Try to push down sort ordering requirements to this node.
    ///
    /// This method is called during sort pushdown optimization to determine if this
    /// node can optimize for a requested sort ordering. Implementations should:
    ///
    /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact
    ///   ordering (allowing the Sort operator to be removed)
    /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the
    ///   ordering but cannot guarantee perfect sorting (Sort operator is kept)
    /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize
    ///   for the ordering
    ///
    /// For transparent nodes (that preserve ordering), implement this to delegate to
    /// children and wrap the result with a new instance of this node.
    ///
    /// The default implementation ignores `_order` and returns
    /// [`SortOrderPushdownResult::Unsupported`].
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }
727
    /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity.
    ///
    /// This is used to signal to data sources that the output ordering must be
    /// preserved, even if it might be more efficient to ignore it (e.g. by
    /// skipping some row groups in Parquet).
    ///
    /// The default implementation ignores `_preserve_order` and returns `None`.
    // NOTE(review): `None` presumably means "no order-aware variant is
    // available for this node" -- confirm against the callers.
    fn with_preserve_order(
        &self,
        _preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
740}
741
742impl dyn ExecutionPlan {
743 /// Returns `true` if the plan is of type `T`.
744 ///
745 /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
746 /// to it.
747 ///
748 /// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
749 /// called on `Arc<dyn ExecutionPlan>` via auto-deref.
750 pub fn is<T: ExecutionPlan>(&self) -> bool {
751 match self.downcast_delegate() {
752 Some(delegate) => delegate.is::<T>(),
753 None => (self as &dyn Any).is::<T>(),
754 }
755 }
756
757 /// Attempts to downcast this plan to a concrete type `T`, returning `None`
758 /// if the plan is not of that type.
759 ///
760 /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
761 /// to it.
762 ///
763 /// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
764 /// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
765 /// downcast the `Arc` itself.
766 pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
767 match self.downcast_delegate() {
768 Some(delegate) => delegate.downcast_ref::<T>(),
769 None => (self as &dyn Any).downcast_ref(),
770 }
771 }
772}
773
/// [`ExecutionPlan`] Invariant Level
///
/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
///
/// Derives `Debug`, `PartialEq` and `Eq` (in addition to `Clone`/`Copy`) so the
/// level can be logged and compared, matching the other public enums in this
/// module.
///
/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvariantLevel {
    /// Invariants that are always true for the [`ExecutionPlan`] node
    /// such as the number of expected children.
    Always,
    /// Invariants that must hold true for the [`ExecutionPlan`] node
    /// to be "executable", such as ordering and/or distribution requirements
    /// being fulfilled.
    Executable,
}
789
/// Extension trait that provides an easy API to fetch various properties of
/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
pub trait ExecutionPlanProperties {
    /// Specifies how the output of this `ExecutionPlan` is split into
    /// partitions.
    fn output_partitioning(&self) -> &Partitioning;

    /// If the output of this `ExecutionPlan` within each partition is sorted,
    /// returns `Some(keys)` describing the ordering. A `None` return value
    /// indicates no assumptions should be made on the output ordering.
    ///
    /// For example, `SortExec` (obviously) produces sorted output as does
    /// `SortPreservingMergeExec`. Less obviously, `Projection` produces sorted
    /// output if its input is sorted as it does not reorder the input rows.
    fn output_ordering(&self) -> Option<&LexOrdering>;

    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
    /// For more details, see [`Boundedness`].
    fn boundedness(&self) -> Boundedness;

    /// Indicates how the stream of this `ExecutionPlan` emits its results.
    /// For more details, see [`EmissionType`].
    fn pipeline_behavior(&self) -> EmissionType;

    /// Get the [`EquivalenceProperties`] within the plan.
    ///
    /// Equivalence properties tell DataFusion what columns are known to be
    /// equal, during various optimization passes. Reporting "no known
    /// equivalences" is always correct, but may cause DataFusion to
    /// unnecessarily resort data.
    ///
    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
    /// through it or how columns within each row relate to each other, it
    /// should return the equivalence properties of its input. For
    /// example, since [`FilterExec`] may remove rows from its input, but does not
    /// otherwise modify them, it preserves its input equivalence properties.
    /// However, since `ProjectionExec` may calculate derived expressions, it
    /// needs special handling.
    ///
    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
    /// for related concepts.
    ///
    /// [`FilterExec`]: crate::filter::FilterExec
    fn equivalence_properties(&self) -> &EquivalenceProperties;
}
835
836impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
837 fn output_partitioning(&self) -> &Partitioning {
838 self.properties().output_partitioning()
839 }
840
841 fn output_ordering(&self) -> Option<&LexOrdering> {
842 self.properties().output_ordering()
843 }
844
845 fn boundedness(&self) -> Boundedness {
846 self.properties().boundedness
847 }
848
849 fn pipeline_behavior(&self) -> EmissionType {
850 self.properties().emission_type
851 }
852
853 fn equivalence_properties(&self) -> &EquivalenceProperties {
854 self.properties().equivalence_properties()
855 }
856}
857
858impl ExecutionPlanProperties for &dyn ExecutionPlan {
859 fn output_partitioning(&self) -> &Partitioning {
860 self.properties().output_partitioning()
861 }
862
863 fn output_ordering(&self) -> Option<&LexOrdering> {
864 self.properties().output_ordering()
865 }
866
867 fn boundedness(&self) -> Boundedness {
868 self.properties().boundedness
869 }
870
871 fn pipeline_behavior(&self) -> EmissionType {
872 self.properties().emission_type
873 }
874
875 fn equivalence_properties(&self) -> &EquivalenceProperties {
876 self.properties().equivalence_properties()
877 }
878}
879
/// Describes whether the data stream **generated** by an operator is bounded
/// (finite) or unbounded (infinite).
///
/// This is used to determine whether an execution plan will eventually complete
/// processing all its data (bounded) or could potentially run forever (unbounded).
///
/// For unbounded streams, it also records whether the operator needs unbounded
/// memory to process the stream or can make do with finite memory.
///
/// Boundedness of the output stream is based on the boundedness of the input
/// stream and the nature of the operator. For example, limit or topk with fetch
/// operator can convert an unbounded stream to a bounded stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    /// The data stream is bounded (finite) and will eventually complete
    Bounded,
    /// The data stream is unbounded (infinite) and could run forever
    Unbounded {
        /// Whether this operator requires infinite memory to process the unbounded stream.
        /// If false, the operator can process an infinite stream with bounded memory.
        /// If true, memory usage may grow unbounded while processing the stream.
        ///
        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
        requires_infinite_memory: bool,
    },
}

impl Boundedness {
    /// Returns `true` for [`Boundedness::Unbounded`], regardless of its memory
    /// requirement, and `false` for [`Boundedness::Bounded`].
    pub fn is_unbounded(&self) -> bool {
        match self {
            Boundedness::Unbounded { .. } => true,
            Boundedness::Bounded => false,
        }
    }
}
912
/// Represents how an operator emits its output records.
///
/// This is used to determine whether an operator emits records incrementally as they arrive,
/// only emits a final result at the end, or can do both. Note that an operator emits its
/// output as record batches of `batch_size` rows, so even an incremental operator may still
/// buffer data internally until it has enough rows for a batch or its source is exhausted.
///
/// For example, in the following plan:
/// ```text
///   SortExec [EmissionType::Final]
///     |_ on: [col1 ASC]
///     FilterExec [EmissionType::Incremental]
///       |_ pred: col2 > 100
///       DataSourceExec [EmissionType::Incremental]
///         |_ file: "data.csv"
/// ```
/// - DataSourceExec emits records incrementally as it reads from the file
/// - FilterExec processes and emits filtered records incrementally as they arrive
/// - SortExec must wait for all input records before it can emit the sorted result,
///   since it needs to see all values to determine their final order
///
/// Left joins can emit both incrementally and finally:
/// - Incrementally emit matches as they are found
/// - Finally emit non-matches after all input is processed
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Records are emitted incrementally as they arrive and are processed
    Incremental,
    /// Records are only emitted once all input has been processed
    Final,
    /// Records can be emitted both incrementally and as a final result
    Both,
}
945
/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
    /// cooperative scheduling. This means the implementation of the `Stream` returned by
    /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as
    /// [`tokio::task::coop::consume_budget`].
    ///
    /// `NonCooperative` is the default value (it is what [`PlanProperties::new`] assigns) and is
    /// acceptable for most operators. Please refer to the [`coop`](crate::coop) module for
    /// details on when it may be useful to use `Cooperative` instead.
    NonCooperative,
    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
    /// cooperative scheduling by consuming task budget whenever it is able to produce a
    /// [`RecordBatch`].
    Cooperative,
}
964
/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
///
/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
///
/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
/// is known as data-driven or eager evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
    /// instances when it is demanded by invoking `Stream::poll_next`.
    /// Filter, projection, and join are examples of such lazy operators.
    ///
    /// Lazy operators are also known as demand-driven operators. This is the value
    /// assigned by [`PlanProperties::new`].
    Lazy,
    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
    /// `Stream::poll_next` is called.
    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
    ///
    /// Eager operators are also known as data-driven operators.
    Eager,
}
988
989/// Utility to determine an operator's boundedness based on its children's boundedness.
990///
991/// Assumes boundedness can be inferred from child operators:
992/// - Unbounded (requires_infinite_memory: true) takes precedence.
993/// - Unbounded (requires_infinite_memory: false) is considered next.
994/// - Otherwise, the operator is bounded.
995///
996/// **Note:** This is a general-purpose utility and may not apply to
997/// all multi-child operators. Ensure your operator's behavior aligns
998/// with these assumptions before using.
999pub(crate) fn boundedness_from_children<'a>(
1000 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
1001) -> Boundedness {
1002 let mut unbounded_with_finite_mem = false;
1003
1004 for child in children {
1005 match child.boundedness() {
1006 Boundedness::Unbounded {
1007 requires_infinite_memory: true,
1008 } => {
1009 return Boundedness::Unbounded {
1010 requires_infinite_memory: true,
1011 };
1012 }
1013 Boundedness::Unbounded {
1014 requires_infinite_memory: false,
1015 } => {
1016 unbounded_with_finite_mem = true;
1017 }
1018 Boundedness::Bounded => {}
1019 }
1020 }
1021
1022 if unbounded_with_finite_mem {
1023 Boundedness::Unbounded {
1024 requires_infinite_memory: false,
1025 }
1026 } else {
1027 Boundedness::Bounded
1028 }
1029}
1030
1031/// Determines the emission type of an operator based on its children's pipeline behavior.
1032///
1033/// The precedence of emission types is:
1034/// - `Final` has the highest precedence.
1035/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
1036/// - `Incremental` is the default if all children emit incremental results.
1037///
1038/// **Note:** This is a general-purpose utility and may not apply to
1039/// all multi-child operators. Verify your operator's behavior aligns
1040/// with these assumptions.
1041pub(crate) fn emission_type_from_children<'a>(
1042 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
1043) -> EmissionType {
1044 let mut inc_and_final = false;
1045
1046 for child in children {
1047 match child.pipeline_behavior() {
1048 EmissionType::Final => return EmissionType::Final,
1049 EmissionType::Both => inc_and_final = true,
1050 EmissionType::Incremental => continue,
1051 }
1052 }
1053
1054 if inc_and_final {
1055 EmissionType::Both
1056 } else {
1057 EmissionType::Incremental
1058 }
1059}
1060
/// Stores certain, often expensive to compute, plan properties used in query
/// optimization.
///
/// These properties are stored in a single structure so that they can be
/// computed once and the cached results then reused multiple times without
/// recomputation (i.e. the structure acts as a cache).
#[derive(Debug, Clone)]
pub struct PlanProperties {
    /// See [ExecutionPlanProperties::equivalence_properties]
    pub eq_properties: EquivalenceProperties,
    /// See [ExecutionPlanProperties::output_partitioning]
    pub partitioning: Partitioning,
    /// See [ExecutionPlanProperties::pipeline_behavior]
    pub emission_type: EmissionType,
    /// See [ExecutionPlanProperties::boundedness]
    pub boundedness: Boundedness,
    /// See [`EvaluationType`]
    pub evaluation_type: EvaluationType,
    /// See [`SchedulingType`]
    pub scheduling_type: SchedulingType,
    /// See [ExecutionPlanProperties::output_ordering]
    output_ordering: Option<LexOrdering>,
}
1082
1083impl PlanProperties {
1084 /// Construct a new `PlanPropertiesCache` from the
1085 pub fn new(
1086 eq_properties: EquivalenceProperties,
1087 partitioning: Partitioning,
1088 emission_type: EmissionType,
1089 boundedness: Boundedness,
1090 ) -> Self {
1091 // Output ordering can be derived from `eq_properties`.
1092 let output_ordering = eq_properties.output_ordering();
1093 Self {
1094 eq_properties,
1095 partitioning,
1096 emission_type,
1097 boundedness,
1098 evaluation_type: EvaluationType::Lazy,
1099 scheduling_type: SchedulingType::NonCooperative,
1100 output_ordering,
1101 }
1102 }
1103
1104 /// Overwrite output partitioning with its new value.
1105 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
1106 self.partitioning = partitioning;
1107 self
1108 }
1109
1110 /// Set equivalence properties having mut reference.
1111 pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
1112 // Changing equivalence properties also changes output ordering, so
1113 // make sure to overwrite it:
1114 self.output_ordering = eq_properties.output_ordering();
1115 self.eq_properties = eq_properties;
1116 }
1117
1118 /// Overwrite equivalence properties with its new value.
1119 pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
1120 self.set_eq_properties(eq_properties);
1121 self
1122 }
1123
1124 /// Overwrite boundedness with its new value.
1125 pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
1126 self.boundedness = boundedness;
1127 self
1128 }
1129
1130 /// Overwrite emission type with its new value.
1131 pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
1132 self.emission_type = emission_type;
1133 self
1134 }
1135
1136 /// Set the [`SchedulingType`].
1137 ///
1138 /// Defaults to [`SchedulingType::NonCooperative`]
1139 pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
1140 self.scheduling_type = scheduling_type;
1141 self
1142 }
1143
1144 /// Set the [`EvaluationType`].
1145 ///
1146 /// Defaults to [`EvaluationType::Lazy`]
1147 pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
1148 self.evaluation_type = drive_type;
1149 self
1150 }
1151
1152 /// Set constraints having mut reference.
1153 pub fn set_constraints(&mut self, constraints: Constraints) {
1154 self.eq_properties.set_constraints(constraints);
1155 }
1156
1157 /// Overwrite constraints with its new value.
1158 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1159 self.set_constraints(constraints);
1160 self
1161 }
1162
1163 pub fn equivalence_properties(&self) -> &EquivalenceProperties {
1164 &self.eq_properties
1165 }
1166
1167 pub fn output_partitioning(&self) -> &Partitioning {
1168 &self.partitioning
1169 }
1170
1171 pub fn output_ordering(&self) -> Option<&LexOrdering> {
1172 self.output_ordering.as_ref()
1173 }
1174
1175 /// Get schema of the node.
1176 pub(crate) fn schema(&self) -> &SchemaRef {
1177 self.eq_properties.schema()
1178 }
1179}
1180
// Checks that `$target.$func_name()` returns a collection with exactly
// `$expected_len` elements. A mismatch is reported through
// `assert_eq_or_internal_err!`, with a message naming the plan (via
// `$target.name()`), the offending method, and both lengths.
macro_rules! check_len {
    ($target:expr, $func_name:ident, $expected_len:expr) => {
        let actual_len = $target.$func_name().len();
        assert_eq_or_internal_err!(
            actual_len,
            $expected_len,
            "{}::{} returned Vec with incorrect size: {} != {}",
            $target.name(),
            stringify!($func_name),
            actual_len,
            $expected_len
        );
    };
}
1195
/// Checks a set of invariants that apply to all ExecutionPlan implementations.
/// Returns an error if the given node does not conform.
///
/// Concretely, each of `maintains_input_order`, `required_input_ordering`,
/// `required_input_distribution` and `benefits_from_input_partitioning` must
/// return exactly one entry per child of `plan`. The checks run in that order
/// and the first mismatch is the one reported.
///
/// The `_check` level is currently not consulted by these default checks.
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
    plan: &P,
    _check: InvariantLevel,
) -> Result<(), DataFusionError> {
    // Every per-child vector below is compared against this count.
    let children_len = plan.children().len();

    check_len!(plan, maintains_input_order, children_len);
    check_len!(plan, required_input_ordering, children_len);
    check_len!(plan, required_input_distribution, children_len);
    check_len!(plan, benefits_from_input_partitioning, children_len);

    Ok(())
}
1211
1212/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
1213/// especially for the distributed engine to judge whether need to deal with shuffling.
1214/// Currently, there are 3 kinds of execution plan which needs data exchange
1215/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
1216/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
1217/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
1218#[expect(clippy::needless_pass_by_value)]
1219pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
1220 plan.properties().evaluation_type == EvaluationType::Eager
1221}
1222
1223/// Returns a copy of this plan if we change any child according to the pointer comparison.
1224/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1225pub fn with_new_children_if_necessary(
1226 plan: Arc<dyn ExecutionPlan>,
1227 children: Vec<Arc<dyn ExecutionPlan>>,
1228) -> Result<Arc<dyn ExecutionPlan>> {
1229 let old_children = plan.children();
1230 assert_eq_or_internal_err!(
1231 children.len(),
1232 old_children.len(),
1233 "Wrong number of children"
1234 );
1235 if children.is_empty()
1236 || children
1237 .iter()
1238 .zip(old_children.iter())
1239 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
1240 {
1241 plan.with_new_children(children)
1242 } else {
1243 Ok(plan)
1244 }
1245}
1246
/// Return a [`DisplayableExecutionPlan`] wrapper around an
/// [`ExecutionPlan`] which can be displayed in various easier to
/// understand ways.
///
/// The wrapper borrows `plan`, so it cannot outlive it.
///
/// See examples on [`DisplayableExecutionPlan`]
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
    DisplayableExecutionPlan::new(plan)
}
1255
1256/// Execute the [ExecutionPlan] and collect the results in memory
1257pub async fn collect(
1258 plan: Arc<dyn ExecutionPlan>,
1259 context: Arc<TaskContext>,
1260) -> Result<Vec<RecordBatch>> {
1261 let stream = execute_stream(plan, context)?;
1262 crate::common::collect(stream).await
1263}
1264
1265/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
1266///
1267/// See [collect] to buffer the `RecordBatch`es in memory.
1268///
1269/// # Aborting Execution
1270///
1271/// Dropping the stream will abort the execution of the query, and free up
1272/// any allocated resources
1273#[expect(
1274 clippy::needless_pass_by_value,
1275 reason = "Public API that historically takes owned Arcs"
1276)]
1277pub fn execute_stream(
1278 plan: Arc<dyn ExecutionPlan>,
1279 context: Arc<TaskContext>,
1280) -> Result<SendableRecordBatchStream> {
1281 match plan.output_partitioning().partition_count() {
1282 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1283 1 => plan.execute(0, context),
1284 2.. => {
1285 // merge into a single partition
1286 let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1287 // CoalescePartitionsExec must produce a single partition
1288 assert_eq!(1, plan.properties().output_partitioning().partition_count());
1289 plan.execute(0, context)
1290 }
1291 }
1292}
1293
1294/// Execute the [ExecutionPlan] and collect the results in memory
1295pub async fn collect_partitioned(
1296 plan: Arc<dyn ExecutionPlan>,
1297 context: Arc<TaskContext>,
1298) -> Result<Vec<Vec<RecordBatch>>> {
1299 let streams = execute_stream_partitioned(plan, context)?;
1300
1301 let mut join_set = JoinSet::new();
1302 // Execute the plan and collect the results into batches.
1303 streams.into_iter().enumerate().for_each(|(idx, stream)| {
1304 join_set.spawn(async move {
1305 let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
1306 (idx, result)
1307 });
1308 });
1309
1310 let mut batches = vec![];
1311 // Note that currently this doesn't identify the thread that panicked
1312 //
1313 // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id
1314 // once it is stable
1315 while let Some(result) = join_set.join_next().await {
1316 match result {
1317 Ok((idx, res)) => batches.push((idx, res?)),
1318 Err(e) => {
1319 if e.is_panic() {
1320 std::panic::resume_unwind(e.into_panic());
1321 } else {
1322 unreachable!();
1323 }
1324 }
1325 }
1326 }
1327
1328 batches.sort_by_key(|(idx, _)| *idx);
1329 let batches = batches.into_iter().map(|(_, batch)| batch).collect();
1330
1331 Ok(batches)
1332}
1333
1334/// Execute the [ExecutionPlan] and return a vec with one stream per output
1335/// partition
1336///
1337/// # Aborting Execution
1338///
1339/// Dropping the stream will abort the execution of the query, and free up
1340/// any allocated resources
1341#[expect(
1342 clippy::needless_pass_by_value,
1343 reason = "Public API that historically takes owned Arcs"
1344)]
1345pub fn execute_stream_partitioned(
1346 plan: Arc<dyn ExecutionPlan>,
1347 context: Arc<TaskContext>,
1348) -> Result<Vec<SendableRecordBatchStream>> {
1349 let num_partitions = plan.output_partitioning().partition_count();
1350 let mut streams = Vec::with_capacity(num_partitions);
1351 for i in 0..num_partitions {
1352 streams.push(plan.execute(i, Arc::clone(&context))?);
1353 }
1354 Ok(streams)
1355}
1356
1357/// Executes an input stream and ensures that the resulting stream adheres to
1358/// the `not null` constraints specified in the `sink_schema`.
1359///
1360/// # Arguments
1361///
1362/// * `input` - An execution plan
1363/// * `sink_schema` - The schema to be applied to the output stream
1364/// * `partition` - The partition index to be executed
1365/// * `context` - The task context
1366///
1367/// # Returns
1368///
1369/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1370///
1371/// This function first executes the given input plan for the specified partition
1372/// and context. It then checks if there are any columns in the input that might
1373/// violate the `not null` constraints specified in the `sink_schema`. If there are
1374/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1375/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
1376#[expect(
1377 clippy::needless_pass_by_value,
1378 reason = "Public API that historically takes owned Arcs"
1379)]
1380pub fn execute_input_stream(
1381 input: Arc<dyn ExecutionPlan>,
1382 sink_schema: SchemaRef,
1383 partition: usize,
1384 context: Arc<TaskContext>,
1385) -> Result<SendableRecordBatchStream> {
1386 let input_stream = input.execute(partition, context)?;
1387
1388 debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1389
1390 // Find input columns that may violate the not null constraint.
1391 let risky_columns: Vec<_> = sink_schema
1392 .fields()
1393 .iter()
1394 .zip(input.schema().fields().iter())
1395 .enumerate()
1396 .filter_map(|(idx, (sink_field, input_field))| {
1397 (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1398 })
1399 .collect();
1400
1401 if risky_columns.is_empty() {
1402 Ok(input_stream)
1403 } else {
1404 // Check not null constraint on the input stream
1405 Ok(Box::pin(RecordBatchStreamAdapter::new(
1406 sink_schema,
1407 input_stream
1408 .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1409 )))
1410 }
1411}
1412
1413/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1414///
1415/// # Arguments
1416///
1417/// * `batch` - The `RecordBatch` to be checked
1418/// * `column_indices` - A vector of column indices that should be checked for
1419/// `not null` constraints.
1420///
1421/// # Returns
1422///
1423/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1424///
1425/// This function iterates over the specified column indices and ensures that none
1426/// of the columns contain null values. If any column contains null values, an error
1427/// is returned.
1428pub fn check_not_null_constraints(
1429 batch: RecordBatch,
1430 column_indices: &Vec<usize>,
1431) -> Result<RecordBatch> {
1432 for &index in column_indices {
1433 if batch.num_columns() <= index {
1434 return exec_err!(
1435 "Invalid batch column count {} expected > {}",
1436 batch.num_columns(),
1437 index
1438 );
1439 }
1440
1441 if batch
1442 .column(index)
1443 .logical_nulls()
1444 .map(|nulls| nulls.null_count())
1445 .unwrap_or_default()
1446 > 0
1447 {
1448 return exec_err!(
1449 "Invalid batch column at '{}' has null but schema specifies non-nullable",
1450 index
1451 );
1452 }
1453 }
1454
1455 Ok(batch)
1456}
1457
1458/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
1459///
1460/// Some plans will change their internal states after execution, making them unable to be executed again.
1461/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
1462///
1463/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
1464/// However, if the data of the left table is derived from the work table, it will become outdated
1465/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
1466///
1467/// # Limitations
1468///
1469/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
1470///
1471/// * uses dynamic filters,
1472/// * represents a recursive query.
1473///
1474pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1475 plan.transform_up(|plan| {
1476 let new_plan = Arc::clone(&plan).reset_state()?;
1477 Ok(Transformed::yes(new_plan))
1478 })
1479 .data()
1480}
1481
1482/// Check if the `plan` children has the same properties as passed `children`.
1483/// In this case plan can avoid self properties re-computation when its children
1484/// replace is requested.
1485/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1486pub fn has_same_children_properties(
1487 plan: &impl ExecutionPlan,
1488 children: &[Arc<dyn ExecutionPlan>],
1489) -> Result<bool> {
1490 let old_children = plan.children();
1491 assert_eq_or_internal_err!(
1492 children.len(),
1493 old_children.len(),
1494 "Wrong number of children"
1495 );
1496 for (lhs, rhs) in old_children.iter().zip(children.iter()) {
1497 if !Arc::ptr_eq(lhs.properties(), rhs.properties()) {
1498 return Ok(false);
1499 }
1500 }
1501 Ok(true)
1502}
1503
/// Helper macro to avoid re-computing properties when the properties of the
/// passed children are the same as those of the children the plan already has.
/// It can be used to implement a fast path for
/// [`ExecutionPlan::with_new_children`].
///
/// When [`has_same_children_properties`] reports `true`, the macro makes the
/// enclosing function `return` an `Arc` of
/// `$plan.with_new_children_and_same_properties($children)`; otherwise
/// execution continues after the macro. Errors from the check are propagated
/// with `?`, so the enclosing function must return a compatible `Result`.
#[macro_export]
macro_rules! check_if_same_properties {
    ($plan: expr, $children: expr) => {
        if $crate::execution_plan::has_same_children_properties(
            $plan.as_ref(),
            &$children,
        )? {
            let plan = $plan.with_new_children_and_same_properties($children);
            return Ok(::std::sync::Arc::new(plan));
        }
    };
}
1519
1520/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1521pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1522 let formatted = displayable(plan.as_ref()).indent(true).to_string();
1523 let actual: Vec<&str> = formatted.trim().lines().collect();
1524 actual.iter().map(|elem| (*elem).to_string()).collect()
1525}
1526
/// Indicates the effect an execution plan operator will have on the cardinality
/// of its input stream.
///
/// This is a field-less marker type, so it derives the common value-type
/// traits: it can be printed with `{:?}`, copied freely, and compared with
/// `==` / `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CardinalityEffect {
    /// Unknown effect. This is the default
    Unknown,
    /// The operator is guaranteed to produce exactly one row for
    /// each input row
    Equal,
    /// The operator may produce fewer output rows than it receives input rows
    LowerEqual,
    /// The operator may produce more output rows than it receives input rows
    GreaterEqual,
}
1540
1541/// Can be used in contexts where properties have not yet been initialized properly.
1542pub(crate) fn stub_properties() -> Arc<PlanProperties> {
1543 static STUB_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
1544 Arc::new(PlanProperties::new(
1545 EquivalenceProperties::new(Arc::new(Schema::empty())),
1546 Partitioning::UnknownPartitioning(1),
1547 EmissionType::Final,
1548 Boundedness::Bounded,
1549 ))
1550 });
1551
1552 Arc::clone(&STUB_PROPERTIES)
1553}
1554
1555#[cfg(test)]
1556mod tests {
1557
1558 use super::*;
1559 use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1560
1561 use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1562 use arrow::datatypes::{DataType, Field, Schema};
1563
1564 #[derive(Debug)]
1565 pub struct EmptyExec;
1566
1567 impl EmptyExec {
1568 pub fn new(_schema: SchemaRef) -> Self {
1569 Self
1570 }
1571 }
1572
1573 impl DisplayAs for EmptyExec {
1574 fn fmt_as(
1575 &self,
1576 _t: DisplayFormatType,
1577 _f: &mut std::fmt::Formatter,
1578 ) -> std::fmt::Result {
1579 unimplemented!()
1580 }
1581 }
1582
1583 impl ExecutionPlan for EmptyExec {
1584 fn name(&self) -> &'static str {
1585 Self::static_name()
1586 }
1587
1588 fn properties(&self) -> &Arc<PlanProperties> {
1589 unimplemented!()
1590 }
1591
1592 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1593 vec![]
1594 }
1595
1596 fn with_new_children(
1597 self: Arc<Self>,
1598 _: Vec<Arc<dyn ExecutionPlan>>,
1599 ) -> Result<Arc<dyn ExecutionPlan>> {
1600 unimplemented!()
1601 }
1602
1603 fn execute(
1604 &self,
1605 _partition: usize,
1606 _context: Arc<TaskContext>,
1607 ) -> Result<SendableRecordBatchStream> {
1608 unimplemented!()
1609 }
1610
1611 fn partition_statistics(
1612 &self,
1613 _partition: Option<usize>,
1614 ) -> Result<Arc<Statistics>> {
1615 unimplemented!()
1616 }
1617 }
1618
1619 #[derive(Debug)]
1620 pub struct RenamedEmptyExec;
1621
1622 impl RenamedEmptyExec {
1623 pub fn new(_schema: SchemaRef) -> Self {
1624 Self
1625 }
1626 }
1627
1628 impl DisplayAs for RenamedEmptyExec {
1629 fn fmt_as(
1630 &self,
1631 _t: DisplayFormatType,
1632 _f: &mut std::fmt::Formatter,
1633 ) -> std::fmt::Result {
1634 unimplemented!()
1635 }
1636 }
1637
1638 impl ExecutionPlan for RenamedEmptyExec {
1639 fn name(&self) -> &'static str {
1640 Self::static_name()
1641 }
1642
1643 fn static_name() -> &'static str
1644 where
1645 Self: Sized,
1646 {
1647 "MyRenamedEmptyExec"
1648 }
1649
1650 fn properties(&self) -> &Arc<PlanProperties> {
1651 unimplemented!()
1652 }
1653
1654 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1655 vec![]
1656 }
1657
1658 fn with_new_children(
1659 self: Arc<Self>,
1660 _: Vec<Arc<dyn ExecutionPlan>>,
1661 ) -> Result<Arc<dyn ExecutionPlan>> {
1662 unimplemented!()
1663 }
1664
1665 fn execute(
1666 &self,
1667 _partition: usize,
1668 _context: Arc<TaskContext>,
1669 ) -> Result<SendableRecordBatchStream> {
1670 unimplemented!()
1671 }
1672
1673 fn partition_statistics(
1674 &self,
1675 _partition: Option<usize>,
1676 ) -> Result<Arc<Statistics>> {
1677 unimplemented!()
1678 }
1679 }
1680
1681 #[derive(Debug)]
1682 struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);
1683
1684 impl DisplayAs for DowncastDelegatingExec {
1685 fn fmt_as(
1686 &self,
1687 _t: DisplayFormatType,
1688 _f: &mut std::fmt::Formatter,
1689 ) -> std::fmt::Result {
1690 unimplemented!()
1691 }
1692 }
1693
1694 impl ExecutionPlan for DowncastDelegatingExec {
1695 fn name(&self) -> &'static str {
1696 Self::static_name()
1697 }
1698
1699 fn properties(&self) -> &Arc<PlanProperties> {
1700 unimplemented!()
1701 }
1702
1703 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1704 vec![]
1705 }
1706
1707 fn with_new_children(
1708 self: Arc<Self>,
1709 _: Vec<Arc<dyn ExecutionPlan>>,
1710 ) -> Result<Arc<dyn ExecutionPlan>> {
1711 unimplemented!()
1712 }
1713
1714 fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
1715 Some(self.0.as_ref())
1716 }
1717
1718 fn execute(
1719 &self,
1720 _partition: usize,
1721 _context: Arc<TaskContext>,
1722 ) -> Result<SendableRecordBatchStream> {
1723 unimplemented!()
1724 }
1725
1726 fn partition_statistics(
1727 &self,
1728 _partition: Option<usize>,
1729 ) -> Result<Arc<Statistics>> {
1730 unimplemented!()
1731 }
1732 }
1733 #[test]
1734 fn test_execution_plan_name() {
1735 let schema1 = Arc::new(Schema::empty());
1736 let default_name_exec = EmptyExec::new(schema1);
1737 assert_eq!(default_name_exec.name(), "EmptyExec");
1738
1739 let schema2 = Arc::new(Schema::empty());
1740 let renamed_exec = RenamedEmptyExec::new(schema2);
1741 assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1742 assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1743 }
1744
1745 #[test]
1746 fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
1747 let schema = Arc::new(Schema::empty());
1748 let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
1749 let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
1750 let nested: Arc<dyn ExecutionPlan> =
1751 Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));
1752
1753 for plan in [wrapped.as_ref(), nested.as_ref()] {
1754 assert!(!plan.is::<DowncastDelegatingExec>());
1755 assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
1756 assert!(plan.is::<EmptyExec>());
1757 assert!(plan.downcast_ref::<EmptyExec>().is_some());
1758 assert!(!plan.is::<RenamedEmptyExec>());
1759 assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
1760 }
1761 }
1762
1763 /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1764 /// be called from a trait object.
1765 /// Related ticket: https://github.com/apache/datafusion/pull/11047
1766 #[expect(unused)]
1767 fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1768 let _ = plan.name();
1769 }
1770
1771 #[test]
1772 fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1773 check_not_null_constraints(
1774 RecordBatch::try_new(
1775 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1776 vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1777 )?,
1778 &vec![0],
1779 )?;
1780 Ok(())
1781 }
1782
1783 #[test]
1784 fn test_check_not_null_constraints_reject_null() -> Result<()> {
1785 let result = check_not_null_constraints(
1786 RecordBatch::try_new(
1787 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1788 vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1789 )?,
1790 &vec![0],
1791 );
1792 assert!(result.is_err());
1793 assert_eq!(
1794 result.err().unwrap().strip_backtrace(),
1795 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1796 );
1797 Ok(())
1798 }
1799
1800 #[test]
1801 fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
1802 // some null value inside REE array
1803 let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1804 let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1805 let run_end_array = RunArray::try_new(&run_ends, &values)?;
1806 let result = check_not_null_constraints(
1807 RecordBatch::try_new(
1808 Arc::new(Schema::new(vec![Field::new(
1809 "a",
1810 run_end_array.data_type().to_owned(),
1811 true,
1812 )])),
1813 vec![Arc::new(run_end_array)],
1814 )?,
1815 &vec![0],
1816 );
1817 assert!(result.is_err());
1818 assert_eq!(
1819 result.err().unwrap().strip_backtrace(),
1820 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1821 );
1822 Ok(())
1823 }
1824
1825 #[test]
1826 fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1827 let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1828 let keys = Int32Array::from(vec![0, 1, 2, 3]);
1829 let dictionary = DictionaryArray::new(keys, values);
1830 let result = check_not_null_constraints(
1831 RecordBatch::try_new(
1832 Arc::new(Schema::new(vec![Field::new(
1833 "a",
1834 dictionary.data_type().to_owned(),
1835 true,
1836 )])),
1837 vec![Arc::new(dictionary)],
1838 )?,
1839 &vec![0],
1840 );
1841 assert!(result.is_err());
1842 assert_eq!(
1843 result.err().unwrap().strip_backtrace(),
1844 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1845 );
1846 Ok(())
1847 }
1848
1849 #[test]
1850 fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
1851 // some null value marked out by dictionary array
1852 let values = Arc::new(Int32Array::from(vec![
1853 Some(1),
1854 None, // this null value is masked by dictionary keys
1855 Some(3),
1856 Some(4),
1857 ]));
1858 let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1859 let dictionary = DictionaryArray::new(keys, values);
1860 check_not_null_constraints(
1861 RecordBatch::try_new(
1862 Arc::new(Schema::new(vec![Field::new(
1863 "a",
1864 dictionary.data_type().to_owned(),
1865 true,
1866 )])),
1867 vec![Arc::new(dictionary)],
1868 )?,
1869 &vec![0],
1870 )?;
1871 Ok(())
1872 }
1873
1874 #[test]
1875 fn test_check_not_null_constraints_on_null_type() -> Result<()> {
1876 // null value of Null type
1877 let result = check_not_null_constraints(
1878 RecordBatch::try_new(
1879 Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1880 vec![Arc::new(NullArray::new(3))],
1881 )?,
1882 &vec![0],
1883 );
1884 assert!(result.is_err());
1885 assert_eq!(
1886 result.err().unwrap().strip_backtrace(),
1887 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1888 );
1889 Ok(())
1890 }
1891}