datafusion_physical_expr_common/
physical_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use crate::utils::scatter;
25
26use arrow::array::{ArrayRef, BooleanArray};
27use arrow::compute::filter_record_batch;
28use arrow::datatypes::{DataType, Field, FieldRef, Schema};
29use arrow::record_batch::RecordBatch;
30use datafusion_common::tree_node::{
31    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32};
33use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
34use datafusion_expr_common::columnar_value::ColumnarValue;
35use datafusion_expr_common::interval_arithmetic::Interval;
36use datafusion_expr_common::sort_properties::ExprProperties;
37use datafusion_expr_common::statistics::Distribution;
38
39use itertools::izip;
40
41/// Shared [`PhysicalExpr`].
42pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
43
44/// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`.
45///
46/// `PhysicalExpr` knows its type, nullability and can be evaluated directly on
47/// a [`RecordBatch`] (see [`Self::evaluate`]).
48///
49/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical
50/// planning. They are typically created from [`Expr`] by a [`PhysicalPlanner`]
51/// invoked from a higher level API
52///
53/// Some important examples of `PhysicalExpr` are:
54/// * [`Column`]: Represents a column at a given index in a RecordBatch
55///
56/// To create `PhysicalExpr` from  `Expr`, see
57/// * [`SessionContext::create_physical_expr`]: A high level API
58/// * [`create_physical_expr`]: A low level API
59///
60/// # Formatting `PhysicalExpr` as strings
61/// There are three ways to format `PhysicalExpr` as a string:
62/// * [`Debug`]: Standard Rust debugging format (e.g. `Constant { value: ... }`)
63/// * [`Display`]: Detailed SQL-like format that shows expression structure (e.g. (`Utf8 ("foobar")`). This is often used for debugging and tests
64/// * [`Self::fmt_sql`]: SQL-like human readable format (e.g. ('foobar')`), See also [`sql_fmt`]
65///
66/// [`SessionContext::create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.create_physical_expr
67/// [`PhysicalPlanner`]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
68/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
69/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
70/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
71pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
72    /// Returns the physical expression as [`Any`] so that it can be
73    /// downcast to a specific implementation.
74    fn as_any(&self) -> &dyn Any;
75    /// Get the data type of this expression, given the schema of the input
76    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
77        Ok(self.return_field(input_schema)?.data_type().to_owned())
78    }
79    /// Determine whether this expression is nullable, given the schema of the input
80    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
81        Ok(self.return_field(input_schema)?.is_nullable())
82    }
83    /// Evaluate an expression against a RecordBatch
84    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
85    /// The output field associated with this expression
86    fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
87        Ok(Arc::new(Field::new(
88            format!("{self}"),
89            self.data_type(input_schema)?,
90            self.nullable(input_schema)?,
91        )))
92    }
93    /// Evaluate an expression against a RecordBatch after first applying a
94    /// validity array
95    fn evaluate_selection(
96        &self,
97        batch: &RecordBatch,
98        selection: &BooleanArray,
99    ) -> Result<ColumnarValue> {
100        let tmp_batch = filter_record_batch(batch, selection)?;
101
102        let tmp_result = self.evaluate(&tmp_batch)?;
103
104        if batch.num_rows() == tmp_batch.num_rows() {
105            // All values from the `selection` filter are true.
106            Ok(tmp_result)
107        } else if let ColumnarValue::Array(a) = tmp_result {
108            scatter(selection, a.as_ref()).map(ColumnarValue::Array)
109        } else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result {
110            // When the scalar is true or false, skip the scatter process
111            if let Some(v) = value {
112                if *v {
113                    Ok(ColumnarValue::from(Arc::new(selection.clone()) as ArrayRef))
114                } else {
115                    Ok(tmp_result)
116                }
117            } else {
118                let array = BooleanArray::from(vec![None; batch.num_rows()]);
119                scatter(selection, &array).map(ColumnarValue::Array)
120            }
121        } else {
122            Ok(tmp_result)
123        }
124    }
125
126    /// Get a list of child PhysicalExpr that provide the input for this expr.
127    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
128
129    /// Returns a new PhysicalExpr where all children were replaced by new exprs.
130    fn with_new_children(
131        self: Arc<Self>,
132        children: Vec<Arc<dyn PhysicalExpr>>,
133    ) -> Result<Arc<dyn PhysicalExpr>>;
134
135    /// Computes the output interval for the expression, given the input
136    /// intervals.
137    ///
138    /// # Parameters
139    ///
140    /// * `children` are the intervals for the children (inputs) of this
141    ///   expression.
142    ///
143    /// # Returns
144    ///
145    /// A `Result` containing the output interval for the expression in
146    /// case of success, or an error object in case of failure.
147    ///
148    /// # Example
149    ///
150    /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
151    /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
152    fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
153        not_impl_err!("Not implemented for {self}")
154    }
155
156    /// Updates bounds for child expressions, given a known interval for this
157    /// expression.
158    ///
159    /// This is used to propagate constraints down through an expression tree.
160    ///
161    /// # Parameters
162    ///
163    /// * `interval` is the currently known interval for this expression.
164    /// * `children` are the current intervals for the children of this expression.
165    ///
166    /// # Returns
167    ///
168    /// A `Result` containing a `Vec` of new intervals for the children (in order)
169    /// in case of success, or an error object in case of failure.
170    ///
171    /// If constraint propagation reveals an infeasibility for any child, returns
172    /// [`None`]. If none of the children intervals change as a result of
173    /// propagation, may return an empty vector instead of cloning `children`.
174    /// This is the default (and conservative) return value.
175    ///
176    /// # Example
177    ///
178    /// If the expression is `a + b`, the current `interval` is `[4, 5]` and the
179    /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, then
180    /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at least
181    /// `2` to make the output at least `4`.
182    fn propagate_constraints(
183        &self,
184        _interval: &Interval,
185        _children: &[&Interval],
186    ) -> Result<Option<Vec<Interval>>> {
187        Ok(Some(vec![]))
188    }
189
190    /// Computes the output statistics for the expression, given the input
191    /// statistics.
192    ///
193    /// # Parameters
194    ///
195    /// * `children` are the statistics for the children (inputs) of this
196    ///   expression.
197    ///
198    /// # Returns
199    ///
200    /// A `Result` containing the output statistics for the expression in
201    /// case of success, or an error object in case of failure.
202    ///
203    /// Expressions (should) implement this function and utilize the independence
204    /// assumption, match on children distribution types and compute the output
205    /// statistics accordingly. The default implementation simply creates an
206    /// unknown output distribution by combining input ranges. This logic loses
207    /// distribution information, but is a safe default.
208    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
209        let children_ranges = children
210            .iter()
211            .map(|c| c.range())
212            .collect::<Result<Vec<_>>>()?;
213        let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
214        let output_interval = self.evaluate_bounds(children_ranges_refs.as_slice())?;
215        let dt = output_interval.data_type();
216        if dt.eq(&DataType::Boolean) {
217            let p = if output_interval.eq(&Interval::CERTAINLY_TRUE) {
218                ScalarValue::new_one(&dt)
219            } else if output_interval.eq(&Interval::CERTAINLY_FALSE) {
220                ScalarValue::new_zero(&dt)
221            } else {
222                ScalarValue::try_from(&dt)
223            }?;
224            Distribution::new_bernoulli(p)
225        } else {
226            Distribution::new_from_interval(output_interval)
227        }
228    }
229
230    /// Updates children statistics using the given parent statistic for this
231    /// expression.
232    ///
233    /// This is used to propagate statistics down through an expression tree.
234    ///
235    /// # Parameters
236    ///
237    /// * `parent` is the currently known statistics for this expression.
238    /// * `children` are the current statistics for the children of this expression.
239    ///
240    /// # Returns
241    ///
242    /// A `Result` containing a `Vec` of new statistics for the children (in order)
243    /// in case of success, or an error object in case of failure.
244    ///
245    /// If statistics propagation reveals an infeasibility for any child, returns
246    /// [`None`]. If none of the children statistics change as a result of
247    /// propagation, may return an empty vector instead of cloning `children`.
248    /// This is the default (and conservative) return value.
249    ///
250    /// Expressions (should) implement this function and apply Bayes rule to
251    /// reconcile and update parent/children statistics. This involves utilizing
252    /// the independence assumption, and matching on distribution types. The
253    /// default implementation simply creates an unknown distribution if it can
254    /// narrow the range by propagating ranges. This logic loses distribution
255    /// information, but is a safe default.
256    fn propagate_statistics(
257        &self,
258        parent: &Distribution,
259        children: &[&Distribution],
260    ) -> Result<Option<Vec<Distribution>>> {
261        let children_ranges = children
262            .iter()
263            .map(|c| c.range())
264            .collect::<Result<Vec<_>>>()?;
265        let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
266        let parent_range = parent.range()?;
267        let Some(propagated_children) =
268            self.propagate_constraints(&parent_range, children_ranges_refs.as_slice())?
269        else {
270            return Ok(None);
271        };
272        izip!(propagated_children.into_iter(), children_ranges, children)
273            .map(|(new_interval, old_interval, child)| {
274                if new_interval == old_interval {
275                    // We weren't able to narrow the range, preserve the old statistics.
276                    Ok((*child).clone())
277                } else if new_interval.data_type().eq(&DataType::Boolean) {
278                    let dt = old_interval.data_type();
279                    let p = if new_interval.eq(&Interval::CERTAINLY_TRUE) {
280                        ScalarValue::new_one(&dt)
281                    } else if new_interval.eq(&Interval::CERTAINLY_FALSE) {
282                        ScalarValue::new_zero(&dt)
283                    } else {
284                        unreachable!("Given that we have a range reduction for a boolean interval, we should have certainty")
285                    }?;
286                    Distribution::new_bernoulli(p)
287                } else {
288                    Distribution::new_from_interval(new_interval)
289                }
290            })
291            .collect::<Result<_>>()
292            .map(Some)
293    }
294
295    /// Calculates the properties of this [`PhysicalExpr`] based on its
296    /// children's properties (i.e. order and range), recursively aggregating
297    /// the information from its children. In cases where the [`PhysicalExpr`]
298    /// has no children (e.g., `Literal` or `Column`), these properties should
299    /// be specified externally, as the function defaults to unknown properties.
300    fn get_properties(&self, _children: &[ExprProperties]) -> Result<ExprProperties> {
301        Ok(ExprProperties::new_unknown())
302    }
303
304    /// Format this `PhysicalExpr` in nice human readable "SQL" format
305    ///
306    /// Specifically, this format is designed to be readable by humans, at the
307    /// expense of details. Use `Display` or `Debug` for more detailed
308    /// representation.
309    ///
310    /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
311    ///
312    fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
313
314    /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
315    ///
316    /// "Dynamic" in this case means containing references to structures that may change
317    /// during plan execution, such as hash tables.
318    ///
319    /// This method is used to capture the current state of `PhysicalExpr`s that may contain
320    /// dynamic references to other operators in order to serialize it over the wire
321    /// or treat it via downcast matching.
322    ///
323    /// You should not call this method directly as it does not handle recursion.
324    /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
325    /// full state of the `PhysicalExpr`.
326    ///
327    /// This is expected to return "simple" expressions that do not have mutable state
328    /// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
329    /// Callers however should *not* assume anything about the returned expressions
330    /// since callers and implementers may not agree on what "simple" or "built-in"
331    /// means.
332    /// In other words, if you need to serialize a `PhysicalExpr` across the wire
333    /// you should call this method and then try to serialize the result,
334    /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
335    /// just as if you had not called this method at all.
336    ///
337    /// In particular, consider:
338    /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
339    ///   that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
340    ///   This function may return something like `a >= 12`.
341    /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
342    ///   from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
343    ///   This function may return something like `t2.b IN (1, 5, 7)`.
344    ///
345    /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
346    /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
347    /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
348    /// contain these dynamic references.
349    ///
350    /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
351    /// and send it across the wire to a remote executor may want to call this method after
352    /// every batch on the source side and broadcast / update the current snapshot to the remote executor.
353    ///
354    /// Note for implementers: this method should *not* handle recursion.
355    /// Recursion is handled in [`snapshot_physical_expr`].
356    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
357        // By default, we return None to indicate that this PhysicalExpr does not
358        // have any dynamic references or state.
359        // This is a safe default behavior.
360        Ok(None)
361    }
362
363    /// Returns the generation of this `PhysicalExpr` for snapshotting purposes.
364    /// The generation is an arbitrary u64 that can be used to track changes
365    /// in the state of the `PhysicalExpr` over time without having to do an exhaustive comparison.
366    /// This is useful to avoid unnecessary computation or serialization if there are no changes to the expression.
367    /// In particular, dynamic expressions that may change over time; this allows cheap checks for changes.
368    /// Static expressions that do not change over time should return 0, as does the default implementation.
369    /// You should not call this method directly as it does not handle recursion.
370    /// Instead use [`snapshot_generation`] to handle recursion and capture the
371    /// full state of the `PhysicalExpr`.
372    fn snapshot_generation(&self) -> u64 {
373        // By default, we return 0 to indicate that this PhysicalExpr does not
374        // have any dynamic references or state.
375        // Since the recursive algorithm XORs the generations of all children the overall
376        // generation will be 0 if no children have a non-zero generation, meaning that
377        // static expressions will always return 0.
378        0
379    }
380
381    /// Returns true if the expression node is volatile, i.e. whether it can return
382    /// different results when evaluated multiple times with the same input.
383    ///
384    /// Note: unlike [`is_volatile`], this function does not consider inputs:
385    /// - `random()` returns `true`,
386    /// - `a + random()` returns `false` (because the operation `+` itself is not volatile.)
387    ///
388    /// The default to this function was set to `false` when it was created
389    /// to avoid imposing API churn on implementers, but this is not a safe default in general.
390    /// It is highly recommended that volatile expressions implement this method and return `true`.
391    /// This default may be removed in the future if it causes problems or we decide to
392    /// eat the cost of the breaking change and require all implementers to make a choice.
393    fn is_volatile_node(&self) -> bool {
394        false
395    }
396}
397
398#[deprecated(
399    since = "50.0.0",
400    note = "Use `datafusion_expr_common::dyn_eq` instead"
401)]
402pub use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
403
404impl PartialEq for dyn PhysicalExpr {
405    fn eq(&self, other: &Self) -> bool {
406        self.dyn_eq(other.as_any())
407    }
408}
409impl Eq for dyn PhysicalExpr {}
410
411impl Hash for dyn PhysicalExpr {
412    fn hash<H: Hasher>(&self, state: &mut H) {
413        self.dyn_hash(state);
414    }
415}
416
417/// Returns a copy of this expr if we change any child according to the pointer comparison.
418/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
419pub fn with_new_children_if_necessary(
420    expr: Arc<dyn PhysicalExpr>,
421    children: Vec<Arc<dyn PhysicalExpr>>,
422) -> Result<Arc<dyn PhysicalExpr>> {
423    let old_children = expr.children();
424    if children.len() != old_children.len() {
425        internal_err!("PhysicalExpr: Wrong number of children")
426    } else if children.is_empty()
427        || children
428            .iter()
429            .zip(old_children.iter())
430            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
431    {
432        Ok(expr.with_new_children(children)?)
433    } else {
434        Ok(expr)
435    }
436}
437
/// Returns a [`Display`]-able wrapper that renders a list of [`PhysicalExpr`]s.
///
/// The items are written between square brackets and separated by `", "`.
/// Any iterable whose items implement [`Display`] is accepted; the iterator
/// must be [`Clone`] because it is re-walked every time the value is formatted.
///
/// Example output: `[a + 1, b]`
pub fn format_physical_expr_list<T>(exprs: T) -> impl Display
where
    T: IntoIterator,
    T::Item: Display,
    T::IntoIter: Clone,
{
    /// Holds the (cloneable) iterator until formatting is requested.
    struct ListDisplay<I>(I)
    where
        I: Iterator + Clone,
        I::Item: Display;

    impl<I> Display for ListDisplay<I>
    where
        I: Iterator + Clone,
        I::Item: Display,
    {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            // Work on a copy so that formatting never consumes the wrapper.
            let mut remaining = self.0.clone();
            f.write_str("[")?;
            // The first item carries no separator; every later one is
            // preceded by ", ".
            if let Some(first) = remaining.next() {
                write!(f, "{first}")?;
            }
            remaining.try_for_each(|expr| write!(f, ", {expr}"))?;
            f.write_str("]")
        }
    }

    ListDisplay(exprs.into_iter())
}
473
474/// Prints a [`PhysicalExpr`] in a SQL-like format
475///
476/// # Example
477/// ```
478/// # // The boilerplate needed to create a `PhysicalExpr` for the example
479/// # use std::any::Any;
480/// use std::collections::HashMap;
481/// # use std::fmt::Formatter;
482/// # use std::sync::Arc;
483/// # use arrow::array::RecordBatch;
484/// # use arrow::datatypes::{DataType, Field, FieldRef, Schema};
485/// # use datafusion_common::Result;
486/// # use datafusion_expr_common::columnar_value::ColumnarValue;
487/// # use datafusion_physical_expr_common::physical_expr::{fmt_sql, DynEq, PhysicalExpr};
488/// # #[derive(Debug, PartialEq, Eq, Hash)]
489/// # struct MyExpr {}
490/// # impl PhysicalExpr for MyExpr {fn as_any(&self) -> &dyn Any { unimplemented!() }
491/// # fn data_type(&self, input_schema: &Schema) -> Result<DataType> { unimplemented!() }
492/// # fn nullable(&self, input_schema: &Schema) -> Result<bool> { unimplemented!() }
493/// # fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { unimplemented!() }
494/// # fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> { unimplemented!() }
495/// # fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>{ unimplemented!() }
496/// # fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> { unimplemented!() }
497/// # fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "CASE a > b THEN 1 ELSE 0 END") }
498/// # }
499/// # impl std::fmt::Display for MyExpr {fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { unimplemented!() } }
500/// # fn make_physical_expr() -> Arc<dyn PhysicalExpr> { Arc::new(MyExpr{}) }
501/// let expr: Arc<dyn PhysicalExpr> = make_physical_expr();
502/// // wrap the expression in `sql_fmt` which can be used with
503/// // `format!`, `to_string()`, etc
504/// let expr_as_sql = fmt_sql(expr.as_ref());
505/// assert_eq!(
506///   "The SQL: CASE a > b THEN 1 ELSE 0 END",
507///   format!("The SQL: {expr_as_sql}")
508/// );
509/// ```
510pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
511    struct Wrapper<'a> {
512        expr: &'a dyn PhysicalExpr,
513    }
514
515    impl Display for Wrapper<'_> {
516        fn fmt(&self, f: &mut Formatter) -> fmt::Result {
517            self.expr.fmt_sql(f)?;
518            Ok(())
519        }
520    }
521
522    Wrapper { expr }
523}
524
525/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
526///
527/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
528/// This is used to capture the current state of `PhysicalExpr`s that may contain
529/// dynamic references to other operators in order to serialize it over the wire
530/// or treat it via downcast matching.
531///
532/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
533///
534/// # Returns
535///
536/// Returns an `Option<Arc<dyn PhysicalExpr>>` which is the snapshot of the
537/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have
538/// any dynamic references or state, it returns `None`.
539pub fn snapshot_physical_expr(
540    expr: Arc<dyn PhysicalExpr>,
541) -> Result<Arc<dyn PhysicalExpr>> {
542    expr.transform_up(|e| {
543        if let Some(snapshot) = e.snapshot()? {
544            Ok(Transformed::yes(snapshot))
545        } else {
546            Ok(Transformed::no(Arc::clone(&e)))
547        }
548    })
549    .data()
550}
551
552/// Check the generation of this `PhysicalExpr`.
553/// Dynamic `PhysicalExpr`s may have a generation that is incremented
554/// every time the state of the `PhysicalExpr` changes.
555/// If the generation changes that means this `PhysicalExpr` or one of its children
556/// has changed since the last time it was evaluated.
557///
558/// This algorithm will not produce collisions as long as the structure of the
559/// `PhysicalExpr` does not change and no `PhysicalExpr` decrements its own generation.
560pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
561    let mut generation = 0u64;
562    expr.apply(|e| {
563        // Add the current generation of the `PhysicalExpr` to our global generation.
564        generation = generation.wrapping_add(e.snapshot_generation());
565        Ok(TreeNodeRecursion::Continue)
566    })
567    .expect("this traversal is infallible");
568
569    generation
570}
571
572/// Check if the given `PhysicalExpr` is dynamic.
573/// Internally this calls [`snapshot_generation`] to check if the generation is non-zero,
574/// any dynamic `PhysicalExpr` should have a non-zero generation.
575pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
576    // If the generation is non-zero, then this `PhysicalExpr` is dynamic.
577    snapshot_generation(expr) != 0
578}
579
580/// Returns true if the expression is volatile, i.e. whether it can return different
581/// results when evaluated multiple times with the same input.
582///
583/// For example the function call `RANDOM()` is volatile as each call will
584/// return a different value.
585///
586/// This method recursively checks if any sub-expression is volatile, for example
587/// `1 + RANDOM()` will return `true`.
588pub fn is_volatile(expr: &Arc<dyn PhysicalExpr>) -> bool {
589    if expr.is_volatile_node() {
590        return true;
591    }
592    let mut is_volatile = false;
593    expr.apply(|e| {
594        if e.is_volatile_node() {
595            is_volatile = true;
596            Ok(TreeNodeRecursion::Stop)
597        } else {
598            Ok(TreeNodeRecursion::Continue)
599        }
600    })
601    .expect("infallible closure should not fail");
602    is_volatile
603}