Skip to main content

datafusion_physical_expr/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::scalar_subquery::ScalarSubqueryExpr;
21use crate::{HigherOrderFunctionExpr, ScalarFunctionExpr};
22use crate::{
23    PhysicalExpr,
24    expressions::{self, Column, Literal, binary, like, similar_to},
25};
26
27use arrow::datatypes::Schema;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::datatype::FieldExt;
30use datafusion_common::metadata::{FieldMetadata, format_type_and_metadata};
31use datafusion_common::{
32    DFSchema, Result, ScalarValue, TableReference, ToDFSchema, exec_err,
33    internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
34};
35use datafusion_expr::execution_props::ExecutionProps;
36use datafusion_expr::expr::{
37    Alias, Cast, HigherOrderFunction, InList, Lambda, LambdaVariable, Placeholder,
38    ScalarFunction,
39};
40use datafusion_expr::var_provider::VarType;
41use datafusion_expr::var_provider::is_system_variables;
42use datafusion_expr::{
43    Between, BinaryExpr, Expr, ExprSchemable, Like, Operator, TryCast, binary_expr, lit,
44};
45
46/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
47/// AS int)`.
48///
49/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
50/// planning, and can be evaluated directly on a [RecordBatch]. They are
51/// normally created from [Expr] by a [PhysicalPlanner] and can be created
52/// directly using [create_physical_expr].
53///
54/// A Physical expression knows its type, nullability and how to evaluate itself.
55///
56/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
57/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
58///
59/// # Example: Create `PhysicalExpr` from `Expr`
60/// ```
61/// # use arrow::datatypes::{DataType, Field, Schema};
62/// # use datafusion_common::DFSchema;
63/// # use datafusion_expr::{Expr, col, lit};
64/// # use datafusion_physical_expr::create_physical_expr;
65/// # use datafusion_expr::execution_props::ExecutionProps;
66/// // For a logical expression `a = 1`, we can create a physical expression
67/// let expr = col("a").eq(lit(1));
68/// // To create a PhysicalExpr we need 1. a schema
69/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
70/// let df_schema = DFSchema::try_from(schema).unwrap();
71/// // 2. ExecutionProps
72/// let props = ExecutionProps::new();
73/// // We can now create a PhysicalExpr:
74/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
75/// ```
76///
77/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
78/// ```
79/// # use std::sync::Arc;
80/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
81/// # use arrow::datatypes::{DataType, Field, Schema};
82/// # use datafusion_common::{assert_batches_eq, DFSchema};
83/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
84/// # use datafusion_physical_expr::create_physical_expr;
85/// # use datafusion_expr::execution_props::ExecutionProps;
86/// # let expr = col("a").eq(lit(1));
87/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
88/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
89/// # let props = ExecutionProps::new();
90/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
91/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
92/// // Input of [1,2,3]
93/// let input_batch = RecordBatch::try_from_iter(vec![
94///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
95/// ]).unwrap();
96/// // The result is a ColumnarValue (either an Array or a Scalar)
97/// let result = physical_expr.evaluate(&input_batch).unwrap();
98/// // In this case, a BooleanArray with the result of the comparison
99/// let ColumnarValue::Array(arr) = result else {
100///  panic!("Expected an array")
101/// };
102/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
103/// ```
104///
105/// [ColumnarValue]: datafusion_expr::ColumnarValue
106///
107/// Create a physical expression from a logical expression ([Expr]).
108///
109/// # Arguments
110///
111/// * `e` - The logical expression
112/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
113///   to qualified or unqualified fields by name.
114#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
115pub fn create_physical_expr(
116    e: &Expr,
117    input_dfschema: &DFSchema,
118    execution_props: &ExecutionProps,
119) -> Result<Arc<dyn PhysicalExpr>> {
120    let input_schema = input_dfschema.as_arrow();
121
122    match e {
123        Expr::Alias(Alias { expr, metadata, .. }) => {
124            if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
125                let new_metadata = FieldMetadata::merge_options(
126                    prior_metadata.as_ref(),
127                    metadata.as_ref(),
128                );
129                Ok(Arc::new(Literal::new_with_metadata(
130                    v.clone(),
131                    new_metadata,
132                )))
133            } else {
134                Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
135            }
136        }
137        Expr::Column(c) => {
138            let idx = input_dfschema.index_of_column(c)?;
139            Ok(Arc::new(Column::new(&c.name, idx)))
140        }
141        Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
142            value.clone(),
143            metadata.clone(),
144        ))),
145        Expr::ScalarVariable(_, variable_names) => {
146            if is_system_variables(variable_names) {
147                match execution_props.get_var_provider(VarType::System) {
148                    Some(provider) => {
149                        let scalar_value = provider.get_value(variable_names.clone())?;
150                        Ok(Arc::new(Literal::new(scalar_value)))
151                    }
152                    _ => plan_err!("No system variable provider found"),
153                }
154            } else {
155                match execution_props.get_var_provider(VarType::UserDefined) {
156                    Some(provider) => {
157                        let scalar_value = provider.get_value(variable_names.clone())?;
158                        Ok(Arc::new(Literal::new(scalar_value)))
159                    }
160                    _ => plan_err!("No user defined variable provider found"),
161                }
162            }
163        }
164        Expr::IsTrue(expr) => {
165            let binary_op = binary_expr(
166                expr.as_ref().clone(),
167                Operator::IsNotDistinctFrom,
168                lit(true),
169            );
170            create_physical_expr(&binary_op, input_dfschema, execution_props)
171        }
172        Expr::IsNotTrue(expr) => {
173            let binary_op =
174                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
175            create_physical_expr(&binary_op, input_dfschema, execution_props)
176        }
177        Expr::IsFalse(expr) => {
178            let binary_op = binary_expr(
179                expr.as_ref().clone(),
180                Operator::IsNotDistinctFrom,
181                lit(false),
182            );
183            create_physical_expr(&binary_op, input_dfschema, execution_props)
184        }
185        Expr::IsNotFalse(expr) => {
186            let binary_op =
187                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
188            create_physical_expr(&binary_op, input_dfschema, execution_props)
189        }
190        Expr::IsUnknown(expr) => {
191            let binary_op = binary_expr(
192                expr.as_ref().clone(),
193                Operator::IsNotDistinctFrom,
194                Expr::Literal(ScalarValue::Boolean(None), None),
195            );
196            create_physical_expr(&binary_op, input_dfschema, execution_props)
197        }
198        Expr::IsNotUnknown(expr) => {
199            let binary_op = binary_expr(
200                expr.as_ref().clone(),
201                Operator::IsDistinctFrom,
202                Expr::Literal(ScalarValue::Boolean(None), None),
203            );
204            create_physical_expr(&binary_op, input_dfschema, execution_props)
205        }
206        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
207            // Create physical expressions for left and right operands
208            let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
209            let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
210            // Note that the logical planner is responsible
211            // for type coercion on the arguments (e.g. if one
212            // argument was originally Int32 and one was
213            // Int64 they will both be coerced to Int64).
214            //
215            // There should be no coercion during physical
216            // planning.
217            binary(lhs, *op, rhs, input_schema)
218        }
219        Expr::Like(Like {
220            negated,
221            expr,
222            pattern,
223            escape_char,
224            case_insensitive,
225        }) => {
226            // `\` is the implicit escape, see https://github.com/apache/datafusion/issues/13291
227            if escape_char.unwrap_or('\\') != '\\' {
228                return exec_err!(
229                    "LIKE does not support escape_char other than the backslash (\\)"
230                );
231            }
232            let physical_expr =
233                create_physical_expr(expr, input_dfschema, execution_props)?;
234            let physical_pattern =
235                create_physical_expr(pattern, input_dfschema, execution_props)?;
236            like(
237                *negated,
238                *case_insensitive,
239                physical_expr,
240                physical_pattern,
241                input_schema,
242            )
243        }
244        Expr::SimilarTo(Like {
245            negated,
246            expr,
247            pattern,
248            escape_char,
249            case_insensitive,
250        }) => {
251            if escape_char.is_some() {
252                return exec_err!("SIMILAR TO does not support escape_char yet");
253            }
254            let physical_expr =
255                create_physical_expr(expr, input_dfschema, execution_props)?;
256            let physical_pattern =
257                create_physical_expr(pattern, input_dfschema, execution_props)?;
258            similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
259        }
260        Expr::Case(case) => {
261            let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
262                Some(create_physical_expr(
263                    e.as_ref(),
264                    input_dfschema,
265                    execution_props,
266                )?)
267            } else {
268                None
269            };
270            let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
271                .when_then_expr
272                .iter()
273                .map(|(w, t)| (w.as_ref(), t.as_ref()))
274                .unzip();
275            let when_expr =
276                create_physical_exprs(when_expr, input_dfschema, execution_props)?;
277            let then_expr =
278                create_physical_exprs(then_expr, input_dfschema, execution_props)?;
279            let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
280                when_expr
281                    .iter()
282                    .zip(then_expr.iter())
283                    .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
284                    .collect();
285            let else_expr: Option<Arc<dyn PhysicalExpr>> =
286                if let Some(e) = &case.else_expr {
287                    Some(create_physical_expr(
288                        e.as_ref(),
289                        input_dfschema,
290                        execution_props,
291                    )?)
292                } else {
293                    None
294                };
295            Ok(expressions::case(expr, when_then_expr, else_expr)?)
296        }
297        Expr::Cast(Cast { expr, field }) => expressions::cast_with_target_field(
298            create_physical_expr(expr, input_dfschema, execution_props)?,
299            input_schema,
300            Arc::clone(field),
301            None,
302        ),
303        Expr::TryCast(TryCast { expr, field }) => {
304            if !field.metadata().is_empty() {
305                let (_, src_field) = expr.to_field(input_dfschema)?;
306                return plan_err!(
307                    "TryCast from {} to {} is not supported",
308                    format_type_and_metadata(
309                        src_field.data_type(),
310                        Some(src_field.metadata()),
311                    ),
312                    format_type_and_metadata(field.data_type(), Some(field.metadata()))
313                );
314            }
315
316            expressions::try_cast(
317                create_physical_expr(expr, input_dfschema, execution_props)?,
318                input_schema,
319                field.data_type().clone(),
320            )
321        }
322        Expr::Not(expr) => {
323            expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
324        }
325        Expr::Negative(expr) => expressions::negative(
326            create_physical_expr(expr, input_dfschema, execution_props)?,
327            input_schema,
328        ),
329        Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
330            expr,
331            input_dfschema,
332            execution_props,
333        )?),
334        Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
335            expr,
336            input_dfschema,
337            execution_props,
338        )?),
339        Expr::ScalarFunction(ScalarFunction { func, args }) => {
340            let physical_args =
341                create_physical_exprs(args, input_dfschema, execution_props)?;
342            let config_options = match execution_props.config_options.as_ref() {
343                Some(config_options) => Arc::clone(config_options),
344                None => Arc::new(ConfigOptions::default()),
345            };
346
347            Ok(Arc::new(ScalarFunctionExpr::try_new(
348                Arc::clone(func),
349                physical_args,
350                input_schema,
351                config_options,
352            )?))
353        }
354        Expr::Between(Between {
355            expr,
356            negated,
357            low,
358            high,
359        }) => {
360            let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
361            let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
362            let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
363
364            // rewrite the between into the two binary operators
365            let binary_expr = binary(
366                binary(
367                    Arc::clone(&value_expr),
368                    Operator::GtEq,
369                    low_expr,
370                    input_schema,
371                )?,
372                Operator::And,
373                binary(
374                    Arc::clone(&value_expr),
375                    Operator::LtEq,
376                    high_expr,
377                    input_schema,
378                )?,
379                input_schema,
380            );
381
382            if *negated {
383                expressions::not(binary_expr?)
384            } else {
385                binary_expr
386            }
387        }
388        Expr::InList(InList {
389            expr,
390            list,
391            negated,
392        }) => match expr.as_ref() {
393            Expr::Literal(ScalarValue::Utf8(None), _) => {
394                Ok(expressions::lit(ScalarValue::Boolean(None)))
395            }
396            _ => {
397                let value_expr =
398                    create_physical_expr(expr, input_dfschema, execution_props)?;
399
400                let list_exprs =
401                    create_physical_exprs(list, input_dfschema, execution_props)?;
402                expressions::in_list(value_expr, list_exprs, negated, input_schema)
403            }
404        },
405        Expr::ScalarSubquery(sq) => {
406            match execution_props.subquery_indexes.get(sq) {
407                Some(&index) => {
408                    let schema = sq.subquery.schema();
409                    if schema.fields().len() != 1 {
410                        return plan_err!(
411                            "Scalar subquery must return exactly one column, got {}",
412                            schema.fields().len()
413                        );
414                    }
415                    let dt = schema.field(0).data_type().clone();
416                    let nullable = schema.field(0).is_nullable();
417                    Ok(Arc::new(ScalarSubqueryExpr::new(
418                        dt,
419                        nullable,
420                        index,
421                        execution_props.subquery_results.clone(),
422                    )))
423                }
424                None => {
425                    // Not found: either a correlated subquery that wasn't
426                    // rewritten to a join, or an uncorrelated one that wasn't
427                    // registered by the physical planner.
428                    not_impl_err!(
429                        "Physical plan does not support logical expression {e:?}"
430                    )
431                }
432            }
433        }
434        Expr::Placeholder(Placeholder { id, .. }) => {
435            exec_err!("Placeholder '{id}' was not provided a value for execution.")
436        }
437        Expr::HigherOrderFunction(invocation @ HigherOrderFunction { func, args }) => {
438            let num_lambdas = args
439                .iter()
440                .filter(|arg| matches!(arg, Expr::Lambda(_)))
441                .count();
442
443            let mut lambda_parameters =
444                invocation.lambda_parameters(input_dfschema)?.into_iter();
445
446            if num_lambdas > lambda_parameters.len() {
447                return plan_err!(
448                    "{} lambda_parameters returned only {} values for {num_lambdas} lambdas",
449                    func.name(),
450                    lambda_parameters.len()
451                );
452            }
453
454            let lambda_qualifier = 1 + input_dfschema
455                .iter()
456                .filter_map(|(qualifier, _field)| {
457                    qualifier.and_then(|tbl| {
458                        tbl.table().strip_prefix("lambda_")?.parse::<usize>().ok()
459                    })
460                })
461                .max()
462                .unwrap_or_default();
463
464            let qualifier = TableReference::bare(format!("lambda_{lambda_qualifier}"));
465
466            let physical_args = args
467                .iter()
468                .map(|arg| match arg {
469                    Expr::Lambda(lambda) => {
470                        let lambda_parameters = lambda_parameters
471                            .next()
472                            .ok_or_else(|| {
473                                internal_datafusion_err!(
474                                    "lambda_parameters len should have been checked above"
475                                )
476                            })?
477                            .into_iter()
478                            .zip(&lambda.params)
479                            .map(|(field, name)| {
480                                (Some(qualifier.clone()), field.renamed(name.as_str()))
481                            });
482
483                        let new_fields = input_dfschema
484                            .iter()
485                            .map(|(tbl, field)| (tbl.cloned(), Arc::clone(field)))
486                            .chain(lambda_parameters)
487                            .collect();
488
489                        let lambda_schema = DFSchema::new_with_metadata(
490                            new_fields,
491                            input_dfschema.metadata().clone(),
492                        )?;
493
494                        let execution_props = execution_props
495                            .clone()
496                            .with_qualified_lambda_variables(&qualifier, &lambda.params);
497
498                        create_physical_expr(arg, &lambda_schema, &execution_props)
499                    }
500                    _ => create_physical_expr(arg, input_dfschema, execution_props),
501                })
502                .collect::<Result<_>>()?;
503
504            let config_options = match execution_props.config_options.as_ref() {
505                Some(config_options) => Arc::clone(config_options),
506                None => Arc::new(ConfigOptions::default()),
507            };
508
509            Ok(Arc::new(HigherOrderFunctionExpr::try_new_with_schema(
510                Arc::clone(func),
511                physical_args,
512                input_schema,
513                config_options,
514            )?))
515        }
516        Expr::Lambda(Lambda { params, body }) => expressions::lambda(
517            params,
518            create_physical_expr(body, input_dfschema, execution_props)?,
519        ),
520        Expr::LambdaVariable(LambdaVariable {
521            name,
522            field,
523            spans: _,
524        }) => {
525            let field = field.as_ref().ok_or_else(|| {
526                plan_datafusion_err!("unresolved LambdaVariable {name}")
527            })?;
528
529            let qualifier = execution_props
530                .lambda_variable_qualifier
531                .get(name)
532                .ok_or_else(|| {
533                    plan_datafusion_err!("qualifier for lambda variable {name} not found")
534                })?;
535
536            let index = input_dfschema
537                .index_of_column_by_name(Some(qualifier), name)
538                .ok_or_else(|| {
539                    plan_datafusion_err!(
540                        "lambda variable {qualifier}.{name} not found in planning schema"
541                    )
542                })?;
543
544            let schema_field = input_dfschema.field(index);
545
546            // LambdaVariable.field will be made optional as in Expr::Placeholder
547            // and only LambdaVariable.name used, and field.name ignored,
548            // so they're not enforced to match for logical expressions
549            // Rename the field to match the schema one and use it's PartialEq impl instead
550            // of checking property by property and fail if new properties get's added to it.
551            // While not necessary, the sql planner does create lambda vars with matching names,
552            // so this shouldn't allocate with a lambda var from it
553            let renamed_field = Arc::clone(field).renamed(name);
554
555            if &renamed_field != schema_field {
556                return plan_err!(
557                    "LambdaVariable field and schema field mismatch {} != {}",
558                    renamed_field,
559                    schema_field
560                );
561            }
562
563            Ok(Arc::new(expressions::LambdaVariable::new(
564                index,
565                Arc::clone(schema_field),
566            )))
567        }
568        other => {
569            not_impl_err!("Physical plan does not support logical expression {other:?}")
570        }
571    }
572}
573
574/// Create vector of Physical Expression from a vector of logical expression
575pub fn create_physical_exprs<'a, I>(
576    exprs: I,
577    input_dfschema: &DFSchema,
578    execution_props: &ExecutionProps,
579) -> Result<Vec<Arc<dyn PhysicalExpr>>>
580where
581    I: IntoIterator<Item = &'a Expr>,
582{
583    exprs
584        .into_iter()
585        .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
586        .collect()
587}
588
589/// Convert a logical expression to a physical expression (without any simplification, etc)
590pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
591    // TODO this makes a deep copy of the Schema. Should take SchemaRef instead and avoid deep copy
592    let df_schema = schema.clone().to_dfschema().unwrap();
593    let execution_props = ExecutionProps::new();
594    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
595}
596
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::col;

    use super::*;

    /// Schema with a single non-nullable `Int32` column named `a`.
    fn test_cast_schema() -> Schema {
        let column = Field::new("a", DataType::Int32, false);
        Schema::new(vec![column])
    }

    /// Plan `expr` against `schema` with default [`ExecutionProps`].
    fn lower_cast_expr(expr: &Expr, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        let planning_schema = DFSchema::try_from(schema.clone())?;
        let props = ExecutionProps::new();
        create_physical_expr(expr, &planning_schema, &props)
    }

    /// Downcast a planned expression to [`expressions::CastExpr`], panicking otherwise.
    fn as_planner_cast(physical: &Arc<dyn PhysicalExpr>) -> &expressions::CastExpr {
        let downcast = physical.downcast_ref::<expressions::CastExpr>();
        downcast.expect("planner should lower logical CAST to CastExpr")
    }

    /// Lower `CAST(a AS <target_field>)` and check that the physical cast reports
    /// exactly `target_field` and is nullable.
    fn check_cast_keeps_target_field(target_field: Arc<Field>) -> Result<()> {
        let schema = test_cast_schema();
        let logical = Expr::Cast(Cast::new_from_field(
            Box::new(col("a")),
            Arc::clone(&target_field),
        ));

        let physical = lower_cast_expr(&logical, &schema)?;

        assert_eq!(as_planner_cast(&physical).target_field(), &target_field);
        assert_eq!(physical.return_field(&schema)?, target_field);
        assert!(physical.nullable(&schema)?);

        Ok(())
    }

    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let letters = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let qualified = DFSchema::try_from_qualified_schema("data", &letters)?;
        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &qualified, &ExecutionProps::new())?;

        let values: ArrayRef =
            Arc::new(StringArray::from_iter_values(vec!["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(letters), vec![values])?;

        let evaluated = physical
            .evaluate(&batch)?
            .into_array(batch.num_rows())
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));

        assert_eq!(&evaluated, &expected);

        Ok(())
    }

    #[test]
    fn test_cast_lowering_preserves_target_field_metadata() -> Result<()> {
        let target = Field::new("cast_target", DataType::Int64, true)
            .with_metadata([("target_meta".to_string(), "1".to_string())].into());
        check_cast_keeps_target_field(Arc::new(target))
    }

    #[test]
    fn test_cast_lowering_preserves_standard_cast_semantics() -> Result<()> {
        let schema = test_cast_schema();
        let logical = Expr::Cast(Cast::new(Box::new(col("a")), DataType::Int64));

        let physical = lower_cast_expr(&logical, &schema)?;
        let reported = physical.return_field(&schema)?;

        assert_eq!(as_planner_cast(&physical).cast_type(), &DataType::Int64);
        assert_eq!(reported.name(), "a");
        assert_eq!(reported.data_type(), &DataType::Int64);
        assert!(!physical.nullable(&schema)?);

        Ok(())
    }

    #[test]
    fn test_cast_lowering_preserves_same_type_field_semantics() -> Result<()> {
        let target = Field::new("same_type_cast", DataType::Int32, true).with_metadata(
            [("target_meta".to_string(), "same-type".to_string())].into(),
        );
        check_cast_keeps_target_field(Arc::new(target))
    }

    /// Test that deeply nested expressions do not cause a stack overflow.
    ///
    /// This test only runs when the `recursive_protection` feature is enabled,
    /// as it would overflow the stack otherwise.
    #[test]
    #[cfg_attr(not(feature = "recursive_protection"), ignore)]
    fn test_deeply_nested_binary_expr() -> Result<()> {
        // Build a left-deep tree ((((a + a) + a) + a) + ... ) with 1000 levels;
        // that depth would overflow the stack without recursion protection.
        const DEPTH: usize = 1000;
        let nested = (0..DEPTH).fold(col("a"), |acc, _| {
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(acc),
                op: Operator::Plus,
                right: Box::new(col("a")),
            })
        });

        let planning_schema = DFSchema::try_from(test_cast_schema())?;

        // Planning must complete without overflowing the stack.
        let _physical_expr =
            create_physical_expr(&nested, &planning_schema, &ExecutionProps::new())?;

        Ok(())
    }
}