datafusion_physical_expr_common/physical_expr.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use crate::utils::scatter;
25
26use arrow::array::BooleanArray;
27use arrow::compute::filter_record_batch;
28use arrow::datatypes::{DataType, Field, FieldRef, Schema};
29use arrow::record_batch::RecordBatch;
30use datafusion_common::tree_node::{
31 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32};
33use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
34use datafusion_expr_common::columnar_value::ColumnarValue;
35use datafusion_expr_common::interval_arithmetic::Interval;
36use datafusion_expr_common::sort_properties::ExprProperties;
37use datafusion_expr_common::statistics::Distribution;
38
39use itertools::izip;
40
/// Shared, reference-counted handle to a [`PhysicalExpr`] trait object
/// (an alias for `Arc<dyn PhysicalExpr>`).
pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
43
44/// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`.
45///
46/// `PhysicalExpr` knows its type, nullability and can be evaluated directly on
47/// a [`RecordBatch`] (see [`Self::evaluate`]).
48///
49/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical
50/// planning. They are typically created from [`Expr`] by a [`PhysicalPlanner`]
51/// invoked from a higher level API
52///
53/// Some important examples of `PhysicalExpr` are:
54/// * [`Column`]: Represents a column at a given index in a RecordBatch
55///
56/// To create `PhysicalExpr` from `Expr`, see
57/// * [`SessionContext::create_physical_expr`]: A high level API
58/// * [`create_physical_expr`]: A low level API
59///
60/// # Formatting `PhysicalExpr` as strings
61/// There are three ways to format `PhysicalExpr` as a string:
62/// * [`Debug`]: Standard Rust debugging format (e.g. `Constant { value: ... }`)
/// * [`Display`]: Detailed SQL-like format that shows expression structure (e.g. `Utf8 ("foobar")`). This is often used for debugging and tests
/// * [`Self::fmt_sql`]: SQL-like human readable format (e.g. `('foobar')`). See also [`fmt_sql`]
65///
66/// [`SessionContext::create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.create_physical_expr
67/// [`PhysicalPlanner`]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
68/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
69/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
70/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
71pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
72 /// Returns the physical expression as [`Any`] so that it can be
73 /// downcast to a specific implementation.
74 fn as_any(&self) -> &dyn Any;
75 /// Get the data type of this expression, given the schema of the input
76 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
77 Ok(self.return_field(input_schema)?.data_type().to_owned())
78 }
79 /// Determine whether this expression is nullable, given the schema of the input
80 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
81 Ok(self.return_field(input_schema)?.is_nullable())
82 }
83 /// Evaluate an expression against a RecordBatch
84 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
85 /// The output field associated with this expression
86 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
87 Ok(Arc::new(Field::new(
88 format!("{self}"),
89 self.data_type(input_schema)?,
90 self.nullable(input_schema)?,
91 )))
92 }
93 /// Evaluate an expression against a RecordBatch after first applying a
94 /// validity array
95 fn evaluate_selection(
96 &self,
97 batch: &RecordBatch,
98 selection: &BooleanArray,
99 ) -> Result<ColumnarValue> {
100 let tmp_batch = filter_record_batch(batch, selection)?;
101
102 let tmp_result = self.evaluate(&tmp_batch)?;
103
104 if batch.num_rows() == tmp_batch.num_rows() {
105 // All values from the `selection` filter are true.
106 Ok(tmp_result)
107 } else if let ColumnarValue::Array(a) = tmp_result {
108 scatter(selection, a.as_ref()).map(ColumnarValue::Array)
109 } else {
110 Ok(tmp_result)
111 }
112 }
113
114 /// Get a list of child PhysicalExpr that provide the input for this expr.
115 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
116
117 /// Returns a new PhysicalExpr where all children were replaced by new exprs.
118 fn with_new_children(
119 self: Arc<Self>,
120 children: Vec<Arc<dyn PhysicalExpr>>,
121 ) -> Result<Arc<dyn PhysicalExpr>>;
122
123 /// Computes the output interval for the expression, given the input
124 /// intervals.
125 ///
126 /// # Parameters
127 ///
128 /// * `children` are the intervals for the children (inputs) of this
129 /// expression.
130 ///
131 /// # Returns
132 ///
133 /// A `Result` containing the output interval for the expression in
134 /// case of success, or an error object in case of failure.
135 ///
136 /// # Example
137 ///
138 /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
139 /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
140 fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
141 not_impl_err!("Not implemented for {self}")
142 }
143
144 /// Updates bounds for child expressions, given a known interval for this
145 /// expression.
146 ///
147 /// This is used to propagate constraints down through an expression tree.
148 ///
149 /// # Parameters
150 ///
151 /// * `interval` is the currently known interval for this expression.
152 /// * `children` are the current intervals for the children of this expression.
153 ///
154 /// # Returns
155 ///
156 /// A `Result` containing a `Vec` of new intervals for the children (in order)
157 /// in case of success, or an error object in case of failure.
158 ///
159 /// If constraint propagation reveals an infeasibility for any child, returns
160 /// [`None`]. If none of the children intervals change as a result of
161 /// propagation, may return an empty vector instead of cloning `children`.
162 /// This is the default (and conservative) return value.
163 ///
164 /// # Example
165 ///
166 /// If the expression is `a + b`, the current `interval` is `[4, 5]` and the
167 /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, then
168 /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at least
169 /// `2` to make the output at least `4`.
170 fn propagate_constraints(
171 &self,
172 _interval: &Interval,
173 _children: &[&Interval],
174 ) -> Result<Option<Vec<Interval>>> {
175 Ok(Some(vec![]))
176 }
177
178 /// Computes the output statistics for the expression, given the input
179 /// statistics.
180 ///
181 /// # Parameters
182 ///
183 /// * `children` are the statistics for the children (inputs) of this
184 /// expression.
185 ///
186 /// # Returns
187 ///
188 /// A `Result` containing the output statistics for the expression in
189 /// case of success, or an error object in case of failure.
190 ///
191 /// Expressions (should) implement this function and utilize the independence
192 /// assumption, match on children distribution types and compute the output
193 /// statistics accordingly. The default implementation simply creates an
194 /// unknown output distribution by combining input ranges. This logic loses
195 /// distribution information, but is a safe default.
196 fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
197 let children_ranges = children
198 .iter()
199 .map(|c| c.range())
200 .collect::<Result<Vec<_>>>()?;
201 let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
202 let output_interval = self.evaluate_bounds(children_ranges_refs.as_slice())?;
203 let dt = output_interval.data_type();
204 if dt.eq(&DataType::Boolean) {
205 let p = if output_interval.eq(&Interval::CERTAINLY_TRUE) {
206 ScalarValue::new_one(&dt)
207 } else if output_interval.eq(&Interval::CERTAINLY_FALSE) {
208 ScalarValue::new_zero(&dt)
209 } else {
210 ScalarValue::try_from(&dt)
211 }?;
212 Distribution::new_bernoulli(p)
213 } else {
214 Distribution::new_from_interval(output_interval)
215 }
216 }
217
218 /// Updates children statistics using the given parent statistic for this
219 /// expression.
220 ///
221 /// This is used to propagate statistics down through an expression tree.
222 ///
223 /// # Parameters
224 ///
225 /// * `parent` is the currently known statistics for this expression.
226 /// * `children` are the current statistics for the children of this expression.
227 ///
228 /// # Returns
229 ///
230 /// A `Result` containing a `Vec` of new statistics for the children (in order)
231 /// in case of success, or an error object in case of failure.
232 ///
233 /// If statistics propagation reveals an infeasibility for any child, returns
234 /// [`None`]. If none of the children statistics change as a result of
235 /// propagation, may return an empty vector instead of cloning `children`.
236 /// This is the default (and conservative) return value.
237 ///
238 /// Expressions (should) implement this function and apply Bayes rule to
239 /// reconcile and update parent/children statistics. This involves utilizing
240 /// the independence assumption, and matching on distribution types. The
241 /// default implementation simply creates an unknown distribution if it can
242 /// narrow the range by propagating ranges. This logic loses distribution
243 /// information, but is a safe default.
244 fn propagate_statistics(
245 &self,
246 parent: &Distribution,
247 children: &[&Distribution],
248 ) -> Result<Option<Vec<Distribution>>> {
249 let children_ranges = children
250 .iter()
251 .map(|c| c.range())
252 .collect::<Result<Vec<_>>>()?;
253 let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
254 let parent_range = parent.range()?;
255 let Some(propagated_children) =
256 self.propagate_constraints(&parent_range, children_ranges_refs.as_slice())?
257 else {
258 return Ok(None);
259 };
260 izip!(propagated_children.into_iter(), children_ranges, children)
261 .map(|(new_interval, old_interval, child)| {
262 if new_interval == old_interval {
263 // We weren't able to narrow the range, preserve the old statistics.
264 Ok((*child).clone())
265 } else if new_interval.data_type().eq(&DataType::Boolean) {
266 let dt = old_interval.data_type();
267 let p = if new_interval.eq(&Interval::CERTAINLY_TRUE) {
268 ScalarValue::new_one(&dt)
269 } else if new_interval.eq(&Interval::CERTAINLY_FALSE) {
270 ScalarValue::new_zero(&dt)
271 } else {
272 unreachable!("Given that we have a range reduction for a boolean interval, we should have certainty")
273 }?;
274 Distribution::new_bernoulli(p)
275 } else {
276 Distribution::new_from_interval(new_interval)
277 }
278 })
279 .collect::<Result<_>>()
280 .map(Some)
281 }
282
283 /// Calculates the properties of this [`PhysicalExpr`] based on its
284 /// children's properties (i.e. order and range), recursively aggregating
285 /// the information from its children. In cases where the [`PhysicalExpr`]
286 /// has no children (e.g., `Literal` or `Column`), these properties should
287 /// be specified externally, as the function defaults to unknown properties.
288 fn get_properties(&self, _children: &[ExprProperties]) -> Result<ExprProperties> {
289 Ok(ExprProperties::new_unknown())
290 }
291
292 /// Format this `PhysicalExpr` in nice human readable "SQL" format
293 ///
294 /// Specifically, this format is designed to be readable by humans, at the
295 /// expense of details. Use `Display` or `Debug` for more detailed
296 /// representation.
297 ///
298 /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
299 ///
300 fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
301
302 /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
303 ///
304 /// "Dynamic" in this case means containing references to structures that may change
305 /// during plan execution, such as hash tables.
306 ///
307 /// This method is used to capture the current state of `PhysicalExpr`s that may contain
308 /// dynamic references to other operators in order to serialize it over the wire
309 /// or treat it via downcast matching.
310 ///
311 /// You should not call this method directly as it does not handle recursion.
312 /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
313 /// full state of the `PhysicalExpr`.
314 ///
315 /// This is expected to return "simple" expressions that do not have mutable state
316 /// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
317 /// Callers however should *not* assume anything about the returned expressions
318 /// since callers and implementers may not agree on what "simple" or "built-in"
319 /// means.
320 /// In other words, if you need to serialize a `PhysicalExpr` across the wire
321 /// you should call this method and then try to serialize the result,
322 /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
323 /// just as if you had not called this method at all.
324 ///
325 /// In particular, consider:
326 /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
327 /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
328 /// This function may return something like `a >= 12`.
329 /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
330 /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
331 /// This function may return something like `t2.b IN (1, 5, 7)`.
332 ///
333 /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
334 /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
335 /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
336 /// contain these dynamic references.
337 ///
338 /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
339 /// and send it across the wire to a remote executor may want to call this method after
340 /// every batch on the source side and brodcast / update the current snaphot to the remote executor.
341 ///
342 /// Note for implementers: this method should *not* handle recursion.
343 /// Recursion is handled in [`snapshot_physical_expr`].
344 fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
345 // By default, we return None to indicate that this PhysicalExpr does not
346 // have any dynamic references or state.
347 // This is a safe default behavior.
348 Ok(None)
349 }
350
351 /// Returns the generation of this `PhysicalExpr` for snapshotting purposes.
352 /// The generation is an arbitrary u64 that can be used to track changes
353 /// in the state of the `PhysicalExpr` over time without having to do an exhaustive comparison.
354 /// This is useful to avoid unecessary computation or serialization if there are no changes to the expression.
355 /// In particular, dynamic expressions that may change over time; this allows cheap checks for changes.
356 /// Static expressions that do not change over time should return 0, as does the default implementation.
357 /// You should not call this method directly as it does not handle recursion.
358 /// Instead use [`snapshot_generation`] to handle recursion and capture the
359 /// full state of the `PhysicalExpr`.
360 fn snapshot_generation(&self) -> u64 {
361 // By default, we return 0 to indicate that this PhysicalExpr does not
362 // have any dynamic references or state.
363 // Since the recursive algorithm XORs the generations of all children the overall
364 // generation will be 0 if no children have a non-zero generation, meaning that
365 // static expressions will always return 0.
366 0
367 }
368}
369
/// Object-safe stand-in for [`Eq`].
///
/// [`PhysicalExpr`] cannot require [`Eq`] directly, because the trait has to
/// stay object safe. Every `Eq + Any` type receives this trait through the
/// blanket implementation below, so implementers normally only derive or
/// implement [`Eq`].
pub trait DynEq {
    /// Compares `self` with a type-erased `other` value.
    fn dyn_eq(&self, other: &dyn Any) -> bool;
}

