// datafusion_physical_plan/execution_plan.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19use crate::filter_pushdown::{
20 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21 FilterPushdownPropagation,
22};
23pub use crate::metrics::Metric;
24pub use crate::ordering::InputOrderMode;
25use crate::sort_pushdown::SortOrderPushdownResult;
26pub use crate::stream::EmptyRecordBatchStream;
27
28use arrow_schema::Schema;
29pub use datafusion_common::hash_utils;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31pub use datafusion_common::utils::project_schema;
32pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
33pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
34pub use datafusion_expr::{Accumulator, ColumnarValue};
35pub use datafusion_physical_expr::window::WindowExpr;
36pub use datafusion_physical_expr::{
37 Distribution, Partitioning, PhysicalExpr, expressions,
38};
39
40use std::any::Any;
41use std::fmt::Debug;
42use std::sync::{Arc, LazyLock};
43
44use crate::coalesce_partitions::CoalescePartitionsExec;
45use crate::display::DisplayableExecutionPlan;
46use crate::metrics::MetricsSet;
47use crate::projection::ProjectionExec;
48use crate::stream::RecordBatchStreamAdapter;
49
50use arrow::array::{Array, RecordBatch};
51use arrow::datatypes::SchemaRef;
52use datafusion_common::config::ConfigOptions;
53use datafusion_common::{
54 Constraints, DataFusionError, Result, assert_eq_or_internal_err,
55 assert_or_internal_err, exec_err,
56};
57use datafusion_common_runtime::JoinSet;
58use datafusion_execution::TaskContext;
59use datafusion_physical_expr::EquivalenceProperties;
60use datafusion_physical_expr_common::sort_expr::{
61 LexOrdering, OrderingRequirements, PhysicalSortExpr,
62};
63
64use futures::stream::{StreamExt, TryStreamExt};
65
66/// Represent nodes in the DataFusion Physical Plan.
67///
68/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
69/// [`RecordBatch`] that incrementally computes a partition of the
70/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
71/// details on partitioning.
72///
73/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
74/// properties of the output to the DataFusion optimizer, and methods such as
75/// [`required_input_distribution`] and [`required_input_ordering`] express
76/// requirements of the `ExecutionPlan` from its input.
77///
78/// [`ExecutionPlan`] can be displayed in a simplified form using the
79/// return value from [`displayable`] in addition to the (normally
80/// quite verbose) `Debug` output.
81///
82/// [`execute`]: ExecutionPlan::execute
83/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
84/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
85///
86/// # Examples
87///
88/// See [`datafusion-examples`] for examples, including
89/// [`memory_pool_execution_plan.rs`] which shows how to implement a custom
90/// `ExecutionPlan` with memory tracking and spilling support.
91///
92/// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples
93/// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
94pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
95 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
96 ///
97 /// Implementation note: this method can just proxy to
98 /// [`static_name`](ExecutionPlan::static_name) if no special action is
99 /// needed. It doesn't provide a default implementation like that because
100 /// this method doesn't require the `Sized` constrain to allow a wilder
101 /// range of use cases.
102 fn name(&self) -> &str;
103
104 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
105 /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
106 fn static_name() -> &'static str
107 where
108 Self: Sized,
109 {
110 let full_name = std::any::type_name::<Self>();
111 let maybe_start_idx = full_name.rfind(':');
112 match maybe_start_idx {
113 Some(start_idx) => &full_name[start_idx + 1..],
114 None => "UNKNOWN",
115 }
116 }
117
118 /// Returns the execution plan as [`Any`] so that it can be
119 /// downcast to a specific implementation.
120 fn as_any(&self) -> &dyn Any;
121
122 /// Get the schema for this execution plan
123 fn schema(&self) -> SchemaRef {
124 Arc::clone(self.properties().schema())
125 }
126
127 /// Return properties of the output of the `ExecutionPlan`, such as output
128 /// ordering(s), partitioning information etc.
129 ///
130 /// This information is available via methods on [`ExecutionPlanProperties`]
131 /// trait, which is implemented for all `ExecutionPlan`s.
132 fn properties(&self) -> &Arc<PlanProperties>;
133
134 /// Returns an error if this individual node does not conform to its invariants.
135 /// These invariants are typically only checked in debug mode.
136 ///
137 /// A default set of invariants is provided in the [check_default_invariants] function.
138 /// The default implementation of `check_invariants` calls this function.
139 /// Extension nodes can provide their own invariants.
140 fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
141 check_default_invariants(self, check)
142 }
143
144 /// Specifies the data distribution requirements for all the
145 /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
146 fn required_input_distribution(&self) -> Vec<Distribution> {
147 vec![Distribution::UnspecifiedDistribution; self.children().len()]
148 }
149
150 /// Specifies the ordering required for all of the children of this
151 /// `ExecutionPlan`.
152 ///
153 /// For each child, it's the local ordering requirement within
154 /// each partition rather than the global ordering
155 ///
156 /// NOTE that checking `!is_empty()` does **not** check for a
157 /// required input ordering. Instead, the correct check is that at
158 /// least one entry must be `Some`
159 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
160 vec![None; self.children().len()]
161 }
162
163 /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
164 /// rows within or between partitions.
165 ///
166 /// For example, Projection, Filter, and Limit maintain the order
167 /// of inputs -- they may transform values (Projection) or not
168 /// produce the same number of rows that went in (Filter and
169 /// Limit), but the rows that are produced go in the same way.
170 ///
171 /// DataFusion uses this metadata to apply certain optimizations
172 /// such as automatically repartitioning correctly.
173 ///
174 /// The default implementation returns `false`
175 ///
176 /// WARNING: if you override this default, you *MUST* ensure that
177 /// the `ExecutionPlan`'s maintains the ordering invariant or else
178 /// DataFusion may produce incorrect results.
179 fn maintains_input_order(&self) -> Vec<bool> {
180 vec![false; self.children().len()]
181 }
182
183 /// Specifies whether the `ExecutionPlan` benefits from increased
184 /// parallelization at its input for each child.
185 ///
186 /// If returns `true`, the `ExecutionPlan` would benefit from partitioning
187 /// its corresponding child (and thus from more parallelism). For
188 /// `ExecutionPlan` that do very little work the overhead of extra
189 /// parallelism may outweigh any benefits
190 ///
191 /// The default implementation returns `true` unless this `ExecutionPlan`
192 /// has signalled it requires a single child input partition.
193 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
194 // By default try to maximize parallelism with more CPUs if
195 // possible
196 self.required_input_distribution()
197 .into_iter()
198 .map(|dist| !matches!(dist, Distribution::SinglePartition))
199 .collect()
200 }
201
202 /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
203 /// The returned list will be empty for leaf nodes such as scans, will contain
204 /// a single value for unary nodes, or two values for binary nodes (such as
205 /// joins).
206 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
207
208 /// Returns a new `ExecutionPlan` where all existing children were replaced
209 /// by the `children`, in order
210 fn with_new_children(
211 self: Arc<Self>,
212 children: Vec<Arc<dyn ExecutionPlan>>,
213 ) -> Result<Arc<dyn ExecutionPlan>>;
214
215 /// Reset any internal state within this [`ExecutionPlan`].
216 ///
217 /// This method is called when an [`ExecutionPlan`] needs to be re-executed,
218 /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
219 /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
220 /// are reset to their initial state.
221 ///
222 /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
223 /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
224 /// necessarily resetting any internal state. Implementations that require resetting of some
225 /// internal state should override this method to provide the necessary logic.
226 ///
227 /// This method should *not* reset state recursively for children, as it is expected that
228 /// it will be called from within a walk of the execution plan tree so that it will be called on each child later
229 /// or was already called on each child.
230 ///
231 /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
232 /// thus it is expected that any cached plan properties will remain valid after the reset.
233 ///
234 /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
235 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
236 let children = self.children().into_iter().cloned().collect();
237 self.with_new_children(children)
238 }
239
240 /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
241 /// produce `target_partitions` partitions.
242 ///
243 /// If the `ExecutionPlan` does not support changing its partitioning,
244 /// returns `Ok(None)` (the default).
245 ///
246 /// It is the `ExecutionPlan` can increase its partitioning, but not to the
247 /// `target_partitions`, it may return an ExecutionPlan with fewer
248 /// partitions. This might happen, for example, if each new partition would
249 /// be too small to be efficiently processed individually.
250 ///
251 /// The DataFusion optimizer attempts to use as many threads as possible by
252 /// repartitioning its inputs to match the target number of threads
253 /// available (`target_partitions`). Some data sources, such as the built in
254 /// CSV and Parquet readers, implement this method as they are able to read
255 /// from their input files in parallel, regardless of how the source data is
256 /// split amongst files.
257 fn repartitioned(
258 &self,
259 _target_partitions: usize,
260 _config: &ConfigOptions,
261 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
262 Ok(None)
263 }
264
265 /// Begin execution of `partition`, returning a [`Stream`] of
266 /// [`RecordBatch`]es.
267 ///
268 /// # Notes
269 ///
270 /// The `execute` method itself is not `async` but it returns an `async`
271 /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
272 /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
273 /// Most `ExecutionPlan`s should not do any work before the first
274 /// `RecordBatch` is requested from the stream.
275 ///
276 /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
277 /// [`Stream`] into a [`SendableRecordBatchStream`].
278 ///
279 /// Using `async` `Streams` allows for network I/O during execution and
280 /// takes advantage of Rust's built in support for `async` continuations and
281 /// crate ecosystem.
282 ///
283 /// [`Stream`]: futures::stream::Stream
284 /// [`StreamExt`]: futures::stream::StreamExt
285 /// [`TryStreamExt`]: futures::stream::TryStreamExt
286 /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
287 ///
288 /// # Error handling
289 ///
290 /// Any error that occurs during execution is sent as an `Err` in the output
291 /// stream.
292 ///
293 /// `ExecutionPlan` implementations in DataFusion cancel additional work
294 /// immediately once an error occurs. The rationale is that if the overall
295 /// query will return an error, any additional work such as continued
296 /// polling of inputs will be wasted as it will be thrown away.
297 ///
298 /// # Cancellation / Aborting Execution
299 ///
300 /// The [`Stream`] that is returned must ensure that any allocated resources
301 /// are freed when the stream itself is dropped. This is particularly
302 /// important for [`spawn`]ed tasks or threads. Unless care is taken to
303 /// "abort" such tasks, they may continue to consume resources even after
304 /// the plan is dropped, generating intermediate results that are never
305 /// used.
306 /// Thus, [`spawn`] is disallowed, and instead use [`SpawnedTask`].
307 ///
308 /// To enable timely cancellation, the [`Stream`] that is returned must not
309 /// block the CPU indefinitely and must yield back to the tokio runtime regularly.
310 /// In a typical [`ExecutionPlan`], this automatically happens unless there are
311 /// special circumstances; e.g. when the computational complexity of processing a
312 /// batch is superlinear. See this [general guideline][async-guideline] for more context
313 /// on this point, which explains why one should avoid spending a long time without
314 /// reaching an `await`/yield point in asynchronous runtimes.
315 /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
316 /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
317 /// [`tokio::task::yield_now()`] when appropriate.
318 /// In special cases that warrant manual yielding, determination for "regularly" may be
319 /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
320 /// a timer (being careful with the overhead-heavy system call needed to take the time), or by
321 /// counting rows or batches.
322 ///
323 /// The [cancellation benchmark] tracks some cases of how quickly queries can
324 /// be cancelled.
325 ///
326 /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
327 /// for structures to help ensure all background tasks are cancelled.
328 ///
329 /// [`spawn`]: tokio::task::spawn
330 /// [cancellation benchmark]: https://github.com/apache/datafusion/blob/main/benchmarks/README.md#cancellation
331 /// [`JoinSet`]: datafusion_common_runtime::JoinSet
332 /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
333 /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
334 /// [`Poll::Pending`]: std::task::Poll::Pending
335 /// [async-guideline]: https://ryhl.io/blog/async-what-is-blocking/
336 ///
337 /// # Implementation Examples
338 ///
339 /// While `async` `Stream`s have a non trivial learning curve, the
340 /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
341 /// which help simplify many common operations.
342 ///
343 /// Here are some common patterns:
344 ///
345 /// ## Return Precomputed `RecordBatch`
346 ///
347 /// We can return a precomputed `RecordBatch` as a `Stream`:
348 ///
349 /// ```
350 /// # use std::sync::Arc;
351 /// # use arrow::array::RecordBatch;
352 /// # use arrow::datatypes::SchemaRef;
353 /// # use datafusion_common::Result;
354 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
355 /// # use datafusion_physical_plan::memory::MemoryStream;
356 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
357 /// struct MyPlan {
358 /// batch: RecordBatch,
359 /// }
360 ///
361 /// impl MyPlan {
362 /// fn execute(
363 /// &self,
364 /// partition: usize,
365 /// context: Arc<TaskContext>,
366 /// ) -> Result<SendableRecordBatchStream> {
367 /// // use functions from futures crate convert the batch into a stream
368 /// let fut = futures::future::ready(Ok(self.batch.clone()));
369 /// let stream = futures::stream::once(fut);
370 /// Ok(Box::pin(RecordBatchStreamAdapter::new(
371 /// self.batch.schema(),
372 /// stream,
373 /// )))
374 /// }
375 /// }
376 /// ```
377 ///
378 /// ## Lazily (async) Compute `RecordBatch`
379 ///
380 /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
381 ///
382 /// ```
383 /// # use std::sync::Arc;
384 /// # use arrow::array::RecordBatch;
385 /// # use arrow::datatypes::SchemaRef;
386 /// # use datafusion_common::Result;
387 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
388 /// # use datafusion_physical_plan::memory::MemoryStream;
389 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
390 /// struct MyPlan {
391 /// schema: SchemaRef,
392 /// }
393 ///
394 /// /// Returns a single batch when the returned stream is polled
395 /// async fn get_batch() -> Result<RecordBatch> {
396 /// todo!()
397 /// }
398 ///
399 /// impl MyPlan {
400 /// fn execute(
401 /// &self,
402 /// partition: usize,
403 /// context: Arc<TaskContext>,
404 /// ) -> Result<SendableRecordBatchStream> {
405 /// let fut = get_batch();
406 /// let stream = futures::stream::once(fut);
407 /// Ok(Box::pin(RecordBatchStreamAdapter::new(
408 /// self.schema.clone(),
409 /// stream,
410 /// )))
411 /// }
412 /// }
413 /// ```
414 ///
415 /// ## Lazily (async) create a Stream
416 ///
417 /// If you need to create the return `Stream` using an `async` function,
418 /// you can do so by flattening the result:
419 ///
420 /// ```
421 /// # use std::sync::Arc;
422 /// # use arrow::array::RecordBatch;
423 /// # use arrow::datatypes::SchemaRef;
424 /// # use futures::TryStreamExt;
425 /// # use datafusion_common::Result;
426 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
427 /// # use datafusion_physical_plan::memory::MemoryStream;
428 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
429 /// struct MyPlan {
430 /// schema: SchemaRef,
431 /// }
432 ///
433 /// /// async function that returns a stream
434 /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
435 /// todo!()
436 /// }
437 ///
438 /// impl MyPlan {
439 /// fn execute(
440 /// &self,
441 /// partition: usize,
442 /// context: Arc<TaskContext>,
443 /// ) -> Result<SendableRecordBatchStream> {
444 /// // A future that yields a stream
445 /// let fut = get_batch_stream();
446 /// // Use TryStreamExt::try_flatten to flatten the stream of streams
447 /// let stream = futures::stream::once(fut).try_flatten();
448 /// Ok(Box::pin(RecordBatchStreamAdapter::new(
449 /// self.schema.clone(),
450 /// stream,
451 /// )))
452 /// }
453 /// }
454 /// ```
455 fn execute(
456 &self,
457 partition: usize,
458 context: Arc<TaskContext>,
459 ) -> Result<SendableRecordBatchStream>;
460
461 /// Return a snapshot of the set of [`Metric`]s for this
462 /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
463 ///
464 /// While the values of the metrics in the returned
465 /// [`MetricsSet`]s may change as execution progresses, the
466 /// specific metrics will not.
467 ///
468 /// Once `self.execute()` has returned (technically the future is
469 /// resolved) for all available partitions, the set of metrics
470 /// should be complete. If this function is called prior to
471 /// `execute()` new metrics may appear in subsequent calls.
472 fn metrics(&self) -> Option<MetricsSet> {
473 None
474 }
475
476 /// Returns statistics for a specific partition of this `ExecutionPlan` node.
477 /// If statistics are not available, should return [`Statistics::new_unknown`]
478 /// (the default), not an error.
479 /// If `partition` is `None`, it returns statistics for the entire plan.
480 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
481 if let Some(idx) = partition {
482 // Validate partition index
483 let partition_count = self.properties().partitioning.partition_count();
484 assert_or_internal_err!(
485 idx < partition_count,
486 "Invalid partition index: {}, the partition count is {}",
487 idx,
488 partition_count
489 );
490 }
491 Ok(Statistics::new_unknown(&self.schema()))
492 }
493
494 /// Returns `true` if a limit can be safely pushed down through this
495 /// `ExecutionPlan` node.
496 ///
497 /// If this method returns `true`, and the query plan contains a limit at
498 /// the output of this node, DataFusion will push the limit to the input
499 /// of this node.
500 fn supports_limit_pushdown(&self) -> bool {
501 false
502 }
503
504 /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
505 /// fetch limits. Returns `None` otherwise.
506 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
507 None
508 }
509
510 /// Gets the fetch count for the operator, `None` means there is no fetch.
511 fn fetch(&self) -> Option<usize> {
512 None
513 }
514
515 /// Gets the effect on cardinality, if known
516 fn cardinality_effect(&self) -> CardinalityEffect {
517 CardinalityEffect::Unknown
518 }
519
520 /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
521 ///
522 /// If the operator supports this optimization, the resulting plan will be:
523 /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
524 /// Otherwise, it returns the current `ExecutionPlan` as-is.
525 ///
526 /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
527 /// or not possible, or `Err` on failure.
528 fn try_swapping_with_projection(
529 &self,
530 _projection: &ProjectionExec,
531 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
532 Ok(None)
533 }
534
535 /// Collect filters that this node can push down to its children.
536 /// Filters that are being pushed down from parents are passed in,
537 /// and the node may generate additional filters to push down.
538 /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
539 /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
540 /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
541 /// filters so it only returns that `FilterExec` wants to push down its own predicate.
542 /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
543 /// `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
544 /// but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
545 /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
546 /// and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
547 /// since it has no children and no additional filters to push down.
548 /// It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
549 /// up the plan that `DataSourceExec` can actually bind the filters.
550 ///
551 /// The default implementation bars all parent filters from being pushed down and adds no new filters.
552 /// This is the safest option, making filter pushdown opt-in on a per-node pasis.
553 ///
554 /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
555 /// Depending on the phase the operator may or may not be allowed to modify the plan.
556 /// See [`FilterPushdownPhase`] for more details.
557 fn gather_filters_for_pushdown(
558 &self,
559 _phase: FilterPushdownPhase,
560 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
561 _config: &ConfigOptions,
562 ) -> Result<FilterDescription> {
563 Ok(FilterDescription::all_unsupported(
564 &parent_filters,
565 &self.children(),
566 ))
567 }
568
569 /// Handle the result of a child pushdown.
570 ///
571 /// This method is called as we recurse back up the plan tree after pushing
572 /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
573 /// It allows the current node to process the results of filter pushdown from
574 /// its children, deciding whether to absorb filters, modify the plan, or pass
575 /// filters back up to its parent.
576 ///
577 /// **Purpose and Context:**
578 /// Filter pushdown is a critical optimization in DataFusion that aims to
579 /// reduce the amount of data processed by applying filters as early as
580 /// possible in the query plan. This method is part of the second phase of
581 /// filter pushdown, where results are propagated back up the tree after
582 /// being pushed down. Each node can inspect the pushdown results from its
583 /// children and decide how to handle any unapplied filters, potentially
584 /// optimizing the plan structure or filter application.
585 ///
586 /// **Behavior in Different Nodes:**
587 /// - For a `DataSourceExec`, this often means absorbing the filters to apply
588 /// them during the scan phase (late materialization), reducing the data
589 /// read from the source.
590 /// - A `FilterExec` may absorb any filters its children could not handle,
591 /// combining them with its own predicate. If no filters remain (i.e., the
592 /// predicate becomes trivially true), it may remove itself from the plan
593 /// altogether. It typically marks parent filters as supported, indicating
594 /// they have been handled.
595 /// - A `HashJoinExec` might ignore the pushdown result if filters need to
596 /// be applied during the join operation. It passes the parent filters back
597 /// up wrapped in [`FilterPushdownPropagation::if_any`], discarding
598 /// any self-filters from children.
599 ///
600 /// **Example Walkthrough:**
601 /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
602 /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
603 /// `FilterExec`, the filter `f1` is gathered and pushed down to
604 /// `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
605 /// the join or add its own filters (e.g., a min-max filter from the build side),
606 /// then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
607 /// has no children to push to, so it prepares to handle filters in the
608 /// upward phase.
609 /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
610 /// `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
611 /// for late materialization during scanning, marking them as supported.
612 /// `HashJoinExec` receives the result, decides whether to apply any
613 /// remaining filters during the join, and passes unhandled filters back
614 /// up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
615 /// updates its predicate if necessary, or removes itself if the predicate
616 /// becomes trivial (e.g., `lit(true)`), and marks filters as supported
617 /// for its parent.
618 ///
619 /// The default implementation is a no-op that passes the result of pushdown
620 /// from the children to its parent transparently, ensuring no filters are
621 /// lost if a node does not override this behavior.
622 ///
623 /// **Notes for Implementation:**
624 /// When returning filters via [`FilterPushdownPropagation`], the order of
625 /// filters need not match the order they were passed in via
626 /// `child_pushdown_result`. However, preserving the order is recommended for
627 /// debugging and ease of reasoning about the resulting plans.
628 ///
629 /// **Helper Methods for Customization:**
630 /// There are various helper methods to simplify implementing this method:
631 /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as
632 /// supported as long as at least one child supports them.
633 /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as
634 /// supported as long as all children support them.
635 /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters
636 /// to the propagation result, indicating which filters are supported by
637 /// the current node.
638 /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
639 /// current node in the propagation result, used if the node
640 /// has modified its plan based on the pushdown results.
641 ///
642 /// **Filter Pushdown Phases:**
643 /// There are two different phases in filter pushdown (`Pre` and others),
644 /// which some operators may handle differently. Depending on the phase, the
645 /// operator may or may not be allowed to modify the plan. See
646 /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
647 ///
648 /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported
649 fn handle_child_pushdown_result(
650 &self,
651 _phase: FilterPushdownPhase,
652 child_pushdown_result: ChildPushdownResult,
653 _config: &ConfigOptions,
654 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
655 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
656 }
657
658 /// Injects arbitrary run-time state into this execution plan, returning a new plan
659 /// instance that incorporates that state *if* it is relevant to the concrete
660 /// node implementation.
661 ///
662 /// This is a generic entry point: the `state` can be any type wrapped in
663 /// `Arc<dyn Any + Send + Sync>`. A node that cares about the state should
664 /// down-cast it to the concrete type it expects and, if successful, return a
665 /// modified copy of itself that captures the provided value. If the state is
666 /// not applicable, the default behaviour is to return `None` so that parent
667 /// nodes can continue propagating the attempt further down the plan tree.
668 ///
669 /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
670 /// down-casts the supplied state to an `Arc<WorkTable>`
671 /// in order to wire up the working table used during recursive-CTE execution.
672 /// Similar patterns can be followed by custom nodes that need late-bound
673 /// dependencies or shared state.
674 fn with_new_state(
675 &self,
676 _state: Arc<dyn Any + Send + Sync>,
677 ) -> Option<Arc<dyn ExecutionPlan>> {
678 None
679 }
680
    /// Try to push down sort ordering requirements to this node.
    ///
    /// This method is called during sort pushdown optimization to determine if this
    /// node can optimize for a requested sort ordering. Implementations should:
    ///
    /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact
    ///   ordering (allowing the Sort operator to be removed)
    /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the
    ///   ordering but cannot guarantee perfect sorting (Sort operator is kept)
    /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize
    ///   for the ordering
    ///
    /// For transparent nodes (that preserve ordering), implement this to delegate to
    /// children and wrap the result with a new instance of this node.
    ///
    /// Default implementation returns `Unsupported`.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        // Conservative default: no sort-based optimization is available.
        Ok(SortOrderPushdownResult::Unsupported)
    }
