datafusion_physical_expr_common/physical_expr.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::fmt;
20use std::fmt::{Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use crate::utils::scatter;
25
26use arrow::array::BooleanArray;
27use arrow::compute::filter_record_batch;
28use arrow::datatypes::{DataType, Field, FieldRef, Schema};
29use arrow::record_batch::RecordBatch;
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
32use datafusion_expr_common::columnar_value::ColumnarValue;
33use datafusion_expr_common::interval_arithmetic::Interval;
34use datafusion_expr_common::sort_properties::ExprProperties;
35use datafusion_expr_common::statistics::Distribution;
36
37use itertools::izip;
38
39/// Shared [`PhysicalExpr`].
40pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
41
42/// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`.
43///
44/// `PhysicalExpr` knows its type, nullability and can be evaluated directly on
45/// a [`RecordBatch`] (see [`Self::evaluate`]).
46///
47/// `PhysicalExpr` are the physical counterpart to [`Expr`] used in logical
48/// planning. They are typically created from [`Expr`] by a [`PhysicalPlanner`]
49/// invoked from a higher level API
50///
51/// Some important examples of `PhysicalExpr` are:
52/// * [`Column`]: Represents a column at a given index in a RecordBatch
53///
54/// To create `PhysicalExpr` from `Expr`, see
55/// * [`SessionContext::create_physical_expr`]: A high level API
56/// * [`create_physical_expr`]: A low level API
57///
58/// # Formatting `PhysicalExpr` as strings
59/// There are three ways to format `PhysicalExpr` as a string:
60/// * [`Debug`]: Standard Rust debugging format (e.g. `Constant { value: ... }`)
61/// * [`Display`]: Detailed SQL-like format that shows expression structure (e.g. (`Utf8 ("foobar")`). This is often used for debugging and tests
62/// * [`Self::fmt_sql`]: SQL-like human readable format (e.g. ('foobar')`), See also [`sql_fmt`]
63///
64/// [`SessionContext::create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.create_physical_expr
65/// [`PhysicalPlanner`]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
66/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
67/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
68/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
69pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
70 /// Returns the physical expression as [`Any`] so that it can be
71 /// downcast to a specific implementation.
72 fn as_any(&self) -> &dyn Any;
73 /// Get the data type of this expression, given the schema of the input
74 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
75 Ok(self.return_field(input_schema)?.data_type().to_owned())
76 }
77 /// Determine whether this expression is nullable, given the schema of the input
78 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
79 Ok(self.return_field(input_schema)?.is_nullable())
80 }
81 /// Evaluate an expression against a RecordBatch
82 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
83 /// The output field associated with this expression
84 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
85 Ok(Arc::new(Field::new(
86 format!("{self}"),
87 self.data_type(input_schema)?,
88 self.nullable(input_schema)?,
89 )))
90 }
91 /// Evaluate an expression against a RecordBatch after first applying a
92 /// validity array
93 fn evaluate_selection(
94 &self,
95 batch: &RecordBatch,
96 selection: &BooleanArray,
97 ) -> Result<ColumnarValue> {
98 let tmp_batch = filter_record_batch(batch, selection)?;
99
100 let tmp_result = self.evaluate(&tmp_batch)?;
101
102 if batch.num_rows() == tmp_batch.num_rows() {
103 // All values from the `selection` filter are true.
104 Ok(tmp_result)
105 } else if let ColumnarValue::Array(a) = tmp_result {
106 scatter(selection, a.as_ref()).map(ColumnarValue::Array)
107 } else {
108 Ok(tmp_result)
109 }
110 }
111
112 /// Get a list of child PhysicalExpr that provide the input for this expr.
113 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
114
115 /// Returns a new PhysicalExpr where all children were replaced by new exprs.
116 fn with_new_children(
117 self: Arc<Self>,
118 children: Vec<Arc<dyn PhysicalExpr>>,
119 ) -> Result<Arc<dyn PhysicalExpr>>;
120
121 /// Computes the output interval for the expression, given the input
122 /// intervals.
123 ///
124 /// # Parameters
125 ///
126 /// * `children` are the intervals for the children (inputs) of this
127 /// expression.
128 ///
129 /// # Returns
130 ///
131 /// A `Result` containing the output interval for the expression in
132 /// case of success, or an error object in case of failure.
133 ///
134 /// # Example
135 ///
136 /// If the expression is `a + b`, and the input intervals are `a: [1, 2]`
137 /// and `b: [3, 4]`, then the output interval would be `[4, 6]`.
138 fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Interval> {
139 not_impl_err!("Not implemented for {self}")
140 }
141
142 /// Updates bounds for child expressions, given a known interval for this
143 /// expression.
144 ///
145 /// This is used to propagate constraints down through an expression tree.
146 ///
147 /// # Parameters
148 ///
149 /// * `interval` is the currently known interval for this expression.
150 /// * `children` are the current intervals for the children of this expression.
151 ///
152 /// # Returns
153 ///
154 /// A `Result` containing a `Vec` of new intervals for the children (in order)
155 /// in case of success, or an error object in case of failure.
156 ///
157 /// If constraint propagation reveals an infeasibility for any child, returns
158 /// [`None`]. If none of the children intervals change as a result of
159 /// propagation, may return an empty vector instead of cloning `children`.
160 /// This is the default (and conservative) return value.
161 ///
162 /// # Example
163 ///
164 /// If the expression is `a + b`, the current `interval` is `[4, 5]` and the
165 /// inputs `a` and `b` are respectively given as `[0, 2]` and `[-∞, 4]`, then
166 /// propagation would return `[0, 2]` and `[2, 4]` as `b` must be at least
167 /// `2` to make the output at least `4`.
168 fn propagate_constraints(
169 &self,
170 _interval: &Interval,
171 _children: &[&Interval],
172 ) -> Result<Option<Vec<Interval>>> {
173 Ok(Some(vec![]))
174 }
175
176 /// Computes the output statistics for the expression, given the input
177 /// statistics.
178 ///
179 /// # Parameters
180 ///
181 /// * `children` are the statistics for the children (inputs) of this
182 /// expression.
183 ///
184 /// # Returns
185 ///
186 /// A `Result` containing the output statistics for the expression in
187 /// case of success, or an error object in case of failure.
188 ///
189 /// Expressions (should) implement this function and utilize the independence
190 /// assumption, match on children distribution types and compute the output
191 /// statistics accordingly. The default implementation simply creates an
192 /// unknown output distribution by combining input ranges. This logic loses
193 /// distribution information, but is a safe default.
194 fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
195 let children_ranges = children
196 .iter()
197 .map(|c| c.range())
198 .collect::<Result<Vec<_>>>()?;
199 let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
200 let output_interval = self.evaluate_bounds(children_ranges_refs.as_slice())?;
201 let dt = output_interval.data_type();
202 if dt.eq(&DataType::Boolean) {
203 let p = if output_interval.eq(&Interval::CERTAINLY_TRUE) {
204 ScalarValue::new_one(&dt)
205 } else if output_interval.eq(&Interval::CERTAINLY_FALSE) {
206 ScalarValue::new_zero(&dt)
207 } else {
208 ScalarValue::try_from(&dt)
209 }?;
210 Distribution::new_bernoulli(p)
211 } else {
212 Distribution::new_from_interval(output_interval)
213 }
214 }
215
216 /// Updates children statistics using the given parent statistic for this
217 /// expression.
218 ///
219 /// This is used to propagate statistics down through an expression tree.
220 ///
221 /// # Parameters
222 ///
223 /// * `parent` is the currently known statistics for this expression.
224 /// * `children` are the current statistics for the children of this expression.
225 ///
226 /// # Returns
227 ///
228 /// A `Result` containing a `Vec` of new statistics for the children (in order)
229 /// in case of success, or an error object in case of failure.
230 ///
231 /// If statistics propagation reveals an infeasibility for any child, returns
232 /// [`None`]. If none of the children statistics change as a result of
233 /// propagation, may return an empty vector instead of cloning `children`.
234 /// This is the default (and conservative) return value.
235 ///
236 /// Expressions (should) implement this function and apply Bayes rule to
237 /// reconcile and update parent/children statistics. This involves utilizing
238 /// the independence assumption, and matching on distribution types. The
239 /// default implementation simply creates an unknown distribution if it can
240 /// narrow the range by propagating ranges. This logic loses distribution
241 /// information, but is a safe default.
242 fn propagate_statistics(
243 &self,
244 parent: &Distribution,
245 children: &[&Distribution],
246 ) -> Result<Option<Vec<Distribution>>> {
247 let children_ranges = children
248 .iter()
249 .map(|c| c.range())
250 .collect::<Result<Vec<_>>>()?;
251 let children_ranges_refs = children_ranges.iter().collect::<Vec<_>>();
252 let parent_range = parent.range()?;
253 let Some(propagated_children) =
254 self.propagate_constraints(&parent_range, children_ranges_refs.as_slice())?
255 else {
256 return Ok(None);
257 };
258 izip!(propagated_children.into_iter(), children_ranges, children)
259 .map(|(new_interval, old_interval, child)| {
260 if new_interval == old_interval {
261 // We weren't able to narrow the range, preserve the old statistics.
262 Ok((*child).clone())
263 } else if new_interval.data_type().eq(&DataType::Boolean) {
264 let dt = old_interval.data_type();
265 let p = if new_interval.eq(&Interval::CERTAINLY_TRUE) {
266 ScalarValue::new_one(&dt)
267 } else if new_interval.eq(&Interval::CERTAINLY_FALSE) {
268 ScalarValue::new_zero(&dt)
269 } else {
270 unreachable!("Given that we have a range reduction for a boolean interval, we should have certainty")
271 }?;
272 Distribution::new_bernoulli(p)
273 } else {
274 Distribution::new_from_interval(new_interval)
275 }
276 })
277 .collect::<Result<_>>()
278 .map(Some)
279 }
280
281 /// Calculates the properties of this [`PhysicalExpr`] based on its
282 /// children's properties (i.e. order and range), recursively aggregating
283 /// the information from its children. In cases where the [`PhysicalExpr`]
284 /// has no children (e.g., `Literal` or `Column`), these properties should
285 /// be specified externally, as the function defaults to unknown properties.
286 fn get_properties(&self, _children: &[ExprProperties]) -> Result<ExprProperties> {
287 Ok(ExprProperties::new_unknown())
288 }
289
290 /// Format this `PhysicalExpr` in nice human readable "SQL" format
291 ///
292 /// Specifically, this format is designed to be readable by humans, at the
293 /// expense of details. Use `Display` or `Debug` for more detailed
294 /// representation.
295 ///
296 /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
297 ///
298 fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
299
300 /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
301 ///
302 /// "Dynamic" in this case means containing references to structures that may change
303 /// during plan execution, such as hash tables.
304 ///
305 /// This method is used to capture the current state of `PhysicalExpr`s that may contain
306 /// dynamic references to other operators in order to serialize it over the wire
307 /// or treat it via downcast matching.
308 ///
309 /// You should not call this method directly as it does not handle recursion.
310 /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
311 /// full state of the `PhysicalExpr`.
312 ///
313 /// This is expected to return "simple" expressions that do not have mutable state
314 /// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
315 /// Callers however should *not* assume anything about the returned expressions
316 /// since callers and implementers may not agree on what "simple" or "built-in"
317 /// means.
318 /// In other words, if you need to serialize a `PhysicalExpr` across the wire
319 /// you should call this method and then try to serialize the result,
320 /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
321 /// just as if you had not called this method at all.
322 ///
323 /// In particular, consider:
324 /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
325 /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
326 /// This function may return something like `a >= 12`.
327 /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
328 /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
329 /// This function may return something like `t2.b IN (1, 5, 7)`.
330 ///
331 /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
332 /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
333 /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
334 /// contain these dynamic references.
335 ///
336 /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
337 /// and send it across the wire to a remote executor may want to call this method after
338 /// every batch on the source side and brodcast / update the current snaphot to the remote executor.
339 ///
340 /// Note for implementers: this method should *not* handle recursion.
341 /// Recursion is handled in [`snapshot_physical_expr`].
342 fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
343 // By default, we return None to indicate that this PhysicalExpr does not
344 // have any dynamic references or state.
345 // This is a safe default behavior.
346 Ok(None)
347 }
348}
349
350/// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object
351/// safe. To ease implementation, blanket implementation is provided for [`Eq`] types.
352pub trait DynEq {
353 fn dyn_eq(&self, other: &dyn Any) -> bool;
354}
355
356impl<T: Eq + Any> DynEq for T {
357 fn dyn_eq(&self, other: &dyn Any) -> bool {
358 other.downcast_ref::<Self>() == Some(self)
359 }
360}
361
362impl PartialEq for dyn PhysicalExpr {
363 fn eq(&self, other: &Self) -> bool {
364 self.dyn_eq(other.as_any())
365 }
366}
367
368impl Eq for dyn PhysicalExpr {}
369
370/// [`PhysicalExpr`] can't be constrained by [`Hash`] directly because it must remain
371/// object safe. To ease implementation blanket implementation is provided for [`Hash`]
372/// types.
373pub trait DynHash {
374 fn dyn_hash(&self, _state: &mut dyn Hasher);
375}
376
377impl<T: Hash + Any> DynHash for T {
378 fn dyn_hash(&self, mut state: &mut dyn Hasher) {
379 self.type_id().hash(&mut state);
380 self.hash(&mut state)
381 }
382}
383
384impl Hash for dyn PhysicalExpr {
385 fn hash<H: Hasher>(&self, state: &mut H) {
386 self.dyn_hash(state);
387 }
388}
389
390/// Returns a copy of this expr if we change any child according to the pointer comparison.
391/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
392pub fn with_new_children_if_necessary(
393 expr: Arc<dyn PhysicalExpr>,
394 children: Vec<Arc<dyn PhysicalExpr>>,
395) -> Result<Arc<dyn PhysicalExpr>> {
396 let old_children = expr.children();
397 if children.len() != old_children.len() {
398 internal_err!("PhysicalExpr: Wrong number of children")
399 } else if children.is_empty()
400 || children
401 .iter()
402 .zip(old_children.iter())
403 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
404 {
405 Ok(expr.with_new_children(children)?)
406 } else {
407 Ok(expr)
408 }
409}
410
411#[deprecated(since = "44.0.0")]
412pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
413 if any.is::<Arc<dyn PhysicalExpr>>() {
414 any.downcast_ref::<Arc<dyn PhysicalExpr>>()
415 .unwrap()
416 .as_any()
417 } else if any.is::<Box<dyn PhysicalExpr>>() {
418 any.downcast_ref::<Box<dyn PhysicalExpr>>()
419 .unwrap()
420 .as_any()
421 } else {
422 any
423 }
424}
425
426/// Returns [`Display`] able a list of [`PhysicalExpr`]
427///
428/// Example output: `[a + 1, b]`
429pub fn format_physical_expr_list<T>(exprs: T) -> impl Display
430where
431 T: IntoIterator,
432 T::Item: Display,
433 T::IntoIter: Clone,
434{
435 struct DisplayWrapper<I>(I)
436 where
437 I: Iterator + Clone,
438 I::Item: Display;
439
440 impl<I> Display for DisplayWrapper<I>
441 where
442 I: Iterator + Clone,
443 I::Item: Display,
444 {
445 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
446 let mut iter = self.0.clone();
447 write!(f, "[")?;
448 if let Some(expr) = iter.next() {
449 write!(f, "{expr}")?;
450 }
451 for expr in iter {
452 write!(f, ", {expr}")?;
453 }
454 write!(f, "]")?;
455 Ok(())
456 }
457 }
458
459 DisplayWrapper(exprs.into_iter())
460}
461
462/// Prints a [`PhysicalExpr`] in a SQL-like format
463///
464/// # Example
465/// ```
466/// # // The boiler plate needed to create a `PhysicalExpr` for the example
467/// # use std::any::Any;
468/// use std::collections::HashMap;
469/// # use std::fmt::Formatter;
470/// # use std::sync::Arc;
471/// # use arrow::array::RecordBatch;
472/// # use arrow::datatypes::{DataType, Field, FieldRef, Schema};
473/// # use datafusion_common::Result;
474/// # use datafusion_expr_common::columnar_value::ColumnarValue;
475/// # use datafusion_physical_expr_common::physical_expr::{fmt_sql, DynEq, PhysicalExpr};
476/// # #[derive(Debug, Hash, PartialOrd, PartialEq)]
477/// # struct MyExpr {}
478/// # impl PhysicalExpr for MyExpr {fn as_any(&self) -> &dyn Any { unimplemented!() }
479/// # fn data_type(&self, input_schema: &Schema) -> Result<DataType> { unimplemented!() }
480/// # fn nullable(&self, input_schema: &Schema) -> Result<bool> { unimplemented!() }
481/// # fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { unimplemented!() }
482/// # fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> { unimplemented!() }
483/// # fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>{ unimplemented!() }
484/// # fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> { unimplemented!() }
485/// # fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "CASE a > b THEN 1 ELSE 0 END") }
486/// # }
487/// # impl std::fmt::Display for MyExpr {fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { unimplemented!() } }
488/// # impl DynEq for MyExpr {fn dyn_eq(&self, other: &dyn Any) -> bool { unimplemented!() } }
489/// # fn make_physical_expr() -> Arc<dyn PhysicalExpr> { Arc::new(MyExpr{}) }
490/// let expr: Arc<dyn PhysicalExpr> = make_physical_expr();
491/// // wrap the expression in `sql_fmt` which can be used with
492/// // `format!`, `to_string()`, etc
493/// let expr_as_sql = fmt_sql(expr.as_ref());
494/// assert_eq!(
495/// "The SQL: CASE a > b THEN 1 ELSE 0 END",
496/// format!("The SQL: {expr_as_sql}")
497/// );
498/// ```
499pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
500 struct Wrapper<'a> {
501 expr: &'a dyn PhysicalExpr,
502 }
503
504 impl Display for Wrapper<'_> {
505 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
506 self.expr.fmt_sql(f)?;
507 Ok(())
508 }
509 }
510
511 Wrapper { expr }
512}
513
514/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
515///
516/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
517/// This is used to capture the current state of `PhysicalExpr`s that may contain
518/// dynamic references to other operators in order to serialize it over the wire
519/// or treat it via downcast matching.
520///
521/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
522///
523/// # Returns
524///
525/// Returns an `Option<Arc<dyn PhysicalExpr>>` which is the snapshot of the
526/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have
527/// any dynamic references or state, it returns `None`.
528pub fn snapshot_physical_expr(
529 expr: Arc<dyn PhysicalExpr>,
530) -> Result<Arc<dyn PhysicalExpr>> {
531 expr.transform_up(|e| {
532 if let Some(snapshot) = e.snapshot()? {
533 Ok(Transformed::yes(snapshot))
534 } else {
535 Ok(Transformed::no(Arc::clone(&e)))
536 }
537 })
538 .data()
539}