datafusion_physical_expr/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::ScalarFunctionExpr;
22use crate::{
23    expressions::{self, binary, like, similar_to, Column, Literal},
24    PhysicalExpr,
25};
26
27use arrow::datatypes::Schema;
28use datafusion_common::{
29    exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
30};
31use datafusion_expr::execution_props::ExecutionProps;
32use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
33use datafusion_expr::var_provider::is_system_variables;
34use datafusion_expr::var_provider::VarType;
35use datafusion_expr::{
36    binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
37};
38
39/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
40/// AS int)`.
41///
42/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
43/// planning, and can be evaluated directly on a [RecordBatch]. They are
44/// normally created from [Expr] by a [PhysicalPlanner] and can be created
45/// directly using [create_physical_expr].
46///
47/// A Physical expression knows its type, nullability and how to evaluate itself.
48///
49/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
50/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
51///
52/// # Example: Create `PhysicalExpr` from `Expr`
53/// ```
54/// # use arrow::datatypes::{DataType, Field, Schema};
55/// # use datafusion_common::DFSchema;
56/// # use datafusion_expr::{Expr, col, lit};
57/// # use datafusion_physical_expr::create_physical_expr;
58/// # use datafusion_expr::execution_props::ExecutionProps;
59/// // For a logical expression `a = 1`, we can create a physical expression
60/// let expr = col("a").eq(lit(1));
61/// // To create a PhysicalExpr we need 1. a schema
62/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
63/// let df_schema = DFSchema::try_from(schema).unwrap();
64/// // 2. ExecutionProps
65/// let props = ExecutionProps::new();
66/// // We can now create a PhysicalExpr:
67/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
68/// ```
69///
70/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
71/// ```
72/// # use std::sync::Arc;
73/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
74/// # use arrow::datatypes::{DataType, Field, Schema};
75/// # use datafusion_common::{assert_batches_eq, DFSchema};
76/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
77/// # use datafusion_physical_expr::create_physical_expr;
78/// # use datafusion_expr::execution_props::ExecutionProps;
79/// # let expr = col("a").eq(lit(1));
80/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
81/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
82/// # let props = ExecutionProps::new();
83/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
84/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
85/// // Input of [1,2,3]
86/// let input_batch = RecordBatch::try_from_iter(vec![
87///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
88/// ]).unwrap();
89/// // The result is a ColumnarValue (either an Array or a Scalar)
90/// let result = physical_expr.evaluate(&input_batch).unwrap();
91/// // In this case, a BooleanArray with the result of the comparison
92/// let ColumnarValue::Array(arr) = result else {
93///  panic!("Expected an array")
94/// };
95/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
96/// ```
97///
98/// [ColumnarValue]: datafusion_expr::ColumnarValue
99///
100/// Create a physical expression from a logical expression ([Expr]).
101///
102/// # Arguments
103///
104/// * `e` - The logical expression
105/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
106///   to qualified or unqualified fields by name.
107pub fn create_physical_expr(
108    e: &Expr,
109    input_dfschema: &DFSchema,
110    execution_props: &ExecutionProps,
111) -> Result<Arc<dyn PhysicalExpr>> {
112    let input_schema: &Schema = &input_dfschema.into();
113
114    match e {
115        Expr::Alias(Alias { expr, metadata, .. }) => {
116            if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
117                let mut new_metadata = prior_metadata
118                    .as_ref()
119                    .map(|m| {
120                        m.iter()
121                            .map(|(k, v)| (k.clone(), v.clone()))
122                            .collect::<HashMap<String, String>>()
123                    })
124                    .unwrap_or_default();
125                if let Some(metadata) = metadata {
126                    new_metadata.extend(metadata.clone());
127                }
128                let new_metadata = match new_metadata.is_empty() {
129                    true => None,
130                    false => Some(new_metadata),
131                };
132
133                Ok(Arc::new(Literal::new_with_metadata(
134                    v.clone(),
135                    new_metadata,
136                )))
137            } else {
138                Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
139            }
140        }
141        Expr::Column(c) => {
142            let idx = input_dfschema.index_of_column(c)?;
143            Ok(Arc::new(Column::new(&c.name, idx)))
144        }
145        Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
146            value.clone(),
147            metadata
148                .as_ref()
149                .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()),
150        ))),
151        Expr::ScalarVariable(_, variable_names) => {
152            if is_system_variables(variable_names) {
153                match execution_props.get_var_provider(VarType::System) {
154                    Some(provider) => {
155                        let scalar_value = provider.get_value(variable_names.clone())?;
156                        Ok(Arc::new(Literal::new(scalar_value)))
157                    }
158                    _ => plan_err!("No system variable provider found"),
159                }
160            } else {
161                match execution_props.get_var_provider(VarType::UserDefined) {
162                    Some(provider) => {
163                        let scalar_value = provider.get_value(variable_names.clone())?;
164                        Ok(Arc::new(Literal::new(scalar_value)))
165                    }
166                    _ => plan_err!("No user defined variable provider found"),
167                }
168            }
169        }
170        Expr::IsTrue(expr) => {
171            let binary_op = binary_expr(
172                expr.as_ref().clone(),
173                Operator::IsNotDistinctFrom,
174                lit(true),
175            );
176            create_physical_expr(&binary_op, input_dfschema, execution_props)
177        }
178        Expr::IsNotTrue(expr) => {
179            let binary_op =
180                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
181            create_physical_expr(&binary_op, input_dfschema, execution_props)
182        }
183        Expr::IsFalse(expr) => {
184            let binary_op = binary_expr(
185                expr.as_ref().clone(),
186                Operator::IsNotDistinctFrom,
187                lit(false),
188            );
189            create_physical_expr(&binary_op, input_dfschema, execution_props)
190        }
191        Expr::IsNotFalse(expr) => {
192            let binary_op =
193                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
194            create_physical_expr(&binary_op, input_dfschema, execution_props)
195        }
196        Expr::IsUnknown(expr) => {
197            let binary_op = binary_expr(
198                expr.as_ref().clone(),
199                Operator::IsNotDistinctFrom,
200                Expr::Literal(ScalarValue::Boolean(None), None),
201            );
202            create_physical_expr(&binary_op, input_dfschema, execution_props)
203        }
204        Expr::IsNotUnknown(expr) => {
205            let binary_op = binary_expr(
206                expr.as_ref().clone(),
207                Operator::IsDistinctFrom,
208                Expr::Literal(ScalarValue::Boolean(None), None),
209            );
210            create_physical_expr(&binary_op, input_dfschema, execution_props)
211        }
212        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
213            // Create physical expressions for left and right operands
214            let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
215            let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
216            // Note that the logical planner is responsible
217            // for type coercion on the arguments (e.g. if one
218            // argument was originally Int32 and one was
219            // Int64 they will both be coerced to Int64).
220            //
221            // There should be no coercion during physical
222            // planning.
223            binary(lhs, *op, rhs, input_schema)
224        }
225        Expr::Like(Like {
226            negated,
227            expr,
228            pattern,
229            escape_char,
230            case_insensitive,
231        }) => {
232            // `\` is the implicit escape, see https://github.com/apache/datafusion/issues/13291
233            if escape_char.unwrap_or('\\') != '\\' {
234                return exec_err!(
235                    "LIKE does not support escape_char other than the backslash (\\)"
236                );
237            }
238            let physical_expr =
239                create_physical_expr(expr, input_dfschema, execution_props)?;
240            let physical_pattern =
241                create_physical_expr(pattern, input_dfschema, execution_props)?;
242            like(
243                *negated,
244                *case_insensitive,
245                physical_expr,
246                physical_pattern,
247                input_schema,
248            )
249        }
250        Expr::SimilarTo(Like {
251            negated,
252            expr,
253            pattern,
254            escape_char,
255            case_insensitive,
256        }) => {
257            if escape_char.is_some() {
258                return exec_err!("SIMILAR TO does not support escape_char yet");
259            }
260            let physical_expr =
261                create_physical_expr(expr, input_dfschema, execution_props)?;
262            let physical_pattern =
263                create_physical_expr(pattern, input_dfschema, execution_props)?;
264            similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
265        }
266        Expr::Case(case) => {
267            let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
268                Some(create_physical_expr(
269                    e.as_ref(),
270                    input_dfschema,
271                    execution_props,
272                )?)
273            } else {
274                None
275            };
276            let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
277                .when_then_expr
278                .iter()
279                .map(|(w, t)| (w.as_ref(), t.as_ref()))
280                .unzip();
281            let when_expr =
282                create_physical_exprs(when_expr, input_dfschema, execution_props)?;
283            let then_expr =
284                create_physical_exprs(then_expr, input_dfschema, execution_props)?;
285            let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
286                when_expr
287                    .iter()
288                    .zip(then_expr.iter())
289                    .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
290                    .collect();
291            let else_expr: Option<Arc<dyn PhysicalExpr>> =
292                if let Some(e) = &case.else_expr {
293                    Some(create_physical_expr(
294                        e.as_ref(),
295                        input_dfschema,
296                        execution_props,
297                    )?)
298                } else {
299                    None
300                };
301            Ok(expressions::case(expr, when_then_expr, else_expr)?)
302        }
303        Expr::Cast(Cast { expr, data_type }) => expressions::cast(
304            create_physical_expr(expr, input_dfschema, execution_props)?,
305            input_schema,
306            data_type.clone(),
307        ),
308        Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
309            create_physical_expr(expr, input_dfschema, execution_props)?,
310            input_schema,
311            data_type.clone(),
312        ),
313        Expr::Not(expr) => {
314            expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
315        }
316        Expr::Negative(expr) => expressions::negative(
317            create_physical_expr(expr, input_dfschema, execution_props)?,
318            input_schema,
319        ),
320        Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
321            expr,
322            input_dfschema,
323            execution_props,
324        )?),
325        Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
326            expr,
327            input_dfschema,
328            execution_props,
329        )?),
330        Expr::ScalarFunction(ScalarFunction { func, args }) => {
331            let physical_args =
332                create_physical_exprs(args, input_dfschema, execution_props)?;
333
334            Ok(Arc::new(ScalarFunctionExpr::try_new(
335                Arc::clone(func),
336                physical_args,
337                input_schema,
338            )?))
339        }
340        Expr::Between(Between {
341            expr,
342            negated,
343            low,
344            high,
345        }) => {
346            let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
347            let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
348            let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
349
350            // rewrite the between into the two binary operators
351            let binary_expr = binary(
352                binary(
353                    Arc::clone(&value_expr),
354                    Operator::GtEq,
355                    low_expr,
356                    input_schema,
357                )?,
358                Operator::And,
359                binary(
360                    Arc::clone(&value_expr),
361                    Operator::LtEq,
362                    high_expr,
363                    input_schema,
364                )?,
365                input_schema,
366            );
367
368            if *negated {
369                expressions::not(binary_expr?)
370            } else {
371                binary_expr
372            }
373        }
374        Expr::InList(InList {
375            expr,
376            list,
377            negated,
378        }) => match expr.as_ref() {
379            Expr::Literal(ScalarValue::Utf8(None), _) => {
380                Ok(expressions::lit(ScalarValue::Boolean(None)))
381            }
382            _ => {
383                let value_expr =
384                    create_physical_expr(expr, input_dfschema, execution_props)?;
385
386                let list_exprs =
387                    create_physical_exprs(list, input_dfschema, execution_props)?;
388                expressions::in_list(value_expr, list_exprs, negated, input_schema)
389            }
390        },
391        Expr::Placeholder(Placeholder { id, .. }) => {
392            exec_err!("Placeholder '{id}' was not provided a value for execution.")
393        }
394        other => {
395            not_impl_err!("Physical plan does not support logical expression {other:?}")
396        }
397    }
398}
399
400/// Create vector of Physical Expression from a vector of logical expression
401pub fn create_physical_exprs<'a, I>(
402    exprs: I,
403    input_dfschema: &DFSchema,
404    execution_props: &ExecutionProps,
405) -> Result<Vec<Arc<dyn PhysicalExpr>>>
406where
407    I: IntoIterator<Item = &'a Expr>,
408{
409    exprs
410        .into_iter()
411        .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
412        .collect::<Result<Vec<_>>>()
413}
414
415/// Convert a logical expression to a physical expression (without any simplification, etc)
416pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
417    let df_schema = schema.clone().to_dfschema().unwrap();
418    let execution_props = ExecutionProps::new();
419    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
420}
421
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified (`data.letter`) schema and
    /// checks that evaluating it on four rows marks only the first one.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let evaluated = physical
            .evaluate(&batch)?
            .into_array(batch.num_rows())
            .expect("Failed to convert to array");

        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&evaluated, &expected);

        Ok(())
    }
}