datafusion_physical_expr/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::ScalarFunctionExpr;
21use crate::{
22    expressions::{self, binary, like, similar_to, Column, Literal},
23    PhysicalExpr,
24};
25
26use arrow::datatypes::Schema;
27use datafusion_common::config::ConfigOptions;
28use datafusion_common::metadata::FieldMetadata;
29use datafusion_common::{
30    exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
31};
32use datafusion_expr::execution_props::ExecutionProps;
33use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
34use datafusion_expr::var_provider::is_system_variables;
35use datafusion_expr::var_provider::VarType;
36use datafusion_expr::{
37    binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
38};
39
40/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
41/// AS int)`.
42///
43/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
44/// planning, and can be evaluated directly on a [RecordBatch]. They are
45/// normally created from [Expr] by a [PhysicalPlanner] and can be created
46/// directly using [create_physical_expr].
47///
48/// A Physical expression knows its type, nullability and how to evaluate itself.
49///
50/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
51/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
52///
53/// # Example: Create `PhysicalExpr` from `Expr`
54/// ```
55/// # use arrow::datatypes::{DataType, Field, Schema};
56/// # use datafusion_common::DFSchema;
57/// # use datafusion_expr::{Expr, col, lit};
58/// # use datafusion_physical_expr::create_physical_expr;
59/// # use datafusion_expr::execution_props::ExecutionProps;
60/// // For a logical expression `a = 1`, we can create a physical expression
61/// let expr = col("a").eq(lit(1));
62/// // To create a PhysicalExpr we need 1. a schema
63/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
64/// let df_schema = DFSchema::try_from(schema).unwrap();
65/// // 2. ExecutionProps
66/// let props = ExecutionProps::new();
67/// // We can now create a PhysicalExpr:
68/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
69/// ```
70///
71/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
72/// ```
73/// # use std::sync::Arc;
74/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
75/// # use arrow::datatypes::{DataType, Field, Schema};
76/// # use datafusion_common::{assert_batches_eq, DFSchema};
77/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
78/// # use datafusion_physical_expr::create_physical_expr;
79/// # use datafusion_expr::execution_props::ExecutionProps;
80/// # let expr = col("a").eq(lit(1));
81/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
82/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
83/// # let props = ExecutionProps::new();
84/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
85/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
86/// // Input of [1,2,3]
87/// let input_batch = RecordBatch::try_from_iter(vec![
88///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
89/// ]).unwrap();
90/// // The result is a ColumnarValue (either an Array or a Scalar)
91/// let result = physical_expr.evaluate(&input_batch).unwrap();
92/// // In this case, a BooleanArray with the result of the comparison
93/// let ColumnarValue::Array(arr) = result else {
94///  panic!("Expected an array")
95/// };
96/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
97/// ```
98///
99/// [ColumnarValue]: datafusion_expr::ColumnarValue
100///
101/// Create a physical expression from a logical expression ([Expr]).
102///
103/// # Arguments
104///
105/// * `e` - The logical expression
106/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
107///   to qualified or unqualified fields by name.
108pub fn create_physical_expr(
109    e: &Expr,
110    input_dfschema: &DFSchema,
111    execution_props: &ExecutionProps,
112) -> Result<Arc<dyn PhysicalExpr>> {
113    let input_schema = input_dfschema.as_arrow();
114
115    match e {
116        Expr::Alias(Alias { expr, metadata, .. }) => {
117            if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
118                let new_metadata = FieldMetadata::merge_options(
119                    prior_metadata.as_ref(),
120                    metadata.as_ref(),
121                );
122                Ok(Arc::new(Literal::new_with_metadata(
123                    v.clone(),
124                    new_metadata,
125                )))
126            } else {
127                Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
128            }
129        }
130        Expr::Column(c) => {
131            let idx = input_dfschema.index_of_column(c)?;
132            Ok(Arc::new(Column::new(&c.name, idx)))
133        }
134        Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
135            value.clone(),
136            metadata.clone(),
137        ))),
138        Expr::ScalarVariable(_, variable_names) => {
139            if is_system_variables(variable_names) {
140                match execution_props.get_var_provider(VarType::System) {
141                    Some(provider) => {
142                        let scalar_value = provider.get_value(variable_names.clone())?;
143                        Ok(Arc::new(Literal::new(scalar_value)))
144                    }
145                    _ => plan_err!("No system variable provider found"),
146                }
147            } else {
148                match execution_props.get_var_provider(VarType::UserDefined) {
149                    Some(provider) => {
150                        let scalar_value = provider.get_value(variable_names.clone())?;
151                        Ok(Arc::new(Literal::new(scalar_value)))
152                    }
153                    _ => plan_err!("No user defined variable provider found"),
154                }
155            }
156        }
157        Expr::IsTrue(expr) => {
158            let binary_op = binary_expr(
159                expr.as_ref().clone(),
160                Operator::IsNotDistinctFrom,
161                lit(true),
162            );
163            create_physical_expr(&binary_op, input_dfschema, execution_props)
164        }
165        Expr::IsNotTrue(expr) => {
166            let binary_op =
167                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
168            create_physical_expr(&binary_op, input_dfschema, execution_props)
169        }
170        Expr::IsFalse(expr) => {
171            let binary_op = binary_expr(
172                expr.as_ref().clone(),
173                Operator::IsNotDistinctFrom,
174                lit(false),
175            );
176            create_physical_expr(&binary_op, input_dfschema, execution_props)
177        }
178        Expr::IsNotFalse(expr) => {
179            let binary_op =
180                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
181            create_physical_expr(&binary_op, input_dfschema, execution_props)
182        }
183        Expr::IsUnknown(expr) => {
184            let binary_op = binary_expr(
185                expr.as_ref().clone(),
186                Operator::IsNotDistinctFrom,
187                Expr::Literal(ScalarValue::Boolean(None), None),
188            );
189            create_physical_expr(&binary_op, input_dfschema, execution_props)
190        }
191        Expr::IsNotUnknown(expr) => {
192            let binary_op = binary_expr(
193                expr.as_ref().clone(),
194                Operator::IsDistinctFrom,
195                Expr::Literal(ScalarValue::Boolean(None), None),
196            );
197            create_physical_expr(&binary_op, input_dfschema, execution_props)
198        }
199        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
200            // Create physical expressions for left and right operands
201            let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
202            let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
203            // Note that the logical planner is responsible
204            // for type coercion on the arguments (e.g. if one
205            // argument was originally Int32 and one was
206            // Int64 they will both be coerced to Int64).
207            //
208            // There should be no coercion during physical
209            // planning.
210            binary(lhs, *op, rhs, input_schema)
211        }
212        Expr::Like(Like {
213            negated,
214            expr,
215            pattern,
216            escape_char,
217            case_insensitive,
218        }) => {
219            // `\` is the implicit escape, see https://github.com/apache/datafusion/issues/13291
220            if escape_char.unwrap_or('\\') != '\\' {
221                return exec_err!(
222                    "LIKE does not support escape_char other than the backslash (\\)"
223                );
224            }
225            let physical_expr =
226                create_physical_expr(expr, input_dfschema, execution_props)?;
227            let physical_pattern =
228                create_physical_expr(pattern, input_dfschema, execution_props)?;
229            like(
230                *negated,
231                *case_insensitive,
232                physical_expr,
233                physical_pattern,
234                input_schema,
235            )
236        }
237        Expr::SimilarTo(Like {
238            negated,
239            expr,
240            pattern,
241            escape_char,
242            case_insensitive,
243        }) => {
244            if escape_char.is_some() {
245                return exec_err!("SIMILAR TO does not support escape_char yet");
246            }
247            let physical_expr =
248                create_physical_expr(expr, input_dfschema, execution_props)?;
249            let physical_pattern =
250                create_physical_expr(pattern, input_dfschema, execution_props)?;
251            similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
252        }
253        Expr::Case(case) => {
254            let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
255                Some(create_physical_expr(
256                    e.as_ref(),
257                    input_dfschema,
258                    execution_props,
259                )?)
260            } else {
261                None
262            };
263            let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
264                .when_then_expr
265                .iter()
266                .map(|(w, t)| (w.as_ref(), t.as_ref()))
267                .unzip();
268            let when_expr =
269                create_physical_exprs(when_expr, input_dfschema, execution_props)?;
270            let then_expr =
271                create_physical_exprs(then_expr, input_dfschema, execution_props)?;
272            let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
273                when_expr
274                    .iter()
275                    .zip(then_expr.iter())
276                    .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
277                    .collect();
278            let else_expr: Option<Arc<dyn PhysicalExpr>> =
279                if let Some(e) = &case.else_expr {
280                    Some(create_physical_expr(
281                        e.as_ref(),
282                        input_dfschema,
283                        execution_props,
284                    )?)
285                } else {
286                    None
287                };
288            Ok(expressions::case(expr, when_then_expr, else_expr)?)
289        }
290        Expr::Cast(Cast { expr, data_type }) => expressions::cast(
291            create_physical_expr(expr, input_dfschema, execution_props)?,
292            input_schema,
293            data_type.clone(),
294        ),
295        Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
296            create_physical_expr(expr, input_dfschema, execution_props)?,
297            input_schema,
298            data_type.clone(),
299        ),
300        Expr::Not(expr) => {
301            expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
302        }
303        Expr::Negative(expr) => expressions::negative(
304            create_physical_expr(expr, input_dfschema, execution_props)?,
305            input_schema,
306        ),
307        Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
308            expr,
309            input_dfschema,
310            execution_props,
311        )?),
312        Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
313            expr,
314            input_dfschema,
315            execution_props,
316        )?),
317        Expr::ScalarFunction(ScalarFunction { func, args }) => {
318            let physical_args =
319                create_physical_exprs(args, input_dfschema, execution_props)?;
320            let config_options = match execution_props.config_options.as_ref() {
321                Some(config_options) => Arc::clone(config_options),
322                None => Arc::new(ConfigOptions::default()),
323            };
324
325            Ok(Arc::new(ScalarFunctionExpr::try_new(
326                Arc::clone(func),
327                physical_args,
328                input_schema,
329                config_options,
330            )?))
331        }
332        Expr::Between(Between {
333            expr,
334            negated,
335            low,
336            high,
337        }) => {
338            let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
339            let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
340            let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
341
342            // rewrite the between into the two binary operators
343            let binary_expr = binary(
344                binary(
345                    Arc::clone(&value_expr),
346                    Operator::GtEq,
347                    low_expr,
348                    input_schema,
349                )?,
350                Operator::And,
351                binary(
352                    Arc::clone(&value_expr),
353                    Operator::LtEq,
354                    high_expr,
355                    input_schema,
356                )?,
357                input_schema,
358            );
359
360            if *negated {
361                expressions::not(binary_expr?)
362            } else {
363                binary_expr
364            }
365        }
366        Expr::InList(InList {
367            expr,
368            list,
369            negated,
370        }) => match expr.as_ref() {
371            Expr::Literal(ScalarValue::Utf8(None), _) => {
372                Ok(expressions::lit(ScalarValue::Boolean(None)))
373            }
374            _ => {
375                let value_expr =
376                    create_physical_expr(expr, input_dfschema, execution_props)?;
377
378                let list_exprs =
379                    create_physical_exprs(list, input_dfschema, execution_props)?;
380                expressions::in_list(value_expr, list_exprs, negated, input_schema)
381            }
382        },
383        Expr::Placeholder(Placeholder { id, .. }) => {
384            exec_err!("Placeholder '{id}' was not provided a value for execution.")
385        }
386        other => {
387            not_impl_err!("Physical plan does not support logical expression {other:?}")
388        }
389    }
390}
391
392/// Create vector of Physical Expression from a vector of logical expression
393pub fn create_physical_exprs<'a, I>(
394    exprs: I,
395    input_dfschema: &DFSchema,
396    execution_props: &ExecutionProps,
397) -> Result<Vec<Arc<dyn PhysicalExpr>>>
398where
399    I: IntoIterator<Item = &'a Expr>,
400{
401    exprs
402        .into_iter()
403        .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
404        .collect()
405}
406
407/// Convert a logical expression to a physical expression (without any simplification, etc)
408pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
409    // TODO this makes a deep copy of the Schema. Should take SchemaRef instead and avoid deep copy
410    let df_schema = schema.clone().to_dfschema().unwrap();
411    let execution_props = ExecutionProps::new();
412    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
413}
414
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a schema qualified as `data` and checks
    /// that evaluating it over a four-row batch yields the expected booleans.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(vec!["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(4)
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));

        assert_eq!(&actual, &expected);

        Ok(())
    }
}