datafusion_physical_expr_common/physical_expr.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use crate::utils::scatter;
25
26use arrow::array::BooleanArray;
27use arrow::compute::filter_record_batch;
28use arrow::datatypes::{DataType, Schema};
29use arrow::record_batch::RecordBatch;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
32use datafusion_expr_common::columnar_value::ColumnarValue;
33use datafusion_expr_common::interval_arithmetic::Interval;
34use datafusion_expr_common::sort_properties::ExprProperties;
35use datafusion_expr_common::statistics::Distribution;
36
37use itertools::izip;
38
39/// Shared [`PhysicalExpr`].
40pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
41
42/// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`.
43///
44/// `PhysicalExpr` knows its type, nullability and can be evaluated directly on
45/// a [`RecordBatch`] (see [`Self::evaluate`]).
46///
47/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical
48/// planning. They are typically created from [`Expr`] by a [`PhysicalPlanner`]
49/// invoked from a higher level API
50///
51/// Some important examples of `PhysicalExpr` are:
52/// * [`Column`]: Represents a column at a given index in a RecordBatch
53///
54/// To create `PhysicalExpr` from `Expr`, see
55/// * [`SessionContext::create_physical_expr`]: A high level API
56/// * [`create_physical_expr`]: A low level API
57///
58/// # Formatting `PhysicalExpr` as strings
59/// There are three ways to format `PhysicalExpr` as a string:
60/// * [`Debug`]: Standard Rust debugging format (e.g. `Constant { value: ... }`)
61/// * [`Display`]: Detailed SQL-like format that shows expression structure (e.g. (`Utf8 ("foobar")`). This is often used for debugging and tests
62/// * [`Self::fmt_sql`]: SQL-like human readable format (e.g. ('foobar')`), See also [`sql_fmt`]
63///
64/// [`SessionContext::create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.create_physical_expr
65/// [`PhysicalPlanner`]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
66/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
67/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
68/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
69pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
70 /// Returns the physical expression as [`Any`] so that it can be
71 /// downcast to a specific implementation.
72 fn as_any(&self) -> &dyn Any;
73 /// Get the data type of this expression, given the schema of the input
74 fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
75 /// Determine whether this expression is nullable, given the schema of the input
76 fn nullable(&self, input_schema: &Schema) -> Result<bool>;
77 /// Evaluate an expression against a RecordBatch
78 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
79 /// Evaluate an expression against a RecordBatch after first applying a
80 /// validity array
81 fn evaluate_selection(
82 &self,
83 batch: &RecordBatch,
84 selection: &BooleanArray,
85 ) -> Result<ColumnarValue> {
86 let tmp_batch = filter_record_batch(batch, selection)?;
87
88 let tmp_result = self.evaluate(&tmp_batch)?;
89
90 if batch.num_rows() == tmp_batch.num_rows() {
91 // All values from the `selection` filter are true.
92 Ok(tmp_result)
93 } else if let ColumnarValue::Array(a) = tmp_result {
94 scatter(selection, a.as_ref()).map(ColumnarValue::Array)
95 } else {
96 Ok(tmp_result)
97 }
98 }
99
100 /// Get a list of child PhysicalExpr that provide the input for this expr.
101 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
102
103 /// Returns a new PhysicalExpr where all children were replaced by new exprs.
104 fn with_new_children(
105 self: Arc<Self>,
106 children: Vec<Arc<dyn PhysicalExpr>>,
107 ) -> Result<Arc<dyn PhysicalExpr>>;
108
109 /// Computes the output interval for the expression, given the input
110 /// intervals.
111 ///
112 /// # Parameters
113 ///
114 /// * `children` are the intervals for the children (inputs) of this
115 /// expression.
116 ///
117 /// # Returns
118 ///
119 /// A `Result` containing the output interval for the expression in
120 /// case of success, or an error object in case of failure.
121 ///
122 /// # Example
123 ///
124 /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
125 /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
126 fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
127 not_impl_err!("Not implemented for {self}")
128 }
129
130 /// Updates bounds for child expressions, given a known interval for this
131 /// expression.
132 ///
133 /// This is used to propagate constraints down through an expression tree.
134 ///
135 /// # Parameters
136 ///
137 /// * `interval` is the currently known interval for this expression.
138 /// * `children` are the current intervals for the children of this expression.
139 ///
140 /// # Returns
141 ///
142 /// A `Result` containing a `Vec` of new intervals for the children (in order)
143 /// in case of success, or an error object in case of failure.
144 ///
145 /// If constraint propagation reveals an infeasibility for any child, returns
146 /// [`None`]. If none of the children intervals change as a result of
147 /// propagation, may return an empty vector instead of cloning `children`.
148 /// This is the default (and conservative) return value.
149 ///
150 /// # Example
151 ///
152 /// If the expression is `a + b`, the current `interval` is `[4, 5]` and the
153 /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, then
154 /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at least
155 /// `2` to make the output at least `4`.
156 fn propagate_constraints(
157 &self,
158 _interval: &Interval,
159 _children: &[&Interval],
160 ) -> Result<Option<Vec<Interval>>> {
161 Ok(Some(vec![]))
162 }
163
164 /// Computes the output statistics for the expression, given the input
165 /// statistics.
166 ///
167 /// # Parameters
168 ///
169 /// * `children` are the statistics for the children (inputs) of this
170 /// expression.
171 ///
172 /// # Returns
173 ///
174 /// A `Result` containing the output statistics for the expression in
175 /// case of success, or an error object in case of failure.
176 ///
177 /// Expressions (should) implement this function and utilize the independence
178 /// assumption, match on children distribution types and compute the output
179 /// statistics accordingly. The default implementation simply creates an
180 /// unknown output distribution by combining input ranges. This logic loses
181 /// distribution information, but is a safe default.
182 fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
183 let children_ranges = children
184 .iter()
185 .map(|c| c.range())
186 .collect::<Result<Vec<_>>>()?;
187 let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
188 let output_interval = self.evaluate_bounds(children_ranges_refs.as_slice())?;
189 let dt = output_interval.data_type();
190 if dt.eq(&DataType::Boolean) {
191 let p = if output_interval.eq(&Interval::CERTAINLY_TRUE) {
192 ScalarValue::new_one(&dt)
193 } else if output_interval.eq(&Interval::CERTAINLY_FALSE) {
194 ScalarValue::new_zero(&dt)
195 } else {
196 ScalarValue::try_from(&dt)
197 }?;
198 Distribution::new_bernoulli(p)
199 } else {
200 Distribution::new_from_interval(output_interval)
201 }
202 }
203
204 /// Updates children statistics using the given parent statistic for this
205 /// expression.
206 ///
207 /// This is used to propagate statistics down through an expression tree.
208 ///
209 /// # Parameters
210 ///
211 /// * `parent` is the currently known statistics for this expression.
212 /// * `children` are the current statistics for the children of this expression.
213 ///
214 /// # Returns
215 ///
216 /// A `Result` containing a `Vec` of new statistics for the children (in order)
217 /// in case of success, or an error object in case of failure.
218 ///
219 /// If statistics propagation reveals an infeasibility for any child, returns
220 /// [`None`]. If none of the children statistics change as a result of
221 /// propagation, may return an empty vector instead of cloning `children`.
222 /// This is the default (and conservative) return value.
223 ///
224 /// Expressions (should) implement this function and apply Bayes rule to
225 /// reconcile and update parent/children statistics. This involves utilizing
226 /// the independence assumption, and matching on distribution types. The
227 /// default implementation simply creates an unknown distribution if it can
228 /// narrow the range by propagating ranges. This logic loses distribution
229 /// information, but is a safe default.
230 fn propagate_statistics(
231 &self,
232 parent: &Distribution,
233 children: &[&Distribution],
234 ) -> Result<Option<Vec<Distribution>>> {
235 let children_ranges = children
236 .iter()
237 .map(|c| c.range())
238 .collect::<Result<Vec<_>>>()?;
239 let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
240 let parent_range = parent.range()?;
241 let Some(propagated_children) =
242 self.propagate_constraints(&parent_range, children_ranges_refs.as_slice())?
243 else {
244 return Ok(None);
245 };
246 izip!(propagated_children.into_iter(), children_ranges, children)
247 .map(|(new_interval, old_interval, child)| {
248 if new_interval == old_interval {
249 // We weren't able to narrow the range, preserve the old statistics.
250 Ok((*child).clone())
251 } else if new_interval.data_type().eq(&DataType::Boolean) {
252 let dt = old_interval.data_type();
253 let p = if new_interval.eq(&Interval::CERTAINLY_TRUE) {
254 ScalarValue::new_one(&dt)
255 } else if new_interval.eq(&Interval::CERTAINLY_FALSE) {
256 ScalarValue::new_zero(&dt)
257 } else {
258 unreachable!("Given that we have a range reduction for a boolean interval, we should have certainty")
259 }?;
260 Distribution::new_bernoulli(p)
261 } else {
262 Distribution::new_from_interval(new_interval)
263 }
264 })
265 .collect::<Result<_>>()
266 .map(Some)
267 }
268
269 /// Calculates the properties of this [`PhysicalExpr`] based on its
270 /// children's properties (i.e. order and range), recursively aggregating
271 /// the information from its children. In cases where the [`PhysicalExpr`]
272 /// has no children (e.g., `Literal` or `Column`), these properties should
273 /// be specified externally, as the function defaults to unknown properties.
274 fn get_properties(&self, _children: &[ExprProperties]) -> Result<ExprProperties> {
275 Ok(ExprProperties::new_unknown())
276 }
277
278 /// Format this `PhysicalExpr` in nice human readable "SQL" format
279 ///
280 /// Specifically, this format is designed to be readable by humans, at the
281 /// expense of details. Use `Display` or `Debug` for more detailed
282 /// representation.
283 ///
284 /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
285 ///
286 fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
287
288 /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
289 ///
290 /// "Dynamic" in this case means containing references to structures that may change
291 /// during plan execution, such as hash tables.
292 ///
293 /// This method is used to capture the current state of `PhysicalExpr`s that may contain
294 /// dynamic references to other operators in order to serialize it over the wire
295 /// or treat it via downcast matching.
296 ///
297 /// You should not call this method directly as it does not handle recursion.
298 /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
299 /// full state of the `PhysicalExpr`.
300 ///
301 /// This is expected to return "simple" expressions that do not have mutable state
302 /// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
303 /// Callers however should *not* assume anything about the returned expressions
304 /// since callers and implementers may not agree on what "simple" or "built-in"
305 /// means.
306 /// In other words, if you need to serialize a `PhysicalExpr` across the wire
307 /// you should call this method and then try to serialize the result,
308 /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
309 /// just as if you had not called this method at all.
310 ///
311 /// In particular, consider:
312 /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
313 /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
314 /// This function may return something like `a >= 12`.
315 /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
316 /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
317 /// This function may return something like `t2.b IN (1, 5, 7)`.
318 ///
319 /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
320 /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
321 /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
322 /// contain these dynamic references.
323 ///
324 /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
325 /// and send it across the wire to a remote executor may want to call this method after
326 /// every batch on the source side and brodcast / update the current snaphot to the remote executor.
327 ///
328 /// Note for implementers: this method should *not* handle recursion.
329 /// Recursion is handled in [`snapshot_physical_expr`].
330 fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
331 // By default, we return None to indicate that this PhysicalExpr does not
332 // have any dynamic references or state.
333 // This is a safe default behavior.
334 Ok(None)
335 }
336}
337
/// Type-erased equality for [`PhysicalExpr`].
///
/// [`PhysicalExpr`] can't be bounded by [`Eq`] directly because it must remain
/// object safe. To ease implementation, every [`Eq`] type receives this trait
/// through the blanket implementation below.
pub trait DynEq {
    /// Returns `true` when `other` has the same concrete type as `self` and
    /// compares equal to it.
    fn dyn_eq(&self, other: &dyn Any) -> bool;
}