703
    /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity.
    ///
    /// This is used to signal to data sources that the output ordering must be
    /// preserved, even if it might be more efficient to ignore it (e.g. by
    /// skipping some row groups in Parquet).
    fn with_preserve_order(
        &self,
        _preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        // Default: this node has no order-sensitivity toggle.
        None
    }
716}
717
/// [`ExecutionPlan`] Invariant Level
///
/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
///
/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
// `Debug`/`PartialEq`/`Eq` are derived so the level can be logged and compared
// in tests and invariant-checking code; all derives are backward compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvariantLevel {
    /// Invariants that are always true for the [`ExecutionPlan`] node
    /// such as the number of expected children.
    Always,
    /// Invariants that must hold true for the [`ExecutionPlan`] node
    /// to be "executable", such as ordering and/or distribution requirements
    /// being fulfilled.
    Executable,
}
733
/// Extension trait provides an easy API to fetch various properties of
/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
pub trait ExecutionPlanProperties {
    /// Specifies how the output of this `ExecutionPlan` is split into
    /// partitions. See [`Partitioning`] for the available schemes.
    fn output_partitioning(&self) -> &Partitioning;

    /// If the output of this `ExecutionPlan` within each partition is sorted,
    /// returns `Some(keys)` describing the ordering. A `None` return value
    /// indicates no assumptions should be made on the output ordering.
    ///
    /// For example, `SortExec` (obviously) produces sorted output as does
    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
    /// output if its input is sorted as it does not reorder the input rows.
    fn output_ordering(&self) -> Option<&LexOrdering>;

    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
    /// For more details, see [`Boundedness`].
    fn boundedness(&self) -> Boundedness;