impl<T: Eq + Any> DynEq for T {
    /// Returns `true` only when `other` has the concrete type `T` and equals
    /// `self`; a value of any other type compares as not equal.
    fn dyn_eq(&self, other: &dyn Any) -> bool {
        match other.downcast_ref::<T>() {
            Some(same_type) => same_type == self,
            None => false,
        }
    }
}
381
382impl PartialEq for dyn PhysicalExpr {
383 fn eq(&self, other: &Self) -> bool {
384 self.dyn_eq(other.as_any())
385 }
386}
387
388impl Eq for dyn PhysicalExpr {}
389
/// Object-safe stand-in for [`Hash`].
///
/// [`PhysicalExpr`] cannot require [`Hash`] directly, because the trait has to
/// stay object safe. Every `Hash + Any` type receives this trait through the
/// blanket implementation below, so implementers normally only derive or
/// implement [`Hash`].
pub trait DynHash {
    /// Feeds `self` into a type-erased hasher.
    fn dyn_hash(&self, _state: &mut dyn Hasher);
}

impl<T: Hash + Any> DynHash for T {
    /// Hashes the type id of `T` first and the value itself second, so the
    /// concrete type takes part in the hash as well as the value.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        let type_id = <T as Any>::type_id(self);
        Hash::hash(&type_id, &mut state);
        Hash::hash(self, &mut state);
    }
}
403
404impl Hash for dyn PhysicalExpr {
405 fn hash<H: Hasher>(&self, state: &mut H) {
406 self.dyn_hash(state);
407 }
408}
409
410/// Returns a copy of this expr if we change any child according to the pointer comparison.
411/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
412pub fn with_new_children_if_necessary(
413 expr: Arc<dyn PhysicalExpr>,
414 children: Vec<Arc<dyn PhysicalExpr>>,
415) -> Result<Arc<dyn PhysicalExpr>> {
416 let old_children = expr.children();
417 if children.len() != old_children.len() {
418 internal_err!("PhysicalExpr: Wrong number of children")
419 } else if children.is_empty()
420 || children
421 .iter()
422 .zip(old_children.iter())
423 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
424 {
425 Ok(expr.with_new_children(children)?)
426 } else {
427 Ok(expr)
428 }
429}
430
431#[deprecated(since = "44.0.0")]
432pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
433 if any.is::<Arc<dyn PhysicalExpr>>() {
434 any.downcast_ref::<Arc<dyn PhysicalExpr>>()
435 .unwrap()
436 .as_any()
437 } else if any.is::<Box<dyn PhysicalExpr>>() {
438 any.downcast_ref::<Box<dyn PhysicalExpr>>()
439 .unwrap()
440 .as_any()
441 } else {
442 any
443 }
444}
445
/// Wraps a list of [`PhysicalExpr`]s (or any other [`Display`] items) in a
/// value that prints them as a bracketed, comma separated list.
///
/// The iterator is cloned on every formatting call, so the returned value can
/// be displayed more than once.
///
/// Example output: `[a + 1, b]`
pub fn format_physical_expr_list<T>(exprs: T) -> impl Display
where
    T: IntoIterator,
    T::Item: Display,
    T::IntoIter: Clone,
{
    /// Holds the item iterator until the list is actually formatted.
    struct ListDisplay<I>(I);

    impl<I> Display for ListDisplay<I>
    where
        I: Iterator + Clone,
        I::Item: Display,
    {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            let mut items = self.0.clone();
            f.write_str("[")?;
            // The first item is printed bare, every later one with a
            // leading separator.
            if let Some(first) = items.next() {
                write!(f, "{first}")?;
            }
            items.try_for_each(|expr| write!(f, ", {expr}"))?;
            f.write_str("]")
        }
    }

    ListDisplay(exprs.into_iter())
}
481
482/// Prints a [`PhysicalExpr`] in a SQL-like format
483///
484/// # Example
485/// ```
486/// # // The boiler plate needed to create a `PhysicalExpr` for the example
487/// # use std::any::Any;
488/// use std::collections::HashMap;
489/// # use std::fmt::Formatter;
490/// # use std::sync::Arc;
491/// # use arrow::array::RecordBatch;
492/// # use arrow::datatypes::{DataType, Field, FieldRef, Schema};
493/// # use datafusion_common::Result;
494/// # use datafusion_expr_common::columnar_value::ColumnarValue;
495/// # use datafusion_physical_expr_common::physical_expr::{fmt_sql, DynEq, PhysicalExpr};
496/// # #[derive(Debug, Hash, PartialOrd, PartialEq)]
497/// # struct MyExpr {}
498/// # impl PhysicalExpr for MyExpr {fn as_any(&self) -> &dyn Any { unimplemented!() }
499/// # fn data_type(&self, input_schema: &Schema) -> Result<DataType> { unimplemented!() }
500/// # fn nullable(&self, input_schema: &Schema) -> Result<bool> { unimplemented!() }
501/// # fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { unimplemented!() }
502/// # fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> { unimplemented!() }
503/// # fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>{ unimplemented!() }
504/// # fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> { unimplemented!() }
505/// # fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "CASE a > b THEN 1 ELSE 0 END") }
506/// # }
507/// # impl std::fmt::Display for MyExpr {fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { unimplemented!() } }
508/// # impl DynEq for MyExpr {fn dyn_eq(&self, other: &dyn Any) -> bool { unimplemented!() } }
509/// # fn make_physical_expr() -> Arc<dyn PhysicalExpr> { Arc::new(MyExpr{}) }
510/// let expr: Arc<dyn PhysicalExpr> = make_physical_expr();
511/// // wrap the expression in `sql_fmt` which can be used with
512/// // `format!`, `to_string()`, etc
513/// let expr_as_sql = fmt_sql(expr.as_ref());
514/// assert_eq!(
515/// "The SQL: CASE a > b THEN 1 ELSE 0 END",
516/// format!("The SQL: {expr_as_sql}")
517/// );
518/// ```
519pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
520 struct Wrapper<'a> {
521 expr: &'a dyn PhysicalExpr,
522 }
523
524 impl Display for Wrapper<'_> {
525 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
526 self.expr.fmt_sql(f)?;
527 Ok(())
528 }
529 }
530
531 Wrapper { expr }
532}
533
534/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
535///
536/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
537/// This is used to capture the current state of `PhysicalExpr`s that may contain
538/// dynamic references to other operators in order to serialize it over the wire
539/// or treat it via downcast matching.
540///
541/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
542///
543/// # Returns
544///
545/// Returns an `Option<Arc<dyn PhysicalExpr>>` which is the snapshot of the
546/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have
547/// any dynamic references or state, it returns `None`.
548pub fn snapshot_physical_expr(
549 expr: Arc<dyn PhysicalExpr>,
550) -> Result<Arc<dyn PhysicalExpr>> {
551 expr.transform_up(|e| {
552 if let Some(snapshot) = e.snapshot()? {
553 Ok(Transformed::yes(snapshot))
554 } else {
555 Ok(Transformed::no(Arc::clone(&e)))
556 }
557 })
558 .data()
559}
560
561/// Check the generation of this `PhysicalExpr`.
562/// Dynamic `PhysicalExpr`s may have a generation that is incremented
563/// every time the state of the `PhysicalExpr` changes.
564/// If the generation changes that means this `PhysicalExpr` or one of its children
565/// has changed since the last time it was evaluated.
566///
567/// This algorithm will not produce collisions as long as the structure of the
568/// `PhysicalExpr` does not change and no `PhysicalExpr` decrements its own generation.
569pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
570 let mut generation = 0u64;
571 expr.apply(|e| {
572 // Add the current generation of the `PhysicalExpr` to our global generation.
573 generation = generation.wrapping_add(e.snapshot_generation());
574 Ok(TreeNodeRecursion::Continue)
575 })
576 .expect("this traversal is infallible");
577
578 generation
579}
580
581/// Check if the given `PhysicalExpr` is dynamic.
582/// Internally this calls [`snapshot_generation`] to check if the generation is non-zero,
583/// any dynamic `PhysicalExpr` should have a non-zero generation.
584pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
585 // If the generation is non-zero, then this `PhysicalExpr` is dynamic.
586 snapshot_generation(expr) != 0
587}