impl<T: Eq + Any> DynEq for T {
    fn dyn_eq(&self, other: &dyn Any) -> bool {
        // A failed downcast means the types differ, hence "not equal".
        other.downcast_ref::<Self>().is_some_and(|rhs| rhs == self)
    }
}
349
350impl PartialEq for dyn PhysicalExpr {
351 fn eq(&self, other: &Self) -> bool {
352 self.dyn_eq(other.as_any())
353 }
354}
355
356impl Eq for dyn PhysicalExpr {}
357
/// Type-erased hashing for [`PhysicalExpr`].
///
/// [`PhysicalExpr`] can't be bounded by [`Hash`] directly because it must
/// remain object safe. To ease implementation, every [`Hash`] type receives
/// this trait through the blanket implementation below.
pub trait DynHash {
    /// Feeds `self` into the given type-erased hasher.
    fn dyn_hash(&self, _state: &mut dyn Hasher);
}

impl<T: Hash + Any> DynHash for T {
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // The concrete type id is hashed first, followed by the value itself.
        Hash::hash(&self.type_id(), &mut state);
        Hash::hash(self, &mut state);
    }
}
371
372impl Hash for dyn PhysicalExpr {
373 fn hash<H: Hasher>(&self, state: &mut H) {
374 self.dyn_hash(state);
375 }
376}
377
378/// Returns a copy of this expr if we change any child according to the pointer comparison.
379/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
380pub fn with_new_children_if_necessary(
381 expr: Arc<dyn PhysicalExpr>,
382 children: Vec<Arc<dyn PhysicalExpr>>,
383) -> Result<Arc<dyn PhysicalExpr>> {
384 let old_children = expr.children();
385 if children.len() != old_children.len() {
386 internal_err!("PhysicalExpr: Wrong number of children")
387 } else if children.is_empty()
388 || children
389 .iter()
390 .zip(old_children.iter())
391 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
392 {
393 Ok(expr.with_new_children(children)?)
394 } else {
395 Ok(expr)
396 }
397}
398
399#[deprecated(since = "44.0.0")]
400pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
401 if any.is::<Arc<dyn PhysicalExpr>>() {
402 any.downcast_ref::<Arc<dyn PhysicalExpr>>()
403 .unwrap()
404 .as_any()
405 } else if any.is::<Box<dyn PhysicalExpr>>() {
406 any.downcast_ref::<Box<dyn PhysicalExpr>>()
407 .unwrap()
408 .as_any()
409 } else {
410 any
411 }
412}
413
/// Returns a [`Display`]-able wrapper that renders a list of items, such as
/// [`PhysicalExpr`]s, separated by `", "` and enclosed in square brackets.
///
/// The iterator is cloned on every formatting call, so the returned value can
/// be displayed more than once.
///
/// Example output: `[a + 1, b]`
pub fn format_physical_expr_list<T>(exprs: T) -> impl Display
where
    T: IntoIterator,
    T::Item: Display,
    T::IntoIter: Clone,
{
    /// Holds the (cloneable) iterator until it is formatted.
    struct ListDisplay<I>(I)
    where
        I: Iterator + Clone,
        I::Item: Display;

    impl<I> Display for ListDisplay<I>
    where
        I: Iterator + Clone,
        I::Item: Display,
    {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            f.write_str("[")?;
            for (position, item) in self.0.clone().enumerate() {
                // Every item but the first is preceded by a separator.
                if position > 0 {
                    f.write_str(", ")?;
                }
                write!(f, "{item}")?;
            }
            f.write_str("]")
        }
    }

    ListDisplay(exprs.into_iter())
}
449
450/// Prints a [`PhysicalExpr`] in a SQL-like format
451///
452/// # Example
453/// ```
454/// # // The boiler plate needed to create a `PhysicalExpr` for the example
455/// # use std::any::Any;
456/// # use std::fmt::Formatter;
457/// # use std::sync::Arc;
458/// # use arrow::array::RecordBatch;
459/// # use arrow::datatypes::{DataType, Schema};
460/// # use datafusion_common::Result;
461/// # use datafusion_expr_common::columnar_value::ColumnarValue;
462/// # use datafusion_physical_expr_common::physical_expr::{fmt_sql, DynEq, PhysicalExpr};
463/// # #[derive(Debug, Hash, PartialOrd, PartialEq)]
464/// # struct MyExpr {};
465/// # impl PhysicalExpr for MyExpr {fn as_any(&self) -> &dyn Any { unimplemented!() }
466/// # fn data_type(&self, input_schema: &Schema) -> Result<DataType> { unimplemented!() }
467/// # fn nullable(&self, input_schema: &Schema) -> Result<bool> { unimplemented!() }
468/// # fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { unimplemented!() }
469/// # fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>{ unimplemented!() }
470/// # fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> { unimplemented!() }
471/// # fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "CASE a > b THEN 1 ELSE 0 END") }
472/// # }
473/// # impl std::fmt::Display for MyExpr {fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { unimplemented!() } }
474/// # impl DynEq for MyExpr {fn dyn_eq(&self, other: &dyn Any) -> bool { unimplemented!() } }
475/// # fn make_physical_expr() -> Arc<dyn PhysicalExpr> { Arc::new(MyExpr{}) }
476/// let expr: Arc<dyn PhysicalExpr> = make_physical_expr();
477/// // wrap the expression in `sql_fmt` which can be used with
478/// // `format!`, `to_string()`, etc
479/// let expr_as_sql = fmt_sql(expr.as_ref());
480/// assert_eq!(
481/// "The SQL: CASE a > b THEN 1 ELSE 0 END",
482/// format!("The SQL: {expr_as_sql}")
483/// );
484/// ```
485pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
486 struct Wrapper<'a> {
487 expr: &'a dyn PhysicalExpr,
488 }
489
490 impl Display for Wrapper<'_> {
491 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
492 self.expr.fmt_sql(f)?;
493 Ok(())
494 }
495 }
496
497 Wrapper { expr }
498}
499
500/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
501///
502/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
503/// This is used to capture the current state of `PhysicalExpr`s that may contain
504/// dynamic references to other operators in order to serialize it over the wire
505/// or treat it via downcast matching.
506///
507/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
508///
509/// # Returns
510///
511/// Returns an `Option<Arc<dyn PhysicalExpr>>` which is the snapshot of the
512/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have
513/// any dynamic references or state, it returns `None`.
514pub fn snapshot_physical_expr(
515 expr: Arc<dyn PhysicalExpr>,
516) -> Result<Arc<dyn PhysicalExpr>> {
517 expr.transform_up(|e| {
518 if let Some(snapshot) = e.snapshot()? {
519 Ok(Transformed::yes(snapshot))
520 } else {
521 Ok(Transformed::no(Arc::clone(&e)))
522 }
523 })
524 .data()
525}