    /// Indicates how the stream of this `ExecutionPlan` emits its results.
    /// For more details, see [`EmissionType`].
    fn pipeline_behavior(&self) -> EmissionType;

    /// Get the [`EquivalenceProperties`] within the plan.
    ///
    /// Equivalence properties tell DataFusion what columns are known to be
    /// equal, during various optimization passes. By default, this returns "no
    /// known equivalences" which is always correct, but may cause DataFusion to
    /// unnecessarily resort data.
    ///
    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
    /// through it or how columns within each row relate to each other, it
    /// should return the equivalence properties of its input. For
    /// example, since [`FilterExec`] may remove rows from its input, but does not
    /// otherwise modify them, it preserves its input equivalence properties.
    /// However, since `ProjectionExec` may calculate derived expressions, it
    /// needs special handling.
    ///
    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
    /// for related concepts.
    ///
    /// [`FilterExec`]: crate::filter::FilterExec
    fn equivalence_properties(&self) -> &EquivalenceProperties;
}
779
// Blanket implementation for `Arc<dyn ExecutionPlan>`: every accessor simply
// delegates to the plan's cached [`PlanProperties`].
impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}
801
// Same blanket delegation as above, for plain `&dyn ExecutionPlan` references.
impl ExecutionPlanProperties for &dyn ExecutionPlan {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }

    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }

    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}
823
/// Describes whether the stream of data **generated** by an operator is
/// bounded (finite) or unbounded (infinite).
///
/// This determines whether an execution plan will eventually finish processing
/// all of its data (bounded) or could potentially run forever (unbounded).
///
/// For unbounded streams, it additionally records whether the operator can
/// process the stream with finite memory, or whether memory usage may grow
/// without limit.
///
/// The boundedness of an output stream follows from the boundedness of the
/// input stream(s) and the nature of the operator. For example, a limit or
/// top-k (fetch) operator can turn an unbounded stream into a bounded one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    /// The data stream is bounded (finite) and will eventually complete
    Bounded,
    /// The data stream is unbounded (infinite) and could run forever
    Unbounded {
        /// Whether this operator requires infinite memory to process the unbounded stream.
        /// If false, the operator can process an infinite stream with bounded memory.
        /// If true, memory usage may grow unbounded while processing the stream.
        ///
        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
        requires_infinite_memory: bool,
    },
}

impl Boundedness {
    /// Returns `true` for [`Boundedness::Unbounded`], regardless of whether it
    /// requires infinite memory.
    pub fn is_unbounded(&self) -> bool {
        !matches!(self, Boundedness::Bounded)
    }
}
856
/// Represents how an operator emits its output records.
///
/// This is used to determine whether an operator emits records incrementally as they arrive,
/// only emits a final result at the end, or can do both. Note that even an
/// incremental operator produces its output as record batches of up to `batch_size`
/// rows, and it may buffer data internally until it has enough rows to emit a
/// batch or its source is exhausted.
///
/// For example, in the following plan:
/// ```text
///   SortExec [EmissionType::Final]
///     |_ on: [col1 ASC]
///     FilterExec [EmissionType::Incremental]
///       |_ pred: col2 > 100
///       DataSourceExec [EmissionType::Incremental]
///         |_ file: "data.csv"
/// ```
/// - DataSourceExec emits records incrementally as it reads from the file
/// - FilterExec processes and emits filtered records incrementally as they arrive
/// - SortExec must wait for all input records before it can emit the sorted result,
///   since it needs to see all values to determine their final order
///
/// Left joins can emit both incrementally and finally:
/// - Incrementally emit matches as they are found
/// - Finally emit non-matches after all input is processed
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Records are emitted incrementally as they arrive and are processed
    Incremental,
    /// Records are only emitted once all input has been processed
    Final,
    /// Records can be emitted both incrementally and as a final result
    Both,
}
889
/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the
/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details.
///
/// See [`PlanProperties::with_scheduling_type`] to override the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in
    /// cooperative scheduling. This means the implementation of the `Stream` returned by
    /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as
    /// [`tokio::task::coop::consume_budget`].
    ///
    /// `NonCooperative` is the default value and is acceptable for most operators. Please refer to
    /// the [`coop`](crate::coop) module for details on when it may be useful to use
    /// `Cooperative` instead.
    NonCooperative,
    /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in
    /// cooperative scheduling by consuming task budget when it was able to produce a
    /// [`RecordBatch`].
    Cooperative,
}
908
/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
///
/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
///
/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
/// is known as data-driven or eager evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
    /// instances when it is demanded by invoking `Stream::poll_next`.
    /// Filter, projection, and join are examples of such lazy operators.
    ///
    /// Lazy operators are also known as demand-driven operators.
    Lazy,
    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
    /// `Stream::poll_next` is called.
    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
    ///
    /// Eager operators are also known as data-driven operators.
    Eager,
}
932
933/// Utility to determine an operator's boundedness based on its children's boundedness.
934///
935/// Assumes boundedness can be inferred from child operators:
936/// - Unbounded (requires_infinite_memory: true) takes precedence.
937/// - Unbounded (requires_infinite_memory: false) is considered next.
938/// - Otherwise, the operator is bounded.
939///
940/// **Note:** This is a general-purpose utility and may not apply to
941/// all multi-child operators. Ensure your operator's behavior aligns
942/// with these assumptions before using.
943pub(crate) fn boundedness_from_children<'a>(
944 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
945) -> Boundedness {
946 let mut unbounded_with_finite_mem = false;
947
948 for child in children {
949 match child.boundedness() {
950 Boundedness::Unbounded {
951 requires_infinite_memory: true,
952 } => {
953 return Boundedness::Unbounded {
954 requires_infinite_memory: true,
955 };
956 }
957 Boundedness::Unbounded {
958 requires_infinite_memory: false,
959 } => {
960 unbounded_with_finite_mem = true;
961 }
962 Boundedness::Bounded => {}
963 }
964 }
965
966 if unbounded_with_finite_mem {
967 Boundedness::Unbounded {
968 requires_infinite_memory: false,
969 }
970 } else {
971 Boundedness::Bounded
972 }
973}
974
975/// Determines the emission type of an operator based on its children's pipeline behavior.
976///
977/// The precedence of emission types is:
978/// - `Final` has the highest precedence.
979/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
980/// - `Incremental` is the default if all children emit incremental results.
981///
982/// **Note:** This is a general-purpose utility and may not apply to
983/// all multi-child operators. Verify your operator's behavior aligns
984/// with these assumptions.
985pub(crate) fn emission_type_from_children<'a>(
986 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
987) -> EmissionType {
988 let mut inc_and_final = false;
989
990 for child in children {
991 match child.pipeline_behavior() {
992 EmissionType::Final => return EmissionType::Final,
993 EmissionType::Both => inc_and_final = true,
994 EmissionType::Incremental => continue,
995 }
996 }
997
998 if inc_and_final {
999 EmissionType::Both
1000 } else {
1001 EmissionType::Incremental
1002 }
1003}
1004
/// Stores certain, often expensive to compute, plan properties used in query
/// optimization.
///
/// These properties are stored in a single structure to permit this information
/// to be computed once and then those cached results used multiple times without
/// recomputation (aka a cache)
#[derive(Debug, Clone)]
pub struct PlanProperties {
    /// See [ExecutionPlanProperties::equivalence_properties]
    pub eq_properties: EquivalenceProperties,
    /// See [ExecutionPlanProperties::output_partitioning]
    pub partitioning: Partitioning,
    /// See [ExecutionPlanProperties::pipeline_behavior]
    pub emission_type: EmissionType,
    /// See [ExecutionPlanProperties::boundedness]
    pub boundedness: Boundedness,
    /// See [`EvaluationType`]. Defaults to [`EvaluationType::Lazy`] in [`PlanProperties::new`].
    pub evaluation_type: EvaluationType,
    /// See [`SchedulingType`]. Defaults to [`SchedulingType::NonCooperative`] in [`PlanProperties::new`].
    pub scheduling_type: SchedulingType,
    /// See [ExecutionPlanProperties::output_ordering]
    output_ordering: Option<LexOrdering>,
}
1026
impl PlanProperties {
    /// Construct a new `PlanProperties` from the given equivalence properties,
    /// partitioning, emission type and boundedness.
    ///
    /// `evaluation_type` and `scheduling_type` start at their defaults
    /// ([`EvaluationType::Lazy`], [`SchedulingType::NonCooperative`]); use the
    /// `with_*` builder methods to override them.
    pub fn new(
        eq_properties: EquivalenceProperties,
        partitioning: Partitioning,
        emission_type: EmissionType,
        boundedness: Boundedness,
    ) -> Self {
        // Output ordering can be derived from `eq_properties`.
        let output_ordering = eq_properties.output_ordering();
        Self {
            eq_properties,
            partitioning,
            emission_type,
            boundedness,
            evaluation_type: EvaluationType::Lazy,
            scheduling_type: SchedulingType::NonCooperative,
            output_ordering,
        }
    }

    /// Overwrite output partitioning with its new value.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.partitioning = partitioning;
        self
    }

    /// Set equivalence properties having mut reference.
    pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
        // Changing equivalence properties also changes output ordering, so
        // make sure to overwrite it:
        self.output_ordering = eq_properties.output_ordering();
        self.eq_properties = eq_properties;
    }

    /// Overwrite equivalence properties with its new value.
    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
        self.set_eq_properties(eq_properties);
        self
    }

    /// Overwrite boundedness with its new value.
    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
        self.boundedness = boundedness;
        self
    }

    /// Overwrite emission type with its new value.
    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
        self.emission_type = emission_type;
        self
    }

    /// Set the [`SchedulingType`].
    ///
    /// Defaults to [`SchedulingType::NonCooperative`]
    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
        self.scheduling_type = scheduling_type;
        self
    }

    /// Set the [`EvaluationType`].
    ///
    /// Defaults to [`EvaluationType::Lazy`]
    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
        self.evaluation_type = drive_type;
        self
    }

    /// Set constraints having mut reference.
    pub fn set_constraints(&mut self, constraints: Constraints) {
        self.eq_properties.set_constraints(constraints);
    }

    /// Overwrite constraints with its new value.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.set_constraints(constraints);
        self
    }

    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
        &self.eq_properties
    }

    pub fn output_partitioning(&self) -> &Partitioning {
        &self.partitioning
    }

    pub fn output_ordering(&self) -> Option<&LexOrdering> {
        self.output_ordering.as_ref()
    }

    /// Get schema of the node.
    pub(crate) fn schema(&self) -> &SchemaRef {
        self.eq_properties.schema()
    }
}
1124
// Asserts that `$target.$func_name()` returns a `Vec` whose length equals
// `$expected_len`, raising an internal error that names the plan node and the
// offending method otherwise. Used by `check_default_invariants`.
macro_rules! check_len {
    ($target:expr, $func_name:ident, $expected_len:expr) => {
        let actual_len = $target.$func_name().len();
        assert_eq_or_internal_err!(
            actual_len,
            $expected_len,
            "{}::{} returned Vec with incorrect size: {} != {}",
            $target.name(),
            stringify!($func_name),
            actual_len,
            $expected_len
        );
    };
}
1139
/// Checks a set of invariants that apply to all ExecutionPlan implementations.
/// Returns an error if the given node does not conform.
///
/// Each per-child `Vec` the plan reports must contain exactly one entry per
/// child. Note the `_check` level is currently unused: these checks are cheap
/// and performed regardless of the requested [`InvariantLevel`].
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
    plan: &P,
    _check: InvariantLevel,
) -> Result<(), DataFusionError> {
    let children_len = plan.children().len();

    check_len!(plan, maintains_input_order, children_len);
    check_len!(plan, required_input_ordering, children_len);
    check_len!(plan, required_input_distribution, children_len);
    check_len!(plan, benefits_from_input_partitioning, children_len);

    Ok(())
}
1155
1156/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
1157/// especially for the distributed engine to judge whether need to deal with shuffling.
1158/// Currently, there are 3 kinds of execution plan which needs data exchange
1159/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
1160/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
1161/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
1162#[expect(clippy::needless_pass_by_value)]
1163pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
1164 plan.properties().evaluation_type == EvaluationType::Eager
1165}
1166
1167/// Returns a copy of this plan if we change any child according to the pointer comparison.
1168/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1169pub fn with_new_children_if_necessary(
1170 plan: Arc<dyn ExecutionPlan>,
1171 children: Vec<Arc<dyn ExecutionPlan>>,
1172) -> Result<Arc<dyn ExecutionPlan>> {
1173 let old_children = plan.children();
1174 assert_eq_or_internal_err!(
1175 children.len(),
1176 old_children.len(),
1177 "Wrong number of children"
1178 );
1179 if children.is_empty()
1180 || children
1181 .iter()
1182 .zip(old_children.iter())
1183 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
1184 {
1185 plan.with_new_children(children)
1186 } else {
1187 Ok(plan)
1188 }
1189}
1190
/// Return a [`DisplayableExecutionPlan`] wrapper around an
/// [`ExecutionPlan`] which can be displayed in various easier to
/// understand ways.
///
/// The plan is borrowed for the lifetime of the returned wrapper.
///
/// See examples on [`DisplayableExecutionPlan`]
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
    DisplayableExecutionPlan::new(plan)
}
1199
1200/// Execute the [ExecutionPlan] and collect the results in memory
1201pub async fn collect(
1202 plan: Arc<dyn ExecutionPlan>,
1203 context: Arc<TaskContext>,
1204) -> Result<Vec<RecordBatch>> {
1205 let stream = execute_stream(plan, context)?;
1206 crate::common::collect(stream).await
1207}
1208
1209/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
1210///
1211/// See [collect] to buffer the `RecordBatch`es in memory.
1212///
1213/// # Aborting Execution
1214///
1215/// Dropping the stream will abort the execution of the query, and free up
1216/// any allocated resources
1217#[expect(
1218 clippy::needless_pass_by_value,
1219 reason = "Public API that historically takes owned Arcs"
1220)]
1221pub fn execute_stream(
1222 plan: Arc<dyn ExecutionPlan>,
1223 context: Arc<TaskContext>,
1224) -> Result<SendableRecordBatchStream> {
1225 match plan.output_partitioning().partition_count() {
1226 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1227 1 => plan.execute(0, context),
1228 2.. => {
1229 // merge into a single partition
1230 let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
1231 // CoalescePartitionsExec must produce a single partition
1232 assert_eq!(1, plan.properties().output_partitioning().partition_count());
1233 plan.execute(0, context)
1234 }
1235 }
1236}
1237
/// Execute the [ExecutionPlan] and collect the results in memory, returning
/// one `Vec<RecordBatch>` per output partition, in partition order.
pub async fn collect_partitioned(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<Vec<RecordBatch>>> {
    let streams = execute_stream_partitioned(plan, context)?;

    let mut join_set = JoinSet::new();
    // Execute the plan and collect the results into batches.
    streams.into_iter().enumerate().for_each(|(idx, stream)| {
        join_set.spawn(async move {
            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
            (idx, result)
        });
    });

    let mut batches = vec![];
    // Note that currently this doesn't identify the thread that panicked
    //
    // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id
    // once it is stable
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok((idx, res)) => batches.push((idx, res?)),
            Err(e) => {
                if e.is_panic() {
                    // Re-raise panics from worker tasks on the calling task.
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    // Tasks are never cancelled here, so a non-panic join
                    // failure should be impossible.
                    unreachable!();
                }
            }
        }
    }

    // Tasks may complete in any order; restore partition order by index.
    batches.sort_by_key(|(idx, _)| *idx);
    let batches = batches.into_iter().map(|(_, batch)| batch).collect();

    Ok(batches)
}
1277
1278/// Execute the [ExecutionPlan] and return a vec with one stream per output
1279/// partition
1280///
1281/// # Aborting Execution
1282///
1283/// Dropping the stream will abort the execution of the query, and free up
1284/// any allocated resources
1285#[expect(
1286 clippy::needless_pass_by_value,
1287 reason = "Public API that historically takes owned Arcs"
1288)]
1289pub fn execute_stream_partitioned(
1290 plan: Arc<dyn ExecutionPlan>,
1291 context: Arc<TaskContext>,
1292) -> Result<Vec<SendableRecordBatchStream>> {
1293 let num_partitions = plan.output_partitioning().partition_count();
1294 let mut streams = Vec::with_capacity(num_partitions);
1295 for i in 0..num_partitions {
1296 streams.push(plan.execute(i, Arc::clone(&context))?);
1297 }
1298 Ok(streams)
1299}
1300
1301/// Executes an input stream and ensures that the resulting stream adheres to
1302/// the `not null` constraints specified in the `sink_schema`.
1303///
1304/// # Arguments
1305///
1306/// * `input` - An execution plan
1307/// * `sink_schema` - The schema to be applied to the output stream
1308/// * `partition` - The partition index to be executed
1309/// * `context` - The task context
1310///
1311/// # Returns
1312///
1313/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
1314///
1315/// This function first executes the given input plan for the specified partition
1316/// and context. It then checks if there are any columns in the input that might
1317/// violate the `not null` constraints specified in the `sink_schema`. If there are
1318/// such columns, it wraps the resulting stream to enforce the `not null` constraints
1319/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
1320#[expect(
1321 clippy::needless_pass_by_value,
1322 reason = "Public API that historically takes owned Arcs"
1323)]
1324pub fn execute_input_stream(
1325 input: Arc<dyn ExecutionPlan>,
1326 sink_schema: SchemaRef,
1327 partition: usize,
1328 context: Arc<TaskContext>,
1329) -> Result<SendableRecordBatchStream> {
1330 let input_stream = input.execute(partition, context)?;
1331
1332 debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
1333
1334 // Find input columns that may violate the not null constraint.
1335 let risky_columns: Vec<_> = sink_schema
1336 .fields()
1337 .iter()
1338 .zip(input.schema().fields().iter())
1339 .enumerate()
1340 .filter_map(|(idx, (sink_field, input_field))| {
1341 (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
1342 })
1343 .collect();
1344
1345 if risky_columns.is_empty() {
1346 Ok(input_stream)
1347 } else {
1348 // Check not null constraint on the input stream
1349 Ok(Box::pin(RecordBatchStreamAdapter::new(
1350 sink_schema,
1351 input_stream
1352 .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
1353 )))
1354 }
1355}
1356
1357/// Checks a `RecordBatch` for `not null` constraints on specified columns.
1358///
1359/// # Arguments
1360///
1361/// * `batch` - The `RecordBatch` to be checked
1362/// * `column_indices` - A vector of column indices that should be checked for
1363/// `not null` constraints.
1364///
1365/// # Returns
1366///
1367/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
1368///
1369/// This function iterates over the specified column indices and ensures that none
1370/// of the columns contain null values. If any column contains null values, an error
1371/// is returned.
1372pub fn check_not_null_constraints(
1373 batch: RecordBatch,
1374 column_indices: &Vec<usize>,
1375) -> Result<RecordBatch> {
1376 for &index in column_indices {
1377 if batch.num_columns() <= index {
1378 return exec_err!(
1379 "Invalid batch column count {} expected > {}",
1380 batch.num_columns(),
1381 index
1382 );
1383 }
1384
1385 if batch
1386 .column(index)
1387 .logical_nulls()
1388 .map(|nulls| nulls.null_count())
1389 .unwrap_or_default()
1390 > 0
1391 {
1392 return exec_err!(
1393 "Invalid batch column at '{}' has null but schema specifies non-nullable",
1394 index
1395 );
1396 }
1397 }
1398
1399 Ok(batch)
1400}
1401
1402/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
1403///
1404/// Some plans will change their internal states after execution, making them unable to be executed again.
1405/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
1406///
1407/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
1408/// However, if the data of the left table is derived from the work table, it will become outdated
1409/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
1410///
1411/// # Limitations
1412///
1413/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
1414///
1415/// * uses dynamic filters,
1416/// * represents a recursive query.
1417///
1418pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1419 plan.transform_up(|plan| {
1420 let new_plan = Arc::clone(&plan).reset_state()?;
1421 Ok(Transformed::yes(new_plan))
1422 })
1423 .data()
1424}
1425
1426/// Check if the `plan` children has the same properties as passed `children`.
1427/// In this case plan can avoid self properties re-computation when its children
1428/// replace is requested.
1429/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
1430pub fn has_same_children_properties(
1431 plan: &impl ExecutionPlan,
1432 children: &[Arc<dyn ExecutionPlan>],
1433) -> Result<bool> {
1434 let old_children = plan.children();
1435 assert_eq_or_internal_err!(
1436 children.len(),
1437 old_children.len(),
1438 "Wrong number of children"
1439 );
1440 for (lhs, rhs) in old_children.iter().zip(children.iter()) {
1441 if !Arc::ptr_eq(lhs.properties(), rhs.properties()) {
1442 return Ok(false);
1443 }
1444 }
1445 Ok(true)
1446}
1447
/// Helper macro to avoid properties re-computation if passed children properties
/// the same as plan already has. Could be used to implement fast-path for method
/// [`ExecutionPlan::with_new_children`].
///
/// Note: on a match this expands to an early `return Ok(...)` from the
/// enclosing function, so it must be invoked in a function returning
/// `Result<Arc<dyn ExecutionPlan>>`.
#[macro_export]
macro_rules! check_if_same_properties {
    ($plan: expr, $children: expr) => {
        if $crate::execution_plan::has_same_children_properties(
            $plan.as_ref(),
            &$children,
        )? {
            let plan = $plan.with_new_children_and_same_properties($children);
            return Ok(::std::sync::Arc::new(plan));
        }
    };
}
1463
1464/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1465pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1466 let formatted = displayable(plan.as_ref()).indent(true).to_string();
1467 let actual: Vec<&str> = formatted.trim().lines().collect();
1468 actual.iter().map(|elem| (*elem).to_string()).collect()
1469}
1470
/// Indicates the effect an execution plan operator will have on the cardinality
/// of its input stream
// `Debug`/`Clone`/`Copy`/`PartialEq`/`Eq` are derived so the effect can be
// logged, compared and passed by value; adding derives is backward compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CardinalityEffect {
    /// Unknown effect. This is the default
    Unknown,
    /// The operator is guaranteed to produce exactly one row for
    /// each input row
    Equal,
    /// The operator may produce fewer output rows than it receives input rows
    LowerEqual,
    /// The operator may produce more output rows than it receives input rows
    GreaterEqual,
}
1484
/// Can be used in contexts where properties have not yet been initialized properly.
pub(crate) fn stub_properties() -> Arc<PlanProperties> {
    // Single shared placeholder: empty schema, single unknown partition,
    // final emission, bounded. Built lazily once, then cheaply cloned.
    static STUB_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
        Arc::new(PlanProperties::new(
            EquivalenceProperties::new(Arc::new(Schema::empty())),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Final,
            Boundedness::Bounded,
        ))
    });

    Arc::clone(&STUB_PROPERTIES)
}
1498
1499#[cfg(test)]
1500mod tests {
1501 use std::any::Any;
1502 use std::sync::Arc;
1503
1504 use super::*;
1505 use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1506
1507 use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1508 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1509 use datafusion_common::{Result, Statistics};
1510 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
1511
    /// Test fixture: a child-less plan that relies on the default
    /// `ExecutionPlan::name`/`static_name` implementations. Methods that the
    /// tests never call are left `unimplemented!()`.
    #[derive(Debug)]
    pub struct EmptyExec;

    impl EmptyExec {
        // The schema is ignored; the fixture carries no state.
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        // Uses the trait's default `static_name`, which reports the type name.
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
            unimplemented!()
        }
    }
1567
1568 #[derive(Debug)]
1569 pub struct RenamedEmptyExec;
1570
1571 impl RenamedEmptyExec {
1572 pub fn new(_schema: SchemaRef) -> Self {
1573 Self
1574 }
1575 }
1576
1577 impl DisplayAs for RenamedEmptyExec {
1578 fn fmt_as(
1579 &self,
1580 _t: DisplayFormatType,
1581 _f: &mut std::fmt::Formatter,
1582 ) -> std::fmt::Result {
1583 unimplemented!()
1584 }
1585 }
1586
1587 impl ExecutionPlan for RenamedEmptyExec {
1588 fn name(&self) -> &'static str {
1589 Self::static_name()
1590 }
1591
1592 fn static_name() -> &'static str
1593 where
1594 Self: Sized,
1595 {
1596 "MyRenamedEmptyExec"
1597 }
1598
1599 fn as_any(&self) -> &dyn Any {
1600 self
1601 }
1602
1603 fn properties(&self) -> &Arc<PlanProperties> {
1604 unimplemented!()
1605 }
1606
1607 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1608 vec![]
1609 }
1610
1611 fn with_new_children(
1612 self: Arc<Self>,
1613 _: Vec<Arc<dyn ExecutionPlan>>,
1614 ) -> Result<Arc<dyn ExecutionPlan>> {
1615 unimplemented!()
1616 }
1617
1618 fn execute(
1619 &self,
1620 _partition: usize,
1621 _context: Arc<TaskContext>,
1622 ) -> Result<SendableRecordBatchStream> {
1623 unimplemented!()
1624 }
1625
1626 fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
1627 unimplemented!()
1628 }
1629 }
1630
1631 #[test]
1632 fn test_execution_plan_name() {
1633 let schema1 = Arc::new(Schema::empty());
1634 let default_name_exec = EmptyExec::new(schema1);
1635 assert_eq!(default_name_exec.name(), "EmptyExec");
1636
1637 let schema2 = Arc::new(Schema::empty());
1638 let renamed_exec = RenamedEmptyExec::new(schema2);
1639 assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1640 assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1641 }
1642
1643 /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1644 /// be called from a trait object.
1645 /// Related ticket: https://github.com/apache/datafusion/pull/11047
1646 #[expect(unused)]
1647 fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1648 let _ = plan.name();
1649 }
1650
1651 #[test]
1652 fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1653 check_not_null_constraints(
1654 RecordBatch::try_new(
1655 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1656 vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1657 )?,
1658 &vec![0],
1659 )?;
1660 Ok(())
1661 }
1662
1663 #[test]
1664 fn test_check_not_null_constraints_reject_null() -> Result<()> {
1665 let result = check_not_null_constraints(
1666 RecordBatch::try_new(
1667 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1668 vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1669 )?,
1670 &vec![0],
1671 );
1672 assert!(result.is_err());
1673 assert_eq!(
1674 result.err().unwrap().strip_backtrace(),
1675 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1676 );
1677 Ok(())
1678 }
1679
1680 #[test]
1681 fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
1682 // some null value inside REE array
1683 let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1684 let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1685 let run_end_array = RunArray::try_new(&run_ends, &values)?;
1686 let result = check_not_null_constraints(
1687 RecordBatch::try_new(
1688 Arc::new(Schema::new(vec![Field::new(
1689 "a",
1690 run_end_array.data_type().to_owned(),
1691 true,
1692 )])),
1693 vec![Arc::new(run_end_array)],
1694 )?,
1695 &vec![0],
1696 );
1697 assert!(result.is_err());
1698 assert_eq!(
1699 result.err().unwrap().strip_backtrace(),
1700 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1701 );
1702 Ok(())
1703 }
1704
1705 #[test]
1706 fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1707 let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1708 let keys = Int32Array::from(vec![0, 1, 2, 3]);
1709 let dictionary = DictionaryArray::new(keys, values);
1710 let result = check_not_null_constraints(
1711 RecordBatch::try_new(
1712 Arc::new(Schema::new(vec![Field::new(
1713 "a",
1714 dictionary.data_type().to_owned(),
1715 true,
1716 )])),
1717 vec![Arc::new(dictionary)],
1718 )?,
1719 &vec![0],
1720 );
1721 assert!(result.is_err());
1722 assert_eq!(
1723 result.err().unwrap().strip_backtrace(),
1724 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1725 );
1726 Ok(())
1727 }
1728
1729 #[test]
1730 fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
1731 // some null value marked out by dictionary array
1732 let values = Arc::new(Int32Array::from(vec![
1733 Some(1),
1734 None, // this null value is masked by dictionary keys
1735 Some(3),
1736 Some(4),
1737 ]));
1738 let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1739 let dictionary = DictionaryArray::new(keys, values);
1740 check_not_null_constraints(
1741 RecordBatch::try_new(
1742 Arc::new(Schema::new(vec![Field::new(
1743 "a",
1744 dictionary.data_type().to_owned(),
1745 true,
1746 )])),
1747 vec![Arc::new(dictionary)],
1748 )?,
1749 &vec![0],
1750 )?;
1751 Ok(())
1752 }
1753
1754 #[test]
1755 fn test_check_not_null_constraints_on_null_type() -> Result<()> {
1756 // null value of Null type
1757 let result = check_not_null_constraints(
1758 RecordBatch::try_new(
1759 Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1760 vec![Arc::new(NullArray::new(3))],
1761 )?,
1762 &vec![0],
1763 );
1764 assert!(result.is_err());
1765 assert_eq!(
1766 result.err().unwrap().strip_backtrace(),
1767 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1768 );
1769 Ok(())
1770 }
1